Telemetry in metainfo implementation (#670)
This commit is contained in:
parent
7958994ae2
commit
f730ce451a
@ -11,7 +11,9 @@ import (
|
||||
)
|
||||
|
||||
// CreateBucket creates a new bucket with the specified information
|
||||
func (db *DB) CreateBucket(ctx context.Context, bucket string, info *storj.Bucket) (storj.Bucket, error) {
|
||||
func (db *DB) CreateBucket(ctx context.Context, bucket string, info *storj.Bucket) (bucketInfo storj.Bucket, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if bucket == "" {
|
||||
return storj.Bucket{}, storj.ErrNoBucket.New("")
|
||||
}
|
||||
@ -25,7 +27,9 @@ func (db *DB) CreateBucket(ctx context.Context, bucket string, info *storj.Bucke
|
||||
}
|
||||
|
||||
// DeleteBucket deletes bucket
|
||||
func (db *DB) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
func (db *DB) DeleteBucket(ctx context.Context, bucket string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if bucket == "" {
|
||||
return storj.ErrNoBucket.New("")
|
||||
}
|
||||
@ -34,7 +38,9 @@ func (db *DB) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
}
|
||||
|
||||
// GetBucket gets bucket information
|
||||
func (db *DB) GetBucket(ctx context.Context, bucket string) (storj.Bucket, error) {
|
||||
func (db *DB) GetBucket(ctx context.Context, bucket string) (bucketInfo storj.Bucket, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if bucket == "" {
|
||||
return storj.Bucket{}, storj.ErrNoBucket.New("")
|
||||
}
|
||||
@ -48,7 +54,9 @@ func (db *DB) GetBucket(ctx context.Context, bucket string) (storj.Bucket, error
|
||||
}
|
||||
|
||||
// ListBuckets lists buckets
|
||||
func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions) (storj.BucketList, error) {
|
||||
func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions) (list storj.BucketList, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var startAfter, endBefore string
|
||||
switch options.Direction {
|
||||
case storj.Before:
|
||||
@ -77,7 +85,7 @@ func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions)
|
||||
return storj.BucketList{}, err
|
||||
}
|
||||
|
||||
list := storj.BucketList{
|
||||
list = storj.BucketList{
|
||||
More: more,
|
||||
Items: make([]storj.Bucket, 0, len(items)),
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ package kvmetainfo
|
||||
|
||||
import (
|
||||
"github.com/zeebo/errs"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/internal/memory"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
@ -15,6 +16,8 @@ import (
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
var errClass = errs.Class("kvmetainfo")
|
||||
|
||||
const defaultSegmentLimit = 8 // TODO
|
||||
|
@ -29,13 +29,18 @@ const (
|
||||
)
|
||||
|
||||
// GetObject returns information about an object
|
||||
func (db *DB) GetObject(ctx context.Context, bucket string, path storj.Path) (storj.Object, error) {
|
||||
_, info, err := db.getInfo(ctx, committedPrefix, bucket, path)
|
||||
func (db *DB) GetObject(ctx context.Context, bucket string, path storj.Path) (info storj.Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
_, info, err = db.getInfo(ctx, committedPrefix, bucket, path)
|
||||
|
||||
return info, err
|
||||
}
|
||||
|
||||
// GetObjectStream returns interface for reading the object stream
|
||||
func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Path) (storj.ReadOnlyStream, error) {
|
||||
func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Path) (stream storj.ReadOnlyStream, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
meta, info, err := db.getInfo(ctx, committedPrefix, bucket, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -55,17 +60,21 @@ func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Pat
|
||||
}
|
||||
|
||||
// CreateObject creates an uploading object and returns an interface for uploading Object information
|
||||
func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path, createInfo *storj.CreateObject) (storj.MutableObject, error) {
|
||||
func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path, createInfo *storj.CreateObject) (object storj.MutableObject, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// ModifyObject modifies a committed object
|
||||
func (db *DB) ModifyObject(ctx context.Context, bucket string, path storj.Path) (storj.MutableObject, error) {
|
||||
func (db *DB) ModifyObject(ctx context.Context, bucket string, path storj.Path) (object storj.MutableObject, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// DeleteObject deletes an object from database
|
||||
func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path) error {
|
||||
func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
store, err := db.buckets.GetObjectStore(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -80,17 +89,21 @@ func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path)
|
||||
}
|
||||
|
||||
// ModifyPendingObject creates an interface for updating a partially uploaded object
|
||||
func (db *DB) ModifyPendingObject(ctx context.Context, bucket string, path storj.Path) (storj.MutableObject, error) {
|
||||
func (db *DB) ModifyPendingObject(ctx context.Context, bucket string, path storj.Path) (object storj.MutableObject, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// ListPendingObjects lists pending objects in bucket based on the ListOptions
|
||||
func (db *DB) ListPendingObjects(ctx context.Context, bucket string, options storj.ListOptions) (storj.ObjectList, error) {
|
||||
func (db *DB) ListPendingObjects(ctx context.Context, bucket string, options storj.ListOptions) (list storj.ObjectList, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return storj.ObjectList{}, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// ListObjects lists objects in bucket based on the ListOptions
|
||||
func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.ListOptions) (storj.ObjectList, error) {
|
||||
func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.ListOptions) (list storj.ObjectList, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
objects, err := db.buckets.GetObjectStore(ctx, bucket)
|
||||
if err != nil {
|
||||
return storj.ObjectList{}, err
|
||||
@ -124,7 +137,7 @@ func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.List
|
||||
return storj.ObjectList{}, err
|
||||
}
|
||||
|
||||
list := storj.ObjectList{
|
||||
list = storj.ObjectList{
|
||||
Bucket: bucket,
|
||||
Prefix: options.Prefix,
|
||||
More: more,
|
||||
@ -146,7 +159,9 @@ type object struct {
|
||||
streamMeta pb.StreamMeta
|
||||
}
|
||||
|
||||
func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path storj.Path) (object, storj.Object, error) {
|
||||
func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path storj.Path) (obj object, info storj.Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
bucketInfo, err := db.GetBucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return object{}, storj.Object{}, err
|
||||
@ -207,7 +222,7 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st
|
||||
return object{}, storj.Object{}, err
|
||||
}
|
||||
|
||||
info := objectStreamFromMeta(bucket, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
||||
info = objectStreamFromMeta(bucket, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme)
|
||||
|
||||
return object{
|
||||
fullpath: fullpath,
|
||||
|
@ -27,6 +27,8 @@ type readonlyStream struct {
|
||||
func (stream *readonlyStream) Info() storj.Object { return stream.info }
|
||||
|
||||
func (stream *readonlyStream) SegmentsAt(ctx context.Context, byteOffset int64, limit int64) (infos []storj.Segment, more bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if stream.info.FixedSegmentSize <= 0 {
|
||||
return nil, false, errors.New("not implemented")
|
||||
}
|
||||
@ -35,7 +37,9 @@ func (stream *readonlyStream) SegmentsAt(ctx context.Context, byteOffset int64,
|
||||
return stream.Segments(ctx, index, limit)
|
||||
}
|
||||
|
||||
func (stream *readonlyStream) segment(ctx context.Context, index int64) (storj.Segment, error) {
|
||||
func (stream *readonlyStream) segment(ctx context.Context, index int64) (info storj.Segment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
segment := storj.Segment{
|
||||
Index: index,
|
||||
}
|
||||
@ -64,7 +68,7 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (storj.S
|
||||
}
|
||||
|
||||
var nonce storj.Nonce
|
||||
_, err := encryption.Increment(&nonce, index+1)
|
||||
_, err = encryption.Increment(&nonce, index+1)
|
||||
if err != nil {
|
||||
return segment, err
|
||||
}
|
||||
@ -73,6 +77,8 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (storj.S
|
||||
}
|
||||
|
||||
func (stream *readonlyStream) Segments(ctx context.Context, index int64, limit int64) (infos []storj.Segment, more bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if index < 0 {
|
||||
return nil, false, errors.New("invalid argument")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user