From f730ce451ae9d3a8c87cd651b047b62dff5b59ad Mon Sep 17 00:00:00 2001 From: Kaloyan Raev Date: Fri, 16 Nov 2018 15:59:27 +0200 Subject: [PATCH] Telemetry in metainfo implementation (#670) --- pkg/metainfo/kvmetainfo/buckets.go | 18 +++++++++---- pkg/metainfo/kvmetainfo/metainfo.go | 3 +++ pkg/metainfo/kvmetainfo/objects.go | 39 ++++++++++++++++++++--------- pkg/metainfo/kvmetainfo/stream.go | 10 ++++++-- 4 files changed, 51 insertions(+), 19 deletions(-) diff --git a/pkg/metainfo/kvmetainfo/buckets.go b/pkg/metainfo/kvmetainfo/buckets.go index 51c368aa8..259918be3 100644 --- a/pkg/metainfo/kvmetainfo/buckets.go +++ b/pkg/metainfo/kvmetainfo/buckets.go @@ -11,7 +11,9 @@ import ( ) // CreateBucket creates a new bucket with the specified information -func (db *DB) CreateBucket(ctx context.Context, bucket string, info *storj.Bucket) (storj.Bucket, error) { +func (db *DB) CreateBucket(ctx context.Context, bucket string, info *storj.Bucket) (bucketInfo storj.Bucket, err error) { + defer mon.Task()(&ctx)(&err) + if bucket == "" { return storj.Bucket{}, storj.ErrNoBucket.New("") } @@ -25,7 +27,9 @@ func (db *DB) CreateBucket(ctx context.Context, bucket string, info *storj.Bucke } // DeleteBucket deletes bucket -func (db *DB) DeleteBucket(ctx context.Context, bucket string) error { +func (db *DB) DeleteBucket(ctx context.Context, bucket string) (err error) { + defer mon.Task()(&ctx)(&err) + if bucket == "" { return storj.ErrNoBucket.New("") } @@ -34,7 +38,9 @@ func (db *DB) DeleteBucket(ctx context.Context, bucket string) error { } // GetBucket gets bucket information -func (db *DB) GetBucket(ctx context.Context, bucket string) (storj.Bucket, error) { +func (db *DB) GetBucket(ctx context.Context, bucket string) (bucketInfo storj.Bucket, err error) { + defer mon.Task()(&ctx)(&err) + if bucket == "" { return storj.Bucket{}, storj.ErrNoBucket.New("") } @@ -48,7 +54,9 @@ func (db *DB) GetBucket(ctx context.Context, bucket string) (storj.Bucket, error } // ListBuckets lists buckets -func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions) (storj.BucketList, error) { +func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions) (list storj.BucketList, err error) { + defer mon.Task()(&ctx)(&err) + var startAfter, endBefore string switch options.Direction { case storj.Before: @@ -77,7 +85,7 @@ func (db *DB) ListBuckets(ctx context.Context, options storj.BucketListOptions) return storj.BucketList{}, err } - list := storj.BucketList{ + list = storj.BucketList{ More: more, Items: make([]storj.Bucket, 0, len(items)), } diff --git a/pkg/metainfo/kvmetainfo/metainfo.go b/pkg/metainfo/kvmetainfo/metainfo.go index ad77072cb..30e498d06 100644 --- a/pkg/metainfo/kvmetainfo/metainfo.go +++ b/pkg/metainfo/kvmetainfo/metainfo.go @@ -5,6 +5,7 @@ package kvmetainfo import ( "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/internal/memory" "storj.io/storj/pkg/pointerdb/pdbclient" @@ -15,6 +16,8 @@ import ( "storj.io/storj/storage" ) +var mon = monkit.Package() + var errClass = errs.Class("kvmetainfo") const defaultSegmentLimit = 8 // TODO diff --git a/pkg/metainfo/kvmetainfo/objects.go b/pkg/metainfo/kvmetainfo/objects.go index a3a7f8f6d..8a7247fb2 100644 --- a/pkg/metainfo/kvmetainfo/objects.go +++ b/pkg/metainfo/kvmetainfo/objects.go @@ -29,13 +29,18 @@ const ( ) // GetObject returns information about an object -func (db *DB) GetObject(ctx context.Context, bucket string, path storj.Path) (storj.Object, error) { - _, info, err := db.getInfo(ctx, committedPrefix, bucket, path) +func (db *DB) GetObject(ctx context.Context, bucket string, path storj.Path) (info storj.Object, err error) { + defer mon.Task()(&ctx)(&err) + + _, info, err = db.getInfo(ctx, committedPrefix, bucket, path) + return info, err } // GetObjectStream returns interface for reading the object stream -func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Path) (storj.ReadOnlyStream, error) { +func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Path) (stream storj.ReadOnlyStream, err error) { + defer mon.Task()(&ctx)(&err) + meta, info, err := db.getInfo(ctx, committedPrefix, bucket, path) if err != nil { return nil, err @@ -55,17 +60,21 @@ func (db *DB) GetObjectStream(ctx context.Context, bucket string, path storj.Pat } // CreateObject creates an uploading object and returns an interface for uploading Object information -func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path, createInfo *storj.CreateObject) (storj.MutableObject, error) { +func (db *DB) CreateObject(ctx context.Context, bucket string, path storj.Path, createInfo *storj.CreateObject) (object storj.MutableObject, err error) { + defer mon.Task()(&ctx)(&err) return nil, errors.New("not implemented") } // ModifyObject modifies a committed object -func (db *DB) ModifyObject(ctx context.Context, bucket string, path storj.Path) (storj.MutableObject, error) { +func (db *DB) ModifyObject(ctx context.Context, bucket string, path storj.Path) (object storj.MutableObject, err error) { + defer mon.Task()(&ctx)(&err) return nil, errors.New("not implemented") } // DeleteObject deletes an object from database -func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path) error { +func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path) (err error) { + defer mon.Task()(&ctx)(&err) + store, err := db.buckets.GetObjectStore(ctx, bucket) if err != nil { return err @@ -80,17 +89,21 @@ func (db *DB) DeleteObject(ctx context.Context, bucket string, path storj.Path) } // ModifyPendingObject creates an interface for updating a partially uploaded object -func (db *DB) ModifyPendingObject(ctx context.Context, bucket string, path storj.Path) (storj.MutableObject, error) { +func (db *DB) ModifyPendingObject(ctx context.Context, bucket string, path storj.Path) (object storj.MutableObject, err error) { + defer mon.Task()(&ctx)(&err) return nil, errors.New("not implemented") } // ListPendingObjects lists pending objects in bucket based on the ListOptions -func (db *DB) ListPendingObjects(ctx context.Context, bucket string, options storj.ListOptions) (storj.ObjectList, error) { +func (db *DB) ListPendingObjects(ctx context.Context, bucket string, options storj.ListOptions) (list storj.ObjectList, err error) { + defer mon.Task()(&ctx)(&err) return storj.ObjectList{}, errors.New("not implemented") } // ListObjects lists objects in bucket based on the ListOptions -func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.ListOptions) (storj.ObjectList, error) { +func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.ListOptions) (list storj.ObjectList, err error) { + defer mon.Task()(&ctx)(&err) + objects, err := db.buckets.GetObjectStore(ctx, bucket) if err != nil { return storj.ObjectList{}, err @@ -124,7 +137,7 @@ func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.List return storj.ObjectList{}, err } - list := storj.ObjectList{ + list = storj.ObjectList{ Bucket: bucket, Prefix: options.Prefix, More: more, @@ -146,7 +159,9 @@ type object struct { streamMeta pb.StreamMeta } -func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path storj.Path) (object, storj.Object, error) { +func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path storj.Path) (obj object, info storj.Object, err error) { + defer mon.Task()(&ctx)(&err) + bucketInfo, err := db.GetBucket(ctx, bucket) if err != nil { return object{}, storj.Object{}, err @@ -207,7 +222,7 @@ func (db *DB) getInfo(ctx context.Context, prefix string, bucket string, path st return object{}, storj.Object{}, err } - info := objectStreamFromMeta(bucket, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme) + info = objectStreamFromMeta(bucket, path, lastSegmentMeta, streamInfo, streamMeta, redundancyScheme) return object{ fullpath: fullpath, diff --git a/pkg/metainfo/kvmetainfo/stream.go b/pkg/metainfo/kvmetainfo/stream.go index d6fd31c4f..8c1d3798e 100644 --- a/pkg/metainfo/kvmetainfo/stream.go +++ b/pkg/metainfo/kvmetainfo/stream.go @@ -27,6 +27,8 @@ type readonlyStream struct { func (stream *readonlyStream) Info() storj.Object { return stream.info } func (stream *readonlyStream) SegmentsAt(ctx context.Context, byteOffset int64, limit int64) (infos []storj.Segment, more bool, err error) { + defer mon.Task()(&ctx)(&err) + if stream.info.FixedSegmentSize <= 0 { return nil, false, errors.New("not implemented") } @@ -35,7 +37,9 @@ func (stream *readonlyStream) SegmentsAt(ctx context.Context, byteOffset int64, return stream.Segments(ctx, index, limit) } -func (stream *readonlyStream) segment(ctx context.Context, index int64) (storj.Segment, error) { +func (stream *readonlyStream) segment(ctx context.Context, index int64) (info storj.Segment, err error) { + defer mon.Task()(&ctx)(&err) + segment := storj.Segment{ Index: index, } @@ -64,7 +68,7 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (storj.S } var nonce storj.Nonce - _, err := encryption.Increment(&nonce, index+1) + _, err = encryption.Increment(&nonce, index+1) if err != nil { return segment, err } @@ -73,6 +77,8 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (storj.S } func (stream *readonlyStream) Segments(ctx context.Context, index int64, limit int64) (infos []storj.Segment, more bool, err error) { + defer mon.Task()(&ctx)(&err) + if index < 0 { return nil, false, errors.New("invalid argument") }