From 64c467ffe761882e766c981d0fdcb409882f0093 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Tue, 10 Sep 2019 08:39:47 -0700 Subject: [PATCH] uplink: integrate new Metainfo calls (#2640) --- lib/uplink/bucket.go | 2 +- lib/uplink/project.go | 2 +- pkg/miniogw/gateway_test.go | 2 +- satellite/metainfo/metainfo_test.go | 2 +- uplink/metainfo/client.go | 8 +- uplink/metainfo/kvmetainfo/buckets_test.go | 2 +- uplink/metainfo/kvmetainfo/objects.go | 62 +-- uplink/metainfo/kvmetainfo/objects_test.go | 466 +++++++++++---------- uplink/metainfo/kvmetainfo/paths.go | 41 +- uplink/metainfo/kvmetainfo/stream.go | 16 +- uplink/metainfo/kvmetainfo/temputils.go | 2 +- uplink/storage/segments/store.go | 338 +++++---------- uplink/storage/segments/store_test.go | 253 +---------- uplink/storage/streams/shim.go | 5 +- uplink/storage/streams/store.go | 363 +++++++--------- uplink/storage/streams/store_test.go | 228 ++++++++++ 16 files changed, 780 insertions(+), 1012 deletions(-) create mode 100644 uplink/storage/streams/store_test.go diff --git a/lib/uplink/bucket.go b/lib/uplink/bucket.go index 2eda49ddf..fd1023fd5 100644 --- a/lib/uplink/bucket.go +++ b/lib/uplink/bucket.go @@ -121,7 +121,7 @@ type ListOptions = storj.ListOptions func (b *Bucket) ListObjects(ctx context.Context, cfg *ListOptions) (list storj.ObjectList, err error) { defer mon.Task()(&ctx)(&err) if cfg == nil { - cfg = &storj.ListOptions{Direction: storj.Forward} + cfg = &storj.ListOptions{Direction: storj.After} } return b.metainfo.ListObjects(ctx, b.bucket.Name, *cfg) } diff --git a/lib/uplink/project.go b/lib/uplink/project.go index b8ea0a9a0..26c0314a4 100644 --- a/lib/uplink/project.go +++ b/lib/uplink/project.go @@ -199,7 +199,7 @@ func (p *Project) OpenBucket(ctx context.Context, bucketName string, access *Enc } segmentStore := segments.NewSegmentStore(p.metainfo, ec, rs, p.maxInlineSize.Int(), maxEncryptedSegmentSize) - streamStore, err := streams.NewStreamStore(segmentStore, cfg.Volatile.SegmentsSize.Int64(), access.store, int(encryptionParameters.BlockSize), encryptionParameters.CipherSuite, p.maxInlineSize.Int()) + streamStore, err := streams.NewStreamStore(p.metainfo, segmentStore, cfg.Volatile.SegmentsSize.Int64(), access.store, int(encryptionParameters.BlockSize), encryptionParameters.CipherSuite, p.maxInlineSize.Int()) if err != nil { return nil, err } diff --git a/pkg/miniogw/gateway_test.go b/pkg/miniogw/gateway_test.go index 1bf13ab0c..19c3a22d1 100644 --- a/pkg/miniogw/gateway_test.go +++ b/pkg/miniogw/gateway_test.go @@ -710,7 +710,7 @@ func initEnv(ctx context.Context, t *testing.T, planet *testplanet.Planet) (mini blockSize := rs.StripeSize() inlineThreshold := 4 * memory.KiB.Int() - strms, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), encStore, blockSize, storj.EncAESGCM, inlineThreshold) + strms, err := streams.NewStreamStore(m, segments, 64*memory.MiB.Int64(), encStore, blockSize, storj.EncAESGCM, inlineThreshold) if err != nil { return nil, nil, nil, err } diff --git a/satellite/metainfo/metainfo_test.go b/satellite/metainfo/metainfo_test.go index e244af56a..9279d71fb 100644 --- a/satellite/metainfo/metainfo_test.go +++ b/satellite/metainfo/metainfo_test.go @@ -966,7 +966,7 @@ func TestBeginCommitListSegment(t *testing.T) { Position: storj.SegmentPosition{ Index: 0, }, - MaxOderLimit: memory.MiB.Int64(), + MaxOrderLimit: memory.MiB.Int64(), }) require.NoError(t, err) diff --git a/uplink/metainfo/client.go b/uplink/metainfo/client.go index facdee74f..4b07ab922 100644 --- a/uplink/metainfo/client.go +++ b/uplink/metainfo/client.go @@ -858,9 +858,9 @@ func (client *Client) ListObjects(ctx context.Context, params ListObjectsParams) // BeginSegmentParams parameters for BeginSegment method type BeginSegmentParams struct { - StreamID storj.StreamID - Position storj.SegmentPosition - MaxOderLimit int64 + StreamID storj.StreamID + Position storj.SegmentPosition + MaxOrderLimit int64 } func (params *BeginSegmentParams) toRequest() *pb.SegmentBeginRequest { @@ -870,7 +870,7 @@ func (params *BeginSegmentParams) toRequest() *pb.SegmentBeginRequest { PartNumber: params.Position.PartNumber, Index: params.Position.Index, }, - MaxOrderLimit: params.MaxOderLimit, + MaxOrderLimit: params.MaxOrderLimit, } } diff --git a/uplink/metainfo/kvmetainfo/buckets_test.go b/uplink/metainfo/kvmetainfo/buckets_test.go index 2bbe6e6d0..767466826 100644 --- a/uplink/metainfo/kvmetainfo/buckets_test.go +++ b/uplink/metainfo/kvmetainfo/buckets_test.go @@ -278,7 +278,7 @@ func newMetainfoParts(planet *testplanet.Planet) (*kvmetainfo.DB, streams.Store, const stripesPerBlock = 2 blockSize := stripesPerBlock * rs.StripeSize() inlineThreshold := 8 * memory.KiB.Int() - streams, err := streams.NewStreamStore(segments, 64*memory.MiB.Int64(), encStore, blockSize, storj.EncAESGCM, inlineThreshold) + streams, err := streams.NewStreamStore(metainfo, segments, 64*memory.MiB.Int64(), encStore, blockSize, storj.EncAESGCM, inlineThreshold) if err != nil { return nil, nil, err } diff --git a/uplink/metainfo/kvmetainfo/objects.go b/uplink/metainfo/kvmetainfo/objects.go index bb501148f..54003e9e5 100644 --- a/uplink/metainfo/kvmetainfo/objects.go +++ b/uplink/metainfo/kvmetainfo/objects.go @@ -15,6 +15,7 @@ import ( "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" "storj.io/storj/storage" + "storj.io/storj/uplink/metainfo" "storj.io/storj/uplink/storage/meta" "storj.io/storj/uplink/storage/objects" "storj.io/storj/uplink/storage/segments" @@ -174,15 +175,16 @@ func (db *DB) ListObjects(ctx context.Context, bucket string, options storj.List var startAfter, endBefore string switch options.Direction { - case storj.Before: - // before lists backwards from cursor, without cursor - endBefore = options.Cursor - case storj.Backward: - // backward lists backwards from cursor, including cursor - endBefore = keyAfter(options.Cursor) - case storj.Forward: - // forward lists forwards from cursor, including cursor - startAfter = keyBefore(options.Cursor) + // TODO for now we are supporting only storj.After + // case storj.Before: + // // before lists backwards from cursor, without cursor + // endBefore = options.Cursor + // case storj.Backward: + // // backward lists backwards from cursor, including cursor + // endBefore = keyAfter(options.Cursor) + // case storj.Forward: + // // forward lists forwards from cursor, including cursor + // startAfter = keyBefore(options.Cursor) case storj.After: // after lists forwards from cursor, without cursor startAfter = options.Cursor @@ -243,34 +245,21 @@ func (db *DB) getInfo(ctx context.Context, bucket string, path storj.Path) (obj return object{}, storj.Object{}, err } - pointer, err := db.metainfo.SegmentInfo(ctx, bucket, encPath.Raw(), -1) + objectInfo, err := db.metainfo.GetObject(ctx, metainfo.GetObjectParams{ + Bucket: []byte(bucket), + EncryptedPath: []byte(encPath.Raw()), + }) if err != nil { - if storage.ErrKeyNotFound.Has(err) { - err = storj.ErrObjectNotFound.Wrap(err) - } return object{}, storj.Object{}, err } - var redundancyScheme *pb.RedundancyScheme - if pointer.GetType() == pb.Pointer_REMOTE { - redundancyScheme = pointer.GetRemote().GetRedundancy() - } else { - // TODO: handle better - redundancyScheme = &pb.RedundancyScheme{ - Type: pb.RedundancyScheme_RS, - MinReq: -1, - Total: -1, - RepairThreshold: -1, - SuccessThreshold: -1, - ErasureShareSize: -1, - } - } + redundancyScheme := objectInfo.Stream.RedundancyScheme lastSegmentMeta := segments.Meta{ - Modified: pointer.CreationDate, - Expiration: pointer.GetExpirationDate(), - Size: pointer.GetSegmentSize(), - Data: pointer.GetMetadata(), + Modified: objectInfo.Created, + Expiration: objectInfo.Expires, + Size: objectInfo.Size, + Data: objectInfo.Metadata, } streamInfoData, streamMeta, err := streams.TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, fullpath, db.encStore) @@ -320,7 +309,7 @@ func objectFromMeta(bucket storj.Bucket, path storj.Path, isPrefix bool, meta ob } } -func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme *pb.RedundancyScheme) (storj.Object, error) { +func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta, redundancyScheme storj.RedundancyScheme) (storj.Object, error) { var nonce storj.Nonce var encryptedKey storj.EncryptedPrivateKey if streamMeta.LastSegmentMeta != nil { @@ -359,14 +348,7 @@ func objectStreamFromMeta(bucket storj.Bucket, path storj.Path, lastSegment segm SegmentCount: numberOfSegments, FixedSegmentSize: stream.SegmentsSize, - RedundancyScheme: storj.RedundancyScheme{ - Algorithm: storj.ReedSolomon, - ShareSize: redundancyScheme.GetErasureShareSize(), - RequiredShares: int16(redundancyScheme.GetMinReq()), - RepairShares: int16(redundancyScheme.GetRepairThreshold()), - OptimalShares: int16(redundancyScheme.GetSuccessThreshold()), - TotalShares: int16(redundancyScheme.GetTotal()), - }, + RedundancyScheme: redundancyScheme, EncryptionParameters: storj.EncryptionParameters{ CipherSuite: storj.CipherSuite(streamMeta.EncryptionType), BlockSize: streamMeta.EncryptionBlockSize, diff --git a/uplink/metainfo/kvmetainfo/objects_test.go b/uplink/metainfo/kvmetainfo/objects_test.go index 9350cce78..5eeca6034 100644 --- a/uplink/metainfo/kvmetainfo/objects_test.go +++ b/uplink/metainfo/kvmetainfo/objects_test.go @@ -277,10 +277,11 @@ func TestListObjectsEmpty(t *testing.T) { _, err = db.ListObjects(ctx, bucket.Name, storj.ListOptions{}) assert.EqualError(t, err, "kvmetainfo: invalid direction 0") + // TODO for now we are supporting only storj.After for _, direction := range []storj.ListDirection{ - storj.Before, - storj.Backward, - storj.Forward, + // storj.Before, + // storj.Backward, + // storj.Forward, storj.After, } { list, err := db.ListObjects(ctx, bucket.Name, storj.ListOptions{Direction: direction}) @@ -391,235 +392,238 @@ func TestListObjects(t *testing.T) { options: options("a/", "xaa", storj.After, 2), more: true, result: []string{"xb", "xbb"}, - }, { - options: options("", "", storj.Forward, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, - }, { - options: options("", "`", storj.Forward, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, - }, { - options: options("", "b", storj.Forward, 0), - result: []string{"b", "b/", "bb", "c"}, - }, { - options: options("", "c", storj.Forward, 0), - result: []string{"c"}, - }, { - options: options("", "ca", storj.Forward, 0), - result: []string{}, - }, { - options: options("", "", storj.Forward, 1), - more: true, - result: []string{"a"}, - }, { - options: options("", "`", storj.Forward, 1), - more: true, - result: []string{"a"}, - }, { - options: options("", "aa", storj.Forward, 1), - more: true, - result: []string{"aa"}, - }, { - options: options("", "c", storj.Forward, 1), - result: []string{"c"}, - }, { - options: options("", "ca", storj.Forward, 1), - result: []string{}, - }, { - options: options("", "", storj.Forward, 2), - more: true, - result: []string{"a", "a/"}, - }, { - options: options("", "`", storj.Forward, 2), - more: true, - result: []string{"a", "a/"}, - }, { - options: options("", "aa", storj.Forward, 2), - more: true, - result: []string{"aa", "b"}, - }, { - options: options("", "bb", storj.Forward, 2), - result: []string{"bb", "c"}, - }, { - options: options("", "c", storj.Forward, 2), - result: []string{"c"}, - }, { - options: options("", "ca", storj.Forward, 2), - result: []string{}, - }, { - options: optionsRecursive("", "", storj.Forward, 0), - result: []string{"a", "a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc", "aa", "b", "b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc", "bb", "c"}, - }, { - options: options("a", "", storj.Forward, 0), - result: []string{"xa", "xaa", "xb", "xbb", "xc"}, - }, { - options: options("a/", "", storj.Forward, 0), - result: []string{"xa", "xaa", "xb", "xbb", "xc"}, - }, { - options: options("a/", "xb", storj.Forward, 0), - result: []string{"xb", "xbb", "xc"}, - }, { - options: optionsRecursive("", "a/xbb", storj.Forward, 5), - more: true, - result: []string{"a/xbb", "a/xc", "aa", "b", "b/ya"}, - }, { - options: options("a/", "xaa", storj.Forward, 2), - more: true, - result: []string{"xaa", "xb"}, - }, { - options: options("", "", storj.Backward, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, - }, { - options: options("", "`", storj.Backward, 0), - result: []string{}, - }, { - options: options("", "b", storj.Backward, 0), - result: []string{"a", "a/", "aa", "b"}, - }, { - options: options("", "c", storj.Backward, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, - }, { - options: options("", "ca", storj.Backward, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, - }, { - options: options("", "", storj.Backward, 1), - more: true, - result: []string{"c"}, - }, { - options: options("", "`", storj.Backward, 1), - result: []string{}, - }, { - options: options("", "aa", storj.Backward, 1), - more: true, - result: []string{"aa"}, - }, { - options: options("", "c", storj.Backward, 1), - more: true, - result: []string{"c"}, - }, { - options: options("", "ca", storj.Backward, 1), - more: true, - result: []string{"c"}, - }, { - options: options("", "", storj.Backward, 2), - more: true, - result: []string{"bb", "c"}, - }, { - options: options("", "`", storj.Backward, 2), - result: []string{}, - }, { - options: options("", "a/", storj.Backward, 2), - result: []string{"a"}, - }, { - options: options("", "bb", storj.Backward, 2), - more: true, - result: []string{"b/", "bb"}, - }, { - options: options("", "c", storj.Backward, 2), - more: true, - result: []string{"bb", "c"}, - }, { - options: options("", "ca", storj.Backward, 2), - more: true, - result: []string{"bb", "c"}, - }, { - options: optionsRecursive("", "", storj.Backward, 0), - result: []string{"a", "a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc", "aa", "b", "b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc", "bb", "c"}, - }, { - options: options("a", "", storj.Backward, 0), - result: []string{"xa", "xaa", "xb", "xbb", "xc"}, - }, { - options: options("a/", "", storj.Backward, 0), - result: []string{"xa", "xaa", "xb", "xbb", "xc"}, - }, { - options: options("a/", "xb", storj.Backward, 0), - result: []string{"xa", "xaa", "xb"}, - }, { - options: optionsRecursive("", "b/yaa", storj.Backward, 5), - more: true, - result: []string{"a/xc", "aa", "b", "b/ya", "b/yaa"}, - }, { - options: options("a/", "xbb", storj.Backward, 2), - more: true, - result: []string{"xb", "xbb"}, - }, { - options: options("", "", storj.Before, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, - }, { - options: options("", "`", storj.Before, 0), - result: []string{}, - }, { - options: options("", "a", storj.Before, 0), - result: []string{}, - }, { - options: options("", "b", storj.Before, 0), - result: []string{"a", "a/", "aa"}, - }, { - options: options("", "c", storj.Before, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb"}, - }, { - options: options("", "ca", storj.Before, 0), - result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, - }, { - options: options("", "", storj.Before, 1), - more: true, - result: []string{"c"}, - }, { - options: options("", "`", storj.Before, 1), - result: []string{}, - }, { - options: options("", "a/", storj.Before, 1), - result: []string{"a"}, - }, { - options: options("", "c", storj.Before, 1), - more: true, - result: []string{"bb"}, - }, { - options: options("", "ca", storj.Before, 1), - more: true, - result: []string{"c"}, - }, { - options: options("", "", storj.Before, 2), - more: true, - result: []string{"bb", "c"}, - }, { - options: options("", "`", storj.Before, 2), - result: []string{}, - }, { - options: options("", "a/", storj.Before, 2), - result: []string{"a"}, - }, { - options: options("", "bb", storj.Before, 2), - more: true, - result: []string{"b", "b/"}, - }, { - options: options("", "c", storj.Before, 2), - more: true, - result: []string{"b/", "bb"}, - }, { - options: options("", "ca", storj.Before, 2), - more: true, - result: []string{"bb", "c"}, - }, { - options: optionsRecursive("", "", storj.Before, 0), - result: []string{"a", "a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc", "aa", "b", "b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc", "bb", "c"}, - }, { - options: options("a", "", storj.Before, 0), - result: []string{"xa", "xaa", "xb", "xbb", "xc"}, - }, { - options: options("a/", "", storj.Before, 0), - result: []string{"xa", "xaa", "xb", "xbb", "xc"}, - }, { - options: options("a/", "xb", storj.Before, 0), - result: []string{"xa", "xaa"}, - }, { - options: optionsRecursive("", "b/yaa", storj.Before, 5), - more: true, - result: []string{"a/xbb", "a/xc", "aa", "b", "b/ya"}, - }, { - options: options("a/", "xbb", storj.Before, 2), - more: true, - result: []string{"xaa", "xb"}, }, + // TODO commented until we will decide if we will support direction for object listing + // + // { + // options: options("", "", storj.Forward, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, + // }, { + // options: options("", "`", storj.Forward, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, + // }, { + // options: options("", "b", storj.Forward, 0), + // result: []string{"b", "b/", "bb", "c"}, + // }, { + // options: options("", "c", storj.Forward, 0), + // result: []string{"c"}, + // }, { + // options: options("", "ca", storj.Forward, 0), + // result: []string{}, + // }, { + // options: options("", "", storj.Forward, 1), + // more: true, + // result: []string{"a"}, + // }, { + // options: options("", "`", storj.Forward, 1), + // more: true, + // result: []string{"a"}, + // }, { + // options: options("", "aa", storj.Forward, 1), + // more: true, + // result: []string{"aa"}, + // }, { + // options: options("", "c", storj.Forward, 1), + // result: []string{"c"}, + // }, { + // options: options("", "ca", storj.Forward, 1), + // result: []string{}, + // }, { + // options: options("", "", storj.Forward, 2), + // more: true, + // result: []string{"a", "a/"}, + // }, { + // options: options("", "`", storj.Forward, 2), + // more: true, + // result: []string{"a", "a/"}, + // }, { + // options: options("", "aa", storj.Forward, 2), + // more: true, + // result: []string{"aa", "b"}, + // }, { + // options: options("", "bb", storj.Forward, 2), + // result: []string{"bb", "c"}, + // }, { + // options: options("", "c", storj.Forward, 2), + // result: []string{"c"}, + // }, { + // options: options("", "ca", storj.Forward, 2), + // result: []string{}, + // }, { + // options: optionsRecursive("", "", storj.Forward, 0), + // result: []string{"a", "a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc", "aa", "b", "b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc", "bb", "c"}, + // }, { + // options: options("a", "", storj.Forward, 0), + // result: []string{"xa", "xaa", "xb", "xbb", "xc"}, + // }, { + // options: options("a/", "", storj.Forward, 0), + // result: []string{"xa", "xaa", "xb", "xbb", "xc"}, + // }, { + // options: options("a/", "xb", storj.Forward, 0), + // result: []string{"xb", "xbb", "xc"}, + // }, { + // options: optionsRecursive("", "a/xbb", storj.Forward, 5), + // more: true, + // result: []string{"a/xbb", "a/xc", "aa", "b", "b/ya"}, + // }, { + // options: options("a/", "xaa", storj.Forward, 2), + // more: true, + // result: []string{"xaa", "xb"}, + // }, { + // options: options("", "", storj.Backward, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, + // }, { + // options: options("", "`", storj.Backward, 0), + // result: []string{}, + // }, { + // options: options("", "b", storj.Backward, 0), + // result: []string{"a", "a/", "aa", "b"}, + // }, { + // options: options("", "c", storj.Backward, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, + // }, { + // options: options("", "ca", storj.Backward, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, + // }, { + // options: options("", "", storj.Backward, 1), + // more: true, + // result: []string{"c"}, + // }, { + // options: options("", "`", storj.Backward, 1), + // result: []string{}, + // }, { + // options: options("", "aa", storj.Backward, 1), + // more: true, + // result: []string{"aa"}, + // }, { + // options: options("", "c", storj.Backward, 1), + // more: true, + // result: []string{"c"}, + // }, { + // options: options("", "ca", storj.Backward, 1), + // more: true, + // result: []string{"c"}, + // }, { + // options: options("", "", storj.Backward, 2), + // more: true, + // result: []string{"bb", "c"}, + // }, { + // options: options("", "`", storj.Backward, 2), + // result: []string{}, + // }, { + // options: options("", "a/", storj.Backward, 2), + // result: []string{"a"}, + // }, { + // options: options("", "bb", storj.Backward, 2), + // more: true, + // result: []string{"b/", "bb"}, + // }, { + // options: options("", "c", storj.Backward, 2), + // more: true, + // result: []string{"bb", "c"}, + // }, { + // options: options("", "ca", storj.Backward, 2), + // more: true, + // result: []string{"bb", "c"}, + // }, { + // options: optionsRecursive("", "", storj.Backward, 0), + // result: []string{"a", "a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc", "aa", "b", "b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc", "bb", "c"}, + // }, { + // options: options("a", "", storj.Backward, 0), + // result: []string{"xa", "xaa", "xb", "xbb", "xc"}, + // }, { + // options: options("a/", "", storj.Backward, 0), + // result: []string{"xa", "xaa", "xb", "xbb", "xc"}, + // }, { + // options: options("a/", "xb", storj.Backward, 0), + // result: []string{"xa", "xaa", "xb"}, + // }, { + // options: optionsRecursive("", "b/yaa", storj.Backward, 5), + // more: true, + // result: []string{"a/xc", "aa", "b", "b/ya", "b/yaa"}, + // }, { + // options: options("a/", "xbb", storj.Backward, 2), + // more: true, + // result: []string{"xb", "xbb"}, + // }, { + // options: options("", "", storj.Before, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, + // }, { + // options: options("", "`", storj.Before, 0), + // result: []string{}, + // }, { + // options: options("", "a", storj.Before, 0), + // result: []string{}, + // }, { + // options: options("", "b", storj.Before, 0), + // result: []string{"a", "a/", "aa"}, + // }, { + // options: options("", "c", storj.Before, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb"}, + // }, { + // options: options("", "ca", storj.Before, 0), + // result: []string{"a", "a/", "aa", "b", "b/", "bb", "c"}, + // }, { + // options: options("", "", storj.Before, 1), + // more: true, + // result: []string{"c"}, + // }, { + // options: options("", "`", storj.Before, 1), + // result: []string{}, + // }, { + // options: options("", "a/", storj.Before, 1), + // result: []string{"a"}, + // }, { + // options: options("", "c", storj.Before, 1), + // more: true, + // result: []string{"bb"}, + // }, { + // options: options("", "ca", storj.Before, 1), + // more: true, + // result: []string{"c"}, + // }, { + // options: options("", "", storj.Before, 2), + // more: true, + // result: []string{"bb", "c"}, + // }, { + // options: options("", "`", storj.Before, 2), + // result: []string{}, + // }, { + // options: options("", "a/", storj.Before, 2), + // result: []string{"a"}, + // }, { + // options: options("", "bb", storj.Before, 2), + // more: true, + // result: []string{"b", "b/"}, + // }, { + // options: options("", "c", storj.Before, 2), + // more: true, + // result: []string{"b/", "bb"}, + // }, { + // options: options("", "ca", storj.Before, 2), + // more: true, + // result: []string{"bb", "c"}, + // }, { + // options: optionsRecursive("", "", storj.Before, 0), + // result: []string{"a", "a/xa", "a/xaa", "a/xb", "a/xbb", "a/xc", "aa", "b", "b/ya", "b/yaa", "b/yb", "b/ybb", "b/yc", "bb", "c"}, + // }, { + // options: options("a", "", storj.Before, 0), + // result: []string{"xa", "xaa", "xb", "xbb", "xc"}, + // }, { + // options: options("a/", "", storj.Before, 0), + // result: []string{"xa", "xaa", "xb", "xbb", "xc"}, + // }, { + // options: options("a/", "xb", storj.Before, 0), + // result: []string{"xa", "xaa"}, + // }, { + // options: optionsRecursive("", "b/yaa", storj.Before, 5), + // more: true, + // result: []string{"a/xbb", "a/xc", "aa", "b", "b/ya"}, + // }, { + // options: options("a/", "xbb", storj.Before, 2), + // more: true, + // result: []string{"xaa", "xb"}, + // }, } { errTag := fmt.Sprintf("%d. %+v", i, tt) diff --git a/uplink/metainfo/kvmetainfo/paths.go b/uplink/metainfo/kvmetainfo/paths.go index 1928331e4..7562f5baf 100644 --- a/uplink/metainfo/kvmetainfo/paths.go +++ b/uplink/metainfo/kvmetainfo/paths.go @@ -3,36 +3,27 @@ package kvmetainfo -import ( - "fmt" - - "storj.io/storj/pkg/storj" -) - // TODO: known issue: // this is incorrect since there's no good way to get such a path // since the exact previous key is // append(previousPrefix(cursor), infinite(0xFF)...) -func keyBefore(cursor string) string { - if cursor == "" { - return "" - } - before := []byte(cursor) - if before[len(before)-1] == 0 { - return string(before[:len(before)-1]) - } - before[len(before)-1]-- +// TODO commented until we will decide if we will support direction for objects listing +// func keyBefore(cursor string) string { +// if cursor == "" { +// return "" +// } - before = append(before, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f) - return string(before) -} +// before := []byte(cursor) +// if before[len(before)-1] == 0 { +// return string(before[:len(before)-1]) +// } +// before[len(before)-1]-- -func keyAfter(cursor string) string { - return cursor + "\x00" -} +// before = append(before, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f) +// return string(before) +// } -// getSegmentPath returns the unique path for a particular segment -func getSegmentPath(encryptedPath storj.Path, segNum int64) storj.Path { - return storj.JoinPaths(fmt.Sprintf("s%d", segNum), encryptedPath) -} +// func keyAfter(cursor string) string { +// return cursor + "\x00" +// } diff --git a/uplink/metainfo/kvmetainfo/stream.go b/uplink/metainfo/kvmetainfo/stream.go index 1413c7732..aab707114 100644 --- a/uplink/metainfo/kvmetainfo/stream.go +++ b/uplink/metainfo/kvmetainfo/stream.go @@ -7,8 +7,6 @@ import ( "context" "errors" - "github.com/gogo/protobuf/proto" - "storj.io/storj/pkg/encryption" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" @@ -19,6 +17,7 @@ var _ storj.ReadOnlyStream = (*readonlyStream)(nil) type readonlyStream struct { db *DB + id storj.StreamID info storj.Object bucket string encPath storj.Path @@ -47,21 +46,14 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment isLastSegment := segment.Index+1 == stream.info.SegmentCount if !isLastSegment { - segmentPath := getSegmentPath(storj.JoinPaths(stream.bucket, stream.encPath), index) - _, meta, err := stream.db.segments.Get(ctx, segmentPath) - if err != nil { - return segment, err - } - - segmentMeta := pb.SegmentMeta{} - err = proto.Unmarshal(meta.Data, &segmentMeta) + _, segmentEnc, err := stream.db.segments.Get(ctx, stream.id, int32(index), stream.info.RedundancyScheme) if err != nil { return segment, err } segment.Size = stream.info.FixedSegmentSize - copy(segment.EncryptedKeyNonce[:], segmentMeta.KeyNonce) - segment.EncryptedKey = segmentMeta.EncryptedKey + segment.EncryptedKeyNonce = segmentEnc.EncryptedKeyNonce + segment.EncryptedKey = segmentEnc.EncryptedKey } else { segment.Size = stream.info.LastSegment.Size segment.EncryptedKeyNonce = stream.info.LastSegment.EncryptedKeyNonce diff --git a/uplink/metainfo/kvmetainfo/temputils.go b/uplink/metainfo/kvmetainfo/temputils.go index f374ce203..87b5a4597 100644 --- a/uplink/metainfo/kvmetainfo/temputils.go +++ b/uplink/metainfo/kvmetainfo/temputils.go @@ -41,7 +41,7 @@ func SetupProject(m *metainfo.Client) (*Project, error) { // TODO: https://storjlabs.atlassian.net/browse/V3-1967 encStore := encryption.NewStore() encStore.SetDefaultKey(new(storj.Key)) - strms, err := streams.NewStreamStore(segment, maxBucketMetaSize.Int64(), encStore, memory.KiB.Int(), storj.EncAESGCM, maxBucketMetaSize.Int()) + strms, err := streams.NewStreamStore(m, segment, maxBucketMetaSize.Int64(), encStore, memory.KiB.Int(), storj.EncAESGCM, maxBucketMetaSize.Int()) if err != nil { return nil, Error.New("failed to create streams: %v", err) } diff --git a/uplink/storage/segments/store.go b/uplink/storage/segments/store.go index 0d2025c68..f7c50e2a4 100644 --- a/uplink/storage/segments/store.go +++ b/uplink/storage/segments/store.go @@ -7,11 +7,10 @@ import ( "context" "io" "math/rand" - "strconv" - "strings" "sync" "time" + "github.com/vivint/infectious" "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/pb" @@ -43,11 +42,9 @@ type ListItem struct { // Store for segments type Store interface { - Meta(ctx context.Context, path storj.Path) (meta Meta, err error) - Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) - Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (meta Meta, err error) - Delete(ctx context.Context, path storj.Path) (err error) - List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) + Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, encryption storj.SegmentEncryption, err error) + Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error) + Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error) } type segmentStore struct { @@ -72,133 +69,102 @@ func NewSegmentStore(metainfo *metainfo.Client, ec ecclient.Client, rs eestream. } } -// Meta retrieves the metadata of the segment -func (s *segmentStore) Meta(ctx context.Context, path storj.Path) (meta Meta, err error) { - defer mon.Task()(&ctx)(&err) - - bucket, objectPath, segmentIndex, err := splitPathFragments(path) - if err != nil { - return Meta{}, err - } - - pointer, err := s.metainfo.SegmentInfo(ctx, bucket, objectPath, segmentIndex) - if err != nil { - return Meta{}, Error.Wrap(err) - } - - return convertMeta(pointer), nil -} - // Put uploads a segment to an erasure code client -func (s *segmentStore) Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (meta Meta, err error) { +func (s *segmentStore) Put(ctx context.Context, streamID storj.StreamID, data io.Reader, expiration time.Time, segmentInfo func() (int64, storj.SegmentEncryption, error)) (meta Meta, err error) { defer mon.Task()(&ctx)(&err) - redundancy := &pb.RedundancyScheme{ - Type: pb.RedundancyScheme_RS, - MinReq: int32(s.rs.RequiredCount()), - Total: int32(s.rs.TotalCount()), - RepairThreshold: int32(s.rs.RepairThreshold()), - SuccessThreshold: int32(s.rs.OptimalThreshold()), - ErasureShareSize: int32(s.rs.ErasureShareSize()), - } - peekReader := NewPeekThresholdReader(data) remoteSized, err := peekReader.IsLargerThan(s.thresholdSize) if err != nil { return Meta{}, err } - var path storj.Path - var pointer *pb.Pointer - var originalLimits []*pb.OrderLimit if !remoteSized { - p, metadata, err := segmentInfo() - if err != nil { - return Meta{}, Error.Wrap(err) - } - path = p - - pointer = &pb.Pointer{ - CreationDate: time.Now(), - Type: pb.Pointer_INLINE, - InlineSegment: peekReader.thresholdBuf, - SegmentSize: int64(len(peekReader.thresholdBuf)), - ExpirationDate: expiration, - Metadata: metadata, - } - } else { - // early call to get bucket name, rest of the path cannot be determine yet - p, _, err := segmentInfo() - if err != nil { - return Meta{}, Error.Wrap(err) - } - bucket, objectPath, _, err := splitPathFragments(p) - if err != nil { - return Meta{}, err - } - - // path and segment index are not known at this point - limits, rootPieceID, piecePrivateKey, err := s.metainfo.CreateSegment(ctx, bucket, objectPath, -1, redundancy, s.maxEncryptedSegmentSize, expiration) + segmentIndex, encryption, err := segmentInfo() if err != nil { return Meta{}, Error.Wrap(err) } - sizedReader := SizeReader(peekReader) - - successfulNodes, successfulHashes, err := s.ec.Put(ctx, limits, piecePrivateKey, s.rs, sizedReader, expiration) + err = s.metainfo.MakeInlineSegment(ctx, metainfo.MakeInlineSegmentParams{ + StreamID: streamID, + Position: storj.SegmentPosition{ + Index: int32(segmentIndex), + }, + Encryption: encryption, + EncryptedInlineData: peekReader.thresholdBuf, + }) if err != nil { return Meta{}, Error.Wrap(err) } - - p, metadata, err := segmentInfo() - if err != nil { - return Meta{}, Error.Wrap(err) - } - path = p - - pointer, err = makeRemotePointer(successfulNodes, successfulHashes, s.rs, rootPieceID, sizedReader.Size(), expiration, metadata) - if err != nil { - return Meta{}, Error.Wrap(err) - } - - originalLimits = make([]*pb.OrderLimit, len(limits)) - for i, addressedLimit := range limits { - originalLimits[i] = addressedLimit.GetLimit() - } + return Meta{}, nil } - bucket, objectPath, segmentIndex, err := splitPathFragments(path) - if err != nil { - return Meta{}, err - } - - savedPointer, err := s.metainfo.CommitSegment(ctx, bucket, objectPath, segmentIndex, pointer, originalLimits) + segmentIndex, encryption, err := segmentInfo() if err != nil { return Meta{}, Error.Wrap(err) } - return convertMeta(savedPointer), nil + segmentID, limits, piecePrivateKey, err := s.metainfo.BeginSegment(ctx, metainfo.BeginSegmentParams{ + StreamID: streamID, + MaxOrderLimit: s.maxEncryptedSegmentSize, + Position: storj.SegmentPosition{ + Index: int32(segmentIndex), + }, + }) + if err != nil { + return Meta{}, Error.Wrap(err) + } + + sizedReader := SizeReader(peekReader) + + successfulNodes, successfulHashes, err := s.ec.Put(ctx, limits, piecePrivateKey, s.rs, sizedReader, expiration) + if err != nil { + return Meta{}, Error.Wrap(err) + } + + uploadResults := make([]*pb.SegmentPieceUploadResult, 0, len(successfulNodes)) + for i := range successfulNodes { + if successfulNodes[i] == nil { + continue + } + uploadResults = append(uploadResults, &pb.SegmentPieceUploadResult{ + PieceNum: int32(i), + NodeId: successfulNodes[i].Id, + Hash: successfulHashes[i], + }) + } + err = s.metainfo.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{ + SegmentID: segmentID, + SizeEncryptedData: sizedReader.Size(), + Encryption: encryption, + UploadResult: uploadResults, + }) + if err != nil { + return Meta{}, Error.Wrap(err) + } + + return Meta{}, nil } // Get requests the satellite to read a segment and downloaded the pieces from the storage nodes -func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) { +func (s *segmentStore) Get(ctx context.Context, streamID storj.StreamID, segmentIndex int32, objectRS storj.RedundancyScheme) (rr ranger.Ranger, _ storj.SegmentEncryption, err error) { defer mon.Task()(&ctx)(&err) - bucket, objectPath, segmentIndex, err := splitPathFragments(path) + info, limits, err := s.metainfo.DownloadSegment(ctx, metainfo.DownloadSegmentParams{ + StreamID: streamID, + Position: storj.SegmentPosition{ + Index: segmentIndex, + }, + }) if err != nil { - return nil, Meta{}, err + return nil, storj.SegmentEncryption{}, Error.Wrap(err) } - pointer, limits, piecePrivateKey, err := s.metainfo.ReadSegment(ctx, bucket, objectPath, segmentIndex) - if err != nil { - return nil, Meta{}, Error.Wrap(err) - } - - switch pointer.GetType() { - case pb.Pointer_INLINE: - return ranger.ByteRanger(pointer.InlineSegment), convertMeta(pointer), nil - case pb.Pointer_REMOTE: - needed := CalcNeededNodes(pointer.GetRemote().GetRedundancy()) + switch { + case len(info.EncryptedInlineData) != 0: + return ranger.ByteRanger(info.EncryptedInlineData), info.SegmentEncryption, nil + default: + needed := CalcNeededNodes(objectRS) selected := make([]*pb.AddressedOrderLimit, len(limits)) s.rngMu.Lock() perm := s.rng.Perm(len(limits)) @@ -218,84 +184,52 @@ func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Rang } } - redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy()) + fc, err := infectious.NewFEC(int(objectRS.RequiredShares), int(objectRS.TotalShares)) if err != nil { - return nil, Meta{}, err + return nil, storj.SegmentEncryption{}, err } - - rr, err = s.ec.Get(ctx, selected, piecePrivateKey, redundancy, pointer.GetSegmentSize()) + es := eestream.NewRSScheme(fc, int(objectRS.ShareSize)) + redundancy, err := eestream.NewRedundancyStrategy(es, int(objectRS.RepairShares), int(objectRS.OptimalShares)) if err != nil { - return nil, Meta{}, Error.Wrap(err) + return nil, storj.SegmentEncryption{}, err } - return rr, convertMeta(pointer), nil - default: - return nil, Meta{}, Error.New("unsupported pointer type: %d", pointer.GetType()) - } -} - -// makeRemotePointer creates a pointer of type remote -func makeRemotePointer(nodes []*pb.Node, hashes []*pb.PieceHash, rs eestream.RedundancyStrategy, pieceID storj.PieceID, readerSize int64, expiration time.Time, metadata []byte) (pointer *pb.Pointer, err error) { - if len(nodes) != len(hashes) { - return nil, Error.New("unable to make pointer: size of nodes != size of hashes") - } - - var remotePieces []*pb.RemotePiece - for i := range nodes { - if nodes[i] == nil { - continue + rr, err = s.ec.Get(ctx, selected, info.PiecePrivateKey, redundancy, info.Size) + if err != nil { + return nil, storj.SegmentEncryption{}, Error.Wrap(err) } - remotePieces = append(remotePieces, &pb.RemotePiece{ - PieceNum: int32(i), - NodeId: nodes[i].Id, - Hash: hashes[i], - }) - } - pointer = &pb.Pointer{ - CreationDate: time.Now(), - Type: pb.Pointer_REMOTE, - Remote: &pb.RemoteSegment{ - Redundancy: &pb.RedundancyScheme{ - Type: pb.RedundancyScheme_RS, - MinReq: int32(rs.RequiredCount()), - Total: int32(rs.TotalCount()), - RepairThreshold: int32(rs.RepairThreshold()), - SuccessThreshold: int32(rs.OptimalThreshold()), - ErasureShareSize: int32(rs.ErasureShareSize()), - }, - RootPieceId: pieceID, - RemotePieces: remotePieces, - }, - SegmentSize: readerSize, - ExpirationDate: expiration, - Metadata: metadata, + return rr, info.SegmentEncryption, nil } - return pointer, nil } // Delete requests the satellite to delete a segment and tells storage nodes // to delete the segment's pieces. -func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error) { +func (s *segmentStore) Delete(ctx context.Context, streamID storj.StreamID, segmentIndex int32) (err error) { defer mon.Task()(&ctx)(&err) - bucket, objectPath, segmentIndex, err := splitPathFragments(path) - if err != nil { - return err - } - - limits, privateKey, err := s.metainfo.DeleteSegment(ctx, bucket, objectPath, segmentIndex) + segmentID, limits, privateKey, err := s.metainfo.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{ + StreamID: streamID, + Position: storj.SegmentPosition{ + Index: segmentIndex, + }, + }) if err != nil { return Error.Wrap(err) } - if len(limits) == 0 { - // inline segment - nothing else to do - return + if len(limits) != 0 { + // remote segment - delete the pieces from storage nodes + err = s.ec.Delete(ctx, limits, privateKey) + if err != nil { + return Error.Wrap(err) + } } - // remote segment - delete the pieces from storage nodes - err = s.ec.Delete(ctx, limits, privateKey) + err = s.metainfo.FinishDeleteSegment(ctx, metainfo.FinishDeleteSegmentParams{ + SegmentID: segmentID, + // TODO add delete results + }) if err != nil { return Error.Wrap(err) } @@ -303,94 +237,24 @@ func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error) return nil } -// List retrieves paths to segments and their metadata stored in the metainfo -func (s *segmentStore) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) { - defer mon.Task()(&ctx)(&err) - - bucket, strippedPrefix, _, err := splitPathFragments(prefix) - if err != nil { - return nil, false, Error.Wrap(err) - } - - list, more, err := s.metainfo.ListSegments(ctx, bucket, strippedPrefix, startAfter, endBefore, recursive, int32(limit), metaFlags) - if err != nil { - return nil, false, Error.Wrap(err) - } - - items = make([]ListItem, len(list)) - for i, itm := range list { - items[i] = ListItem{ - Path: itm.Path, - Meta: convertMeta(itm.Pointer), - IsPrefix: itm.IsPrefix, - } - } - - return items, more, nil -} - // CalcNeededNodes calculate how many minimum nodes are needed for download, // based on t = k + (n-o)k/o -func CalcNeededNodes(rs *pb.RedundancyScheme) int32 { +func CalcNeededNodes(rs storj.RedundancyScheme) int32 { extra := int32(1) - if rs.GetSuccessThreshold() > 0 { - extra = ((rs.GetTotal() - rs.GetSuccessThreshold()) * rs.GetMinReq()) / rs.GetSuccessThreshold() + if rs.OptimalShares > 0 { + extra = int32(((rs.TotalShares - rs.OptimalShares) * rs.RequiredShares) / rs.OptimalShares) if extra == 0 { // ensure there is at least one extra node, so we can have error detection/correction extra = 1 } } - needed := rs.GetMinReq() + extra + needed := int32(rs.RequiredShares) + extra - if needed > rs.GetTotal() { - needed = rs.GetTotal() + if needed > int32(rs.TotalShares) { + needed = int32(rs.TotalShares) } return needed } - -// convertMeta converts pointer to segment metadata -func convertMeta(pr *pb.Pointer) Meta { - return Meta{ - Modified: pr.GetCreationDate(), - Expiration: pr.GetExpirationDate(), - Size: pr.GetSegmentSize(), - Data: pr.GetMetadata(), - } -} - -func splitPathFragments(path storj.Path) (bucket string, objectPath storj.Path, segmentIndex int64, err error) { - components := storj.SplitPath(path) - if len(components) < 1 { - return "", "", -2, Error.New("empty path") - } - - segmentIndex, err = convertSegmentIndex(components[0]) - if err != nil { - return "", "", -2, err - } - - if len(components) > 1 { - bucket = components[1] - objectPath = storj.JoinPaths(components[2:]...) - } - - return bucket, objectPath, segmentIndex, nil -} - -func convertSegmentIndex(segmentComp string) (segmentIndex int64, err error) { - switch { - case segmentComp == "l": - return -1, nil - case strings.HasPrefix(segmentComp, "s"): - num, err := strconv.Atoi(segmentComp[1:]) - if err != nil { - return -2, Error.Wrap(err) - } - return int64(num), nil - default: - return -2, Error.New("invalid segment component: %s", segmentComp) - } -} diff --git a/uplink/storage/segments/store_test.go b/uplink/storage/segments/store_test.go index b6fea75b5..57185f339 100644 --- a/uplink/storage/segments/store_test.go +++ b/uplink/storage/segments/store_test.go @@ -4,212 +4,18 @@ package segments_test import ( - "bytes" - "context" "fmt" - "io/ioutil" - "strconv" "testing" - time "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/vivint/infectious" - "storj.io/storj/internal/memory" - "storj.io/storj/internal/testcontext" - "storj.io/storj/internal/testplanet" - "storj.io/storj/internal/testrand" - "storj.io/storj/pkg/macaroon" - "storj.io/storj/pkg/pb" - storj "storj.io/storj/pkg/storj" - "storj.io/storj/satellite/console" - "storj.io/storj/storage" - "storj.io/storj/uplink/ecclient" - "storj.io/storj/uplink/eestream" - "storj.io/storj/uplink/storage/meta" + "storj.io/storj/pkg/storj" "storj.io/storj/uplink/storage/segments" ) -func TestSegmentStoreMeta(t *testing.T) { - for i, tt := range []struct { - path string - data []byte - metadata []byte - expiration time.Time - err string - }{ - {"l/path/1/2/3", []byte("content"), []byte("metadata"), time.Now().UTC().Add(time.Hour * 12), ""}, - {"l/not-exists-path/1/2/3", []byte{}, []byte{}, time.Now(), "key not found"}, - {"", []byte{}, []byte{}, time.Now(), "invalid segment component"}, - } { - test := tt - t.Run("#"+strconv.Itoa(i), func(t *testing.T) { - runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, segmentStore segments.Store) { - expectedSize := int64(len(test.data)) - reader := bytes.NewReader(test.data) - - beforeModified := time.Now() - if test.err == "" { - meta, err := segmentStore.Put(ctx, reader, test.expiration, func() (storj.Path, []byte, error) { - return test.path, test.metadata, nil - }) - require.NoError(t, err) - assert.Equal(t, expectedSize, meta.Size) - assert.Equal(t, test.metadata, meta.Data) - assert.True(t, test.expiration.Equal(meta.Expiration)) - assert.True(t, meta.Modified.After(beforeModified)) - } - - meta, err := segmentStore.Meta(ctx, test.path) - if test.err == "" { - require.NoError(t, err) - assert.Equal(t, expectedSize, meta.Size) - assert.Equal(t, test.metadata, meta.Data) - assert.True(t, test.expiration.Equal(meta.Expiration)) - assert.True(t, meta.Modified.After(beforeModified)) - } else { - require.Contains(t, err.Error(), test.err) - } - }) - }) - } -} - -func TestSegmentStorePutGet(t *testing.T) { - for _, tt := range []struct { - name string - path string - metadata []byte - expiration time.Time - content []byte - }{ - {"test inline put/get", "l/path/1", []byte("metadata-intline"), time.Time{}, testrand.Bytes(2 * memory.KiB)}, - {"test remote put/get", "s0/test-bucket/mypath/1", []byte("metadata-remote"), time.Time{}, testrand.Bytes(100 * memory.KiB)}, - } { - test := tt - runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, segmentStore segments.Store) { - metadata, err := segmentStore.Put(ctx, bytes.NewReader(test.content), test.expiration, func() (storj.Path, []byte, error) { - return test.path, test.metadata, nil - }) - require.NoError(t, err, test.name) - require.Equal(t, test.metadata, metadata.Data) - - rr, metadata, err := segmentStore.Get(ctx, test.path) - require.NoError(t, err, test.name) - require.Equal(t, test.metadata, metadata.Data) - - reader, err := rr.Range(ctx, 0, rr.Size()) - require.NoError(t, err, test.name) - content, err := ioutil.ReadAll(reader) - require.NoError(t, err, test.name) - require.Equal(t, test.content, content) - - require.NoError(t, reader.Close(), test.name) - }) - } -} - -func TestSegmentStoreDelete(t *testing.T) { - for _, tt := range []struct { - name string - path string - metadata []byte - expiration time.Time - content []byte - }{ - {"test inline delete", "l/path/1", []byte("metadata"), time.Time{}, testrand.Bytes(2 * memory.KiB)}, - {"test remote delete", "s0/test-bucket/mypath/1", []byte("metadata"), time.Time{}, testrand.Bytes(100 * memory.KiB)}, - } { - test := tt - runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, segmentStore segments.Store) { - _, err := segmentStore.Put(ctx, bytes.NewReader(test.content), test.expiration, func() (storj.Path, []byte, error) { - return test.path, test.metadata, nil - }) - require.NoError(t, err, test.name) - - _, _, err = segmentStore.Get(ctx, test.path) - require.NoError(t, err, test.name) - - // delete existing - err = segmentStore.Delete(ctx, test.path) - require.NoError(t, err, test.name) - - _, _, err = segmentStore.Get(ctx, test.path) - require.Error(t, err, test.name) - require.True(t, storage.ErrKeyNotFound.Has(err)) - - // delete non existing - err = segmentStore.Delete(ctx, test.path) - require.Error(t, err, test.name) - require.True(t, storage.ErrKeyNotFound.Has(err)) - }) - } -} - -func TestSegmentStoreList(t *testing.T) { - runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, segmentStore segments.Store) { - expiration := time.Now().Add(24 * time.Hour * 10) - - segments := []struct { - path string - content []byte - }{ - {"l/aaaa/afile1", []byte("content")}, - {"l/aaaa/bfile2", []byte("content")}, - {"l/bbbb/afile1", []byte("content")}, - {"l/bbbb/bfile2", []byte("content")}, - {"l/bbbb/bfolder/file1", []byte("content")}, - } - for _, seg := range segments { - segment := seg - _, err := segmentStore.Put(ctx, bytes.NewReader(segment.content), expiration, func() (storj.Path, []byte, error) { - return segment.path, []byte{}, nil - }) - require.NoError(t, err) - } - - // should list all - items, more, err := segmentStore.List(ctx, "l", "", "", true, 10, meta.None) - require.NoError(t, err) - require.False(t, more) - require.Equal(t, len(segments), len(items)) - - // should list first two and more = true - items, more, err = segmentStore.List(ctx, "l", "", "", true, 2, meta.None) - require.NoError(t, err) - require.True(t, more) - require.Equal(t, 2, len(items)) - - // should list only prefixes - items, more, err = segmentStore.List(ctx, "l", "", "", false, 10, meta.None) - require.NoError(t, err) - require.False(t, more) - require.Equal(t, 2, len(items)) - - // should list only BBBB bucket - items, more, err = segmentStore.List(ctx, "l/bbbb", "", "", false, 10, meta.None) - require.NoError(t, err) - require.False(t, more) - require.Equal(t, 3, len(items)) - - // should list only BBBB bucket after afile1 - items, more, err = segmentStore.List(ctx, "l/bbbb", "afile1", "", false, 10, meta.None) - require.NoError(t, err) - require.False(t, more) - require.Equal(t, 2, len(items)) - - // should list nothing - items, more, err = segmentStore.List(ctx, "l/cccc", "", "", true, 10, meta.None) - require.NoError(t, err) - require.False(t, more) - require.Equal(t, 0, len(items)) - }) -} - func TestCalcNeededNodes(t *testing.T) { for i, tt := range []struct { - k, m, o, n int32 + k, m, o, n int16 needed int32 }{ {k: 0, m: 0, o: 0, n: 0, needed: 0}, @@ -223,56 +29,13 @@ func TestCalcNeededNodes(t *testing.T) { } { tag := fmt.Sprintf("#%d. %+v", i, tt) - rs := pb.RedundancyScheme{ - MinReq: tt.k, - RepairThreshold: tt.m, - SuccessThreshold: tt.o, - Total: tt.n, + rs := storj.RedundancyScheme{ + RequiredShares: tt.k, + RepairShares: tt.m, + OptimalShares: tt.o, + TotalShares: tt.n, } - assert.Equal(t, tt.needed, segments.CalcNeededNodes(&rs), tag) + assert.Equal(t, tt.needed, segments.CalcNeededNodes(rs), tag) } } - -func runTest(t *testing.T, test func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, segmentStore segments.Store)) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - // TODO move apikey creation to testplanet - project, err := planet.Satellites[0].DB.Console().Projects().Insert(context.Background(), &console.Project{ - Name: "testProject", - }) - require.NoError(t, err) - - apiKey, err := macaroon.NewAPIKey([]byte("testSecret")) - require.NoError(t, err) - - apiKeyInfo := console.APIKeyInfo{ - ProjectID: project.ID, - Name: "testKey", - Secret: []byte("testSecret"), - } - - // add api key to db - _, err = planet.Satellites[0].DB.Console().APIKeys().Create(context.Background(), apiKey.Head(), apiKeyInfo) - require.NoError(t, err) - - TestAPIKey := apiKey.Serialize() - - metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], TestAPIKey) - require.NoError(t, err) - defer ctx.Check(metainfo.Close) - - ec := ecclient.NewClient(planet.Uplinks[0].Log.Named("ecclient"), planet.Uplinks[0].Transport, 0) - fc, err := infectious.NewFEC(2, 4) - require.NoError(t, err) - - rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, 1*memory.KiB.Int()), 0, 0) - require.NoError(t, err) - - segmentStore := segments.NewSegmentStore(metainfo, ec, rs, 4*memory.KiB.Int(), 8*memory.MiB.Int64()) - assert.NotNil(t, segmentStore) - - test(t, ctx, planet, segmentStore) - }) -} diff --git a/uplink/storage/streams/shim.go b/uplink/storage/streams/shim.go index 0a90dae83..4724c4021 100644 --- a/uplink/storage/streams/shim.go +++ b/uplink/storage/streams/shim.go @@ -11,6 +11,7 @@ import ( "storj.io/storj/pkg/encryption" "storj.io/storj/pkg/ranger" "storj.io/storj/pkg/storj" + "storj.io/storj/uplink/metainfo" "storj.io/storj/uplink/storage/segments" ) @@ -28,8 +29,8 @@ type shimStore struct { } // NewStreamStore constructs a Store. -func NewStreamStore(segments segments.Store, segmentSize int64, encStore *encryption.Store, encBlockSize int, cipher storj.CipherSuite, inlineThreshold int) (Store, error) { - typedStore, err := newTypedStreamStore(segments, segmentSize, encStore, encBlockSize, cipher, inlineThreshold) +func NewStreamStore(metainfo *metainfo.Client, segments segments.Store, segmentSize int64, encStore *encryption.Store, encBlockSize int, cipher storj.CipherSuite, inlineThreshold int) (Store, error) { + typedStore, err := newTypedStreamStore(metainfo, segments, segmentSize, encStore, encBlockSize, cipher, inlineThreshold) if err != nil { return nil, err } diff --git a/uplink/storage/streams/store.go b/uplink/storage/streams/store.go index dfdcacdd3..26806096a 100644 --- a/uplink/storage/streams/store.go +++ b/uplink/storage/streams/store.go @@ -9,7 +9,6 @@ import ( "crypto/rand" "io" "io/ioutil" - "strconv" "strings" "time" @@ -23,9 +22,8 @@ import ( "storj.io/storj/pkg/pb" "storj.io/storj/pkg/ranger" "storj.io/storj/pkg/storj" - "storj.io/storj/storage" "storj.io/storj/uplink/eestream" - "storj.io/storj/uplink/storage/meta" + "storj.io/storj/uplink/metainfo" "storj.io/storj/uplink/storage/segments" ) @@ -47,10 +45,10 @@ func numberOfSegments(stream *pb.StreamInfo, streamMeta *pb.StreamMeta) int64 { } // convertMeta converts segment metadata to stream metadata -func convertMeta(lastSegmentMeta segments.Meta, stream pb.StreamInfo, streamMeta pb.StreamMeta) Meta { +func convertMeta(modified, expiration time.Time, stream pb.StreamInfo, streamMeta pb.StreamMeta) Meta { return Meta{ - Modified: lastSegmentMeta.Modified, - Expiration: lastSegmentMeta.Expiration, + Modified: modified, + Expiration: expiration, Size: ((numberOfSegments(&stream, &streamMeta) - 1) * stream.SegmentsSize) + stream.LastSegmentSize, Data: stream.Metadata, } @@ -68,6 +66,7 @@ type typedStore interface { // streamStore is a store for streams. It implements typedStore as part of an ongoing migration // to use typed paths. See the shim for the store that the rest of the world interacts with. type streamStore struct { + metainfo *metainfo.Client segments segments.Store segmentSize int64 encStore *encryption.Store @@ -77,7 +76,7 @@ type streamStore struct { } // newTypedStreamStore constructs a typedStore backed by a streamStore. -func newTypedStreamStore(segments segments.Store, segmentSize int64, encStore *encryption.Store, encBlockSize int, cipher storj.CipherSuite, inlineThreshold int) (typedStore, error) { +func newTypedStreamStore(metainfo *metainfo.Client, segments segments.Store, segmentSize int64, encStore *encryption.Store, encBlockSize int, cipher storj.CipherSuite, inlineThreshold int) (typedStore, error) { if segmentSize <= 0 { return nil, errs.New("segment size must be larger than 0") } @@ -86,6 +85,7 @@ func newTypedStreamStore(segments segments.Store, segmentSize int64, encStore *e } return &streamStore{ + metainfo: metainfo, segments: segments, segmentSize: segmentSize, encStore: encStore, @@ -104,44 +104,54 @@ func (s *streamStore) Put(ctx context.Context, path Path, pathCipher storj.Ciphe // previously file uploaded? err = s.Delete(ctx, path, pathCipher) - if err != nil && !storage.ErrKeyNotFound.Has(err) { + if err != nil && !storj.ErrObjectNotFound.Has(err) { // something wrong happened checking for an existing // file with the same name return Meta{}, err } - m, lastSegment, err := s.upload(ctx, path, pathCipher, data, metadata, expiration) + m, lastSegment, streamID, err := s.upload(ctx, path, pathCipher, data, metadata, expiration) if err != nil { - s.cancelHandler(context.Background(), lastSegment, path, pathCipher) + s.cancelHandler(context.Background(), streamID, lastSegment, path, pathCipher) } return m, err } -func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (m Meta, lastSegment int64, err error) { +func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.CipherSuite, data io.Reader, metadata []byte, expiration time.Time) (m Meta, lastSegment int64, streamID storj.StreamID, err error) { defer mon.Task()(&ctx)(&err) var currentSegment int64 var streamSize int64 var putMeta segments.Meta + var objectMetadata []byte + + derivedKey, err := encryption.DeriveContentKey(path.Bucket(), path.UnencryptedPath(), s.encStore) + if err != nil { + return Meta{}, currentSegment, streamID, err + } + encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore) + if err != nil { + return Meta{}, currentSegment, streamID, err + } + + streamID, err = s.metainfo.BeginObject(ctx, metainfo.BeginObjectParams{ + Bucket: []byte(path.Bucket()), + EncryptedPath: []byte(encPath.Raw()), + ExpiresAt: expiration, + }) + if err != nil { + return Meta{}, currentSegment, streamID, err + } defer func() { select { case <-ctx.Done(): - s.cancelHandler(context.Background(), currentSegment, path, pathCipher) + s.cancelHandler(context.Background(), streamID, currentSegment, path, pathCipher) default: } }() - derivedKey, err := encryption.DeriveContentKey(path.Bucket(), path.UnencryptedPath(), s.encStore) - if err != nil { - return Meta{}, currentSegment, err - } - encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore) - if err != nil { - return Meta{}, currentSegment, err - } - eofReader := NewEOFReader(data) for !eofReader.isEOF() && !eofReader.hasError() { @@ -149,7 +159,7 @@ func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Ci var contentKey storj.Key _, err = rand.Read(contentKey[:]) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } // Initialize the content nonce with the segment's index incremented by 1. @@ -158,24 +168,24 @@ func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Ci var contentNonce storj.Nonce _, err := encryption.Increment(&contentNonce, currentSegment+1) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } encrypter, err := encryption.NewEncrypter(s.cipher, &contentKey, &contentNonce, s.encBlockSize) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } // generate random nonce for encrypting the content key var keyNonce storj.Nonce _, err = rand.Read(keyNonce[:]) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } encryptedKey, err := encryption.EncryptKey(&contentKey, s.cipher, derivedKey, &keyNonce) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } sizeReader := NewSizeReader(eofReader) @@ -184,7 +194,7 @@ func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Ci // If the data is larger than the inline threshold size, then it will be a remote segment isRemote, err := peekReader.IsLargerThan(s.inlineThreshold) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } var transformedReader io.Reader if isRemote { @@ -193,81 +203,62 @@ func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Ci } else { data, err := ioutil.ReadAll(peekReader) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } cipherData, err := encryption.Encrypt(data, s.cipher, &contentKey, &contentNonce) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err } transformedReader = bytes.NewReader(cipherData) } - putMeta, err = s.segments.Put(ctx, transformedReader, expiration, func() (storj.Path, []byte, error) { - if !eofReader.isEOF() { - segmentPath, err := createSegmentPath(ctx, currentSegment, path.Bucket(), encPath) - if err != nil { - return "", nil, err - } - - if s.cipher == storj.EncNull { - return segmentPath, nil, nil - } - - segmentMeta, err := proto.Marshal(&pb.SegmentMeta{ - EncryptedKey: encryptedKey, - KeyNonce: keyNonce[:], - }) - if err != nil { - return "", nil, err - } - - return segmentPath, segmentMeta, nil - } - - lastSegmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath) - if err != nil { - return "", nil, err - } - - streamInfo, err := proto.Marshal(&pb.StreamInfo{ - DeprecatedNumberOfSegments: currentSegment + 1, - SegmentsSize: s.segmentSize, - LastSegmentSize: sizeReader.Size(), - Metadata: metadata, - }) - if err != nil { - return "", nil, err - } - - // encrypt metadata with the content encryption key and zero nonce - encryptedStreamInfo, err := encryption.Encrypt(streamInfo, s.cipher, &contentKey, &storj.Nonce{}) - if err != nil { - return "", nil, err - } - - streamMeta := pb.StreamMeta{ - NumberOfSegments: currentSegment + 1, - EncryptedStreamInfo: encryptedStreamInfo, - EncryptionType: int32(s.cipher), - EncryptionBlockSize: int32(s.encBlockSize), - } - + putMeta, err = s.segments.Put(ctx, streamID, transformedReader, expiration, func() (_ int64, segmentEncryption storj.SegmentEncryption, err error) { if s.cipher != storj.EncNull { - streamMeta.LastSegmentMeta = &pb.SegmentMeta{ - EncryptedKey: encryptedKey, - KeyNonce: keyNonce[:], + segmentEncryption = storj.SegmentEncryption{ + EncryptedKey: encryptedKey, + EncryptedKeyNonce: keyNonce, } } - - lastSegmentMeta, err := proto.Marshal(&streamMeta) - if err != nil { - return "", nil, err - } - - return lastSegmentPath, lastSegmentMeta, nil + return currentSegment, segmentEncryption, nil }) if err != nil { - return Meta{}, currentSegment, err + return Meta{}, currentSegment, streamID, err + } + + streamInfo, err := proto.Marshal(&pb.StreamInfo{ + DeprecatedNumberOfSegments: currentSegment + 1, + SegmentsSize: s.segmentSize, + LastSegmentSize: sizeReader.Size(), + Metadata: metadata, + }) + if err != nil { + return Meta{}, currentSegment, streamID, err + } + + // encrypt metadata with the content encryption key and zero nonce + encryptedStreamInfo, err := encryption.Encrypt(streamInfo, s.cipher, &contentKey, &storj.Nonce{}) + if err != nil { + return Meta{}, currentSegment, streamID, err + } + + streamMeta := pb.StreamMeta{ + NumberOfSegments: currentSegment + 1, + EncryptedStreamInfo: encryptedStreamInfo, + EncryptionType: int32(s.cipher), + EncryptionBlockSize: int32(s.encBlockSize), + } + + if s.cipher != storj.EncNull { + streamMeta.LastSegmentMeta = &pb.SegmentMeta{ + EncryptedKey: encryptedKey, + KeyNonce: keyNonce[:], + } + + } + + objectMetadata, err = proto.Marshal(&streamMeta) + if err != nil { + return Meta{}, currentSegment, streamID, err } currentSegment++ @@ -275,7 +266,15 @@ func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Ci } if eofReader.hasError() { - return Meta{}, currentSegment, eofReader.err + return Meta{}, currentSegment, streamID, eofReader.err + } + + err = s.metainfo.CommitObject(ctx, metainfo.CommitObjectParams{ + StreamID: streamID, + EncryptedMetadata: objectMetadata, + }) + if err != nil { + return Meta{}, currentSegment, streamID, err } resultMeta := Meta{ @@ -285,7 +284,7 @@ func (s *streamStore) upload(ctx context.Context, path Path, pathCipher storj.Ci Data: metadata, } - return resultMeta, currentSegment, nil + return resultMeta, currentSegment, streamID, nil } // Get returns a ranger that knows what the overall size is (from l/) @@ -299,17 +298,20 @@ func (s *streamStore) Get(ctx context.Context, path Path, pathCipher storj.Ciphe return nil, Meta{}, err } - segmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath) + object, err := s.metainfo.GetObject(ctx, metainfo.GetObjectParams{ + Bucket: []byte(path.Bucket()), + EncryptedPath: []byte(encPath.Raw()), + }) if err != nil { return nil, Meta{}, err } - lastSegmentRanger, lastSegmentMeta, err := s.segments.Get(ctx, segmentPath) + lastSegmentRanger, _, err := s.segments.Get(ctx, object.StreamID, -1, object.RedundancyScheme) if err != nil { return nil, Meta{}, err } - streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.encStore) + streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, object.Metadata, path, s.encStore) if err != nil { return nil, Meta{}, err } @@ -327,11 +329,6 @@ func (s *streamStore) Get(ctx context.Context, path Path, pathCipher storj.Ciphe var rangers []ranger.Ranger for i := int64(0); i < numberOfSegments(&stream, &streamMeta)-1; i++ { - currentPath, err := createSegmentPath(ctx, i, path.Bucket(), encPath) - if err != nil { - return nil, Meta{}, err - } - var contentNonce storj.Nonce _, err = encryption.Increment(&contentNonce, i+1) if err != nil { @@ -340,7 +337,10 @@ func (s *streamStore) Get(ctx context.Context, path Path, pathCipher storj.Ciphe rangers = append(rangers, &lazySegmentRanger{ segments: s.segments, - path: currentPath, + streamID: object.StreamID, + segmentIndex: int32(i), + rs: object.RedundancyScheme, + m: streamMeta.LastSegmentMeta, size: stream.SegmentsSize, derivedKey: derivedKey, startingNonce: &contentNonce, @@ -373,7 +373,7 @@ func (s *streamStore) Get(ctx context.Context, path Path, pathCipher storj.Ciphe rangers = append(rangers, decryptedLastSegmentRanger) catRangers := ranger.Concat(rangers...) - meta = convertMeta(lastSegmentMeta, stream, streamMeta) + meta = convertMeta(object.Modified, object.Expires, stream, streamMeta) return catRangers, meta, nil } @@ -386,17 +386,12 @@ func (s *streamStore) Meta(ctx context.Context, path Path, pathCipher storj.Ciph return Meta{}, err } - segmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath) - if err != nil { - return Meta{}, err - } + object, err := s.metainfo.GetObject(ctx, metainfo.GetObjectParams{ + Bucket: []byte(path.Bucket()), + EncryptedPath: []byte(encPath.Raw()), + }) - lastSegmentMeta, err := s.segments.Meta(ctx, segmentPath) - if err != nil { - return Meta{}, err - } - - streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.encStore) + streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, object.Metadata, path, s.encStore) if err != nil { return Meta{}, err } @@ -406,7 +401,7 @@ func (s *streamStore) Meta(ctx context.Context, path Path, pathCipher storj.Ciph return Meta{}, err } - return convertMeta(lastSegmentMeta, stream, streamMeta), nil + return convertMeta(object.Modified, object.Expires, stream, streamMeta), nil } // Delete all the segments, with the last one last @@ -418,42 +413,35 @@ func (s *streamStore) Delete(ctx context.Context, path Path, pathCipher storj.Ci return err } - lastSegmentPath, err := createSegmentPath(ctx, -1, path.Bucket(), encPath) + // TODO do it in batch + streamID, err := s.metainfo.BeginDeleteObject(ctx, metainfo.BeginDeleteObjectParams{ + Bucket: []byte(path.Bucket()), + EncryptedPath: []byte(encPath.Raw()), + }) if err != nil { return err } - lastSegmentMeta, err := s.segments.Meta(ctx, lastSegmentPath) + // TODO handle `more` + items, _, err := s.metainfo.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + StreamID: streamID, + CursorPosition: storj.SegmentPosition{ + Index: 0, + }, + }) if err != nil { return err } - streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, lastSegmentMeta.Data, path, s.encStore) - if err != nil { - return err - } - var stream pb.StreamInfo - if err := proto.Unmarshal(streamInfo, &stream); err != nil { - return err - } - var errlist errs.Group - for i := 0; i < int(numberOfSegments(&stream, &streamMeta)-1); i++ { - currentPath, err := createSegmentPath(ctx, int64(i), path.Bucket(), encPath) - if err != nil { - errlist.Add(err) - continue - } - - err = s.segments.Delete(ctx, currentPath) + for _, item := range items { + err = s.segments.Delete(ctx, streamID, item.Position.Index) if err != nil { errlist.Add(err) continue } } - errlist.Add(s.segments.Delete(ctx, lastSegmentPath)) - return errlist.Err() } @@ -475,11 +463,12 @@ func pathForKey(raw string) paths.Unencrypted { func (s *streamStore) List(ctx context.Context, prefix Path, startAfter, endBefore string, pathCipher storj.CipherSuite, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) { defer mon.Task()(&ctx)(&err) - if metaFlags&meta.Size != 0 { - // Calculating the stream's size require also the user-defined metadata, - // where stream store keeps info about the number of segments and their size. - metaFlags |= meta.UserDefined - } + // TODO use flags with listing + // if metaFlags&meta.Size != 0 { + // Calculating the stream's size require also the user-defined metadata, + // where stream store keeps info about the number of segments and their size. + // metaFlags |= meta.UserDefined + // } prefixKey, err := encryption.DerivePathKey(prefix.Bucket(), pathForKey(prefix.UnencryptedPath().Raw()), s.encStore) if err != nil { @@ -510,29 +499,26 @@ func (s *streamStore) List(ctx context.Context, prefix Path, startAfter, endBefo if err != nil { return nil, false, err } - endBefore, err = encryption.EncryptPathRaw(endBefore, pathCipher, prefixKey) - if err != nil { - return nil, false, err - } } - segmentPrefix, err := createSegmentPath(ctx, -1, prefix.Bucket(), encPrefix) + objects, more, err := s.metainfo.ListObjects(ctx, metainfo.ListObjectsParams{ + Bucket: []byte(prefix.Bucket()), + EncryptedPrefix: []byte(encPrefix.Raw()), + EncryptedCursor: []byte(startAfter), + Limit: int32(limit), + Recursive: recursive, + }) if err != nil { return nil, false, err } - segments, more, err := s.segments.List(ctx, segmentPrefix, startAfter, endBefore, recursive, limit, metaFlags) - if err != nil { - return nil, false, err - } - - items = make([]ListItem, len(segments)) - for i, item := range segments { + items = make([]ListItem, len(objects)) + for i, item := range objects { var path Path var itemPath string if needsEncryption { - itemPath, err = encryption.DecryptPathRaw(item.Path, pathCipher, prefixKey) + itemPath, err = encryption.DecryptPathRaw(string(item.EncryptedPath), pathCipher, prefixKey) if err != nil { return nil, false, err } @@ -547,11 +533,11 @@ func (s *streamStore) List(ctx context.Context, prefix Path, startAfter, endBefo path = CreatePath(prefix.Bucket(), paths.NewUnencrypted(fullPath)) } else { - itemPath = item.Path - path = CreatePath(item.Path, paths.Unencrypted{}) + itemPath = string(item.EncryptedPath) + path = CreatePath(string(item.EncryptedPath), paths.Unencrypted{}) } - streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, item.Meta.Data, path, s.encStore) + streamInfo, streamMeta, err := TypedDecryptStreamInfo(ctx, item.EncryptedMetadata, path, s.encStore) if err != nil { return nil, false, err } @@ -561,7 +547,7 @@ func (s *streamStore) List(ctx context.Context, prefix Path, startAfter, endBefo return nil, false, err } - newMeta := convertMeta(item.Meta, stream, streamMeta) + newMeta := convertMeta(item.CreatedAt, item.ExpiresAt, stream, streamMeta) items[i] = ListItem{ Path: itemPath, Meta: newMeta, @@ -575,7 +561,10 @@ func (s *streamStore) List(ctx context.Context, prefix Path, startAfter, endBefo type lazySegmentRanger struct { ranger ranger.Ranger segments segments.Store - path storj.Path + streamID storj.StreamID + segmentIndex int32 + rs storj.RedundancyScheme + m *pb.SegmentMeta size int64 derivedKey *storj.Key startingNonce *storj.Nonce @@ -592,17 +581,13 @@ func (lr *lazySegmentRanger) Size() int64 { func (lr *lazySegmentRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) { defer mon.Task()(&ctx)(&err) if lr.ranger == nil { - rr, m, err := lr.segments.Get(ctx, lr.path) + rr, encryption, err := lr.segments.Get(ctx, lr.streamID, lr.segmentIndex, lr.rs) if err != nil { return nil, err } - segmentMeta := pb.SegmentMeta{} - err = proto.Unmarshal(m.Data, &segmentMeta) - if err != nil { - return nil, err - } - encryptedKey, keyNonce := getEncryptedKeyAndNonce(&segmentMeta) - lr.ranger, err = decryptRanger(ctx, rr, lr.size, lr.cipher, lr.derivedKey, encryptedKey, keyNonce, lr.startingNonce, lr.encBlockSize) + + encryptedKey, keyNonce := encryption.EncryptedKey, encryption.EncryptedKeyNonce + lr.ranger, err = decryptRanger(ctx, rr, lr.size, lr.cipher, lr.derivedKey, encryptedKey, &keyNonce, lr.startingNonce, lr.encBlockSize) if err != nil { return nil, err } @@ -649,25 +634,13 @@ func decryptRanger(ctx context.Context, rr ranger.Ranger, decryptedSize int64, c } // CancelHandler handles clean up of segments on receiving CTRL+C -func (s *streamStore) cancelHandler(ctx context.Context, totalSegments int64, path Path, pathCipher storj.CipherSuite) { +func (s *streamStore) cancelHandler(ctx context.Context, streamID storj.StreamID, totalSegments int64, path Path, pathCipher storj.CipherSuite) { defer mon.Task()(&ctx)(nil) - encPath, err := encryption.EncryptPath(path.Bucket(), path.UnencryptedPath(), pathCipher, s.encStore) - if err != nil { - zap.S().Warnf("Failed deleting segments: %v", err) - return - } - for i := int64(0); i < totalSegments; i++ { - currentPath, err := createSegmentPath(ctx, i, path.Bucket(), encPath) + err := s.segments.Delete(ctx, streamID, int32(i)) if err != nil { - zap.S().Warnf("Failed deleting segment %d: %v", i, err) - continue - } - - err = s.segments.Delete(ctx, currentPath) - if err != nil { - zap.S().Warnf("Failed deleting segment %v: %v", currentPath, err) + zap.L().Warn("Failed deleting segment", zap.String("path", path.String()), zap.Int64("segmentIndex", i), zap.Error(err)) continue } } @@ -710,33 +683,3 @@ func TypedDecryptStreamInfo(ctx context.Context, streamMetaBytes []byte, path Pa streamInfo, err = encryption.Decrypt(streamMeta.EncryptedStreamInfo, cipher, contentKey, &storj.Nonce{}) return streamInfo, streamMeta, err } - -// createSegmentPath will create a storj.Path that the segment store expects. -func createSegmentPath(ctx context.Context, segmentIndex int64, bucket string, encPath paths.Encrypted) (path storj.Path, err error) { - defer mon.Task()(&ctx)(&err) - - if segmentIndex < -1 { - return "", errs.New("invalid segment index") - } - - var raw []byte - if segmentIndex > -1 { - raw = append(raw, 's') - raw = append(raw, strconv.FormatInt(segmentIndex, 10)...) - } else { - raw = append(raw, 'l') - } - raw = append(raw, '/') - - if len(bucket) > 0 { - raw = append(raw, bucket...) - raw = append(raw, '/') - - if encPath.Valid() { - raw = append(raw, encPath.Raw()...) - raw = append(raw, '/') - } - } - - return storj.Path(raw[:len(raw)-1]), nil -} diff --git a/uplink/storage/streams/store_test.go b/uplink/storage/streams/store_test.go new file mode 100644 index 000000000..7136304fb --- /dev/null +++ b/uplink/storage/streams/store_test.go @@ -0,0 +1,228 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package streams_test + +import ( + "bytes" + "context" + "io/ioutil" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "storj.io/storj/internal/memory" + "storj.io/storj/internal/testcontext" + "storj.io/storj/internal/testplanet" + "storj.io/storj/internal/testrand" + "storj.io/storj/pkg/encryption" + "storj.io/storj/pkg/macaroon" + "storj.io/storj/pkg/storj" + "storj.io/storj/satellite/console" + "storj.io/storj/uplink/ecclient" + "storj.io/storj/uplink/eestream" + "storj.io/storj/uplink/storage/meta" + "storj.io/storj/uplink/storage/segments" + "storj.io/storj/uplink/storage/streams" +) + +const ( + TestEncKey = "test-encryption-key" +) + +func TestStreamsStorePutGet(t *testing.T) { + runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamStore streams.Store) { + bucketName := "bucket-name" + err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName) + require.NoError(t, err) + + for _, tt := range []struct { + name string + path string + metadata []byte + expiration time.Time + content []byte + }{ + {"test inline put/get", "path/1", []byte("inline-metadata"), time.Time{}, testrand.Bytes(2 * memory.KiB)}, + {"test remote put/get", "mypath/1", []byte("remote-metadata"), time.Time{}, testrand.Bytes(100 * memory.KiB)}, + } { + test := tt + + path := storj.JoinPaths(bucketName, test.path) + _, err = streamStore.Put(ctx, path, storj.EncNull, bytes.NewReader(test.content), test.metadata, test.expiration) + require.NoError(t, err, test.name) + + rr, metadata, err := streamStore.Get(ctx, path, storj.EncNull) + require.NoError(t, err, test.name) + require.Equal(t, test.metadata, metadata.Data) + + reader, err := rr.Range(ctx, 0, rr.Size()) + require.NoError(t, err, test.name) + content, err := ioutil.ReadAll(reader) + require.NoError(t, err, test.name) + require.Equal(t, test.content, content) + + require.NoError(t, reader.Close(), test.name) + } + }) +} + +func TestStreamsStoreDelete(t *testing.T) { + runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamStore streams.Store) { + bucketName := "bucket-name" + err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName) + require.NoError(t, err) + + for _, tt := range []struct { + name string + path string + metadata []byte + expiration time.Time + content []byte + }{ + {"test inline delete", "path/1", []byte("inline-metadata"), time.Time{}, testrand.Bytes(2 * memory.KiB)}, + {"test remote delete", "mypath/1", []byte("remote-metadata"), time.Time{}, testrand.Bytes(100 * memory.KiB)}, + } { + test := tt + + path := storj.JoinPaths(bucketName, test.path) + _, err = streamStore.Put(ctx, path, storj.EncNull, bytes.NewReader(test.content), test.metadata, test.expiration) + require.NoError(t, err, test.name) + + // delete existing + err = streamStore.Delete(ctx, path, storj.EncNull) + require.NoError(t, err, test.name) + + _, _, err = streamStore.Get(ctx, path, storj.EncNull) + require.Error(t, err, test.name) + require.True(t, storj.ErrObjectNotFound.Has(err)) + + // delete non existing + err = streamStore.Delete(ctx, path, storj.EncNull) + require.Error(t, err, test.name) + require.True(t, storj.ErrObjectNotFound.Has(err)) + } + }) +} + +func TestStreamStoreList(t *testing.T) { + runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamStore streams.Store) { + expiration := time.Now().Add(10 * 24 * time.Hour) + + bucketName := "bucket-name" + err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName) + require.NoError(t, err) + + objects := []struct { + path string + content []byte + }{ + {"aaaa/afile1", []byte("content")}, + {"aaaa/bfile2", []byte("content")}, + {"bbbb/afile1", []byte("content")}, + {"bbbb/bfile2", []byte("content")}, + {"bbbb/bfolder/file1", []byte("content")}, + } + for _, test := range objects { + test := test + data := bytes.NewReader(test.content) + path := storj.JoinPaths(bucketName, test.path) + _, err := streamStore.Put(ctx, path, storj.EncNull, data, []byte{}, expiration) + require.NoError(t, err) + } + + prefix := bucketName + + // should list all + items, more, err := streamStore.List(ctx, prefix, "", "", storj.EncNull, true, 10, meta.None) + require.NoError(t, err) + require.False(t, more) + require.Equal(t, len(objects), len(items)) + + // should list first two and more = true + items, more, err = streamStore.List(ctx, prefix, "", "", storj.EncNull, true, 2, meta.None) + require.NoError(t, err) + require.True(t, more) + require.Equal(t, 2, len(items)) + + // should list only prefixes + items, more, err = streamStore.List(ctx, prefix, "", "", storj.EncNull, false, 10, meta.None) + require.NoError(t, err) + require.False(t, more) + require.Equal(t, 2, len(items)) + + // should list only BBBB bucket + prefix = storj.JoinPaths(bucketName, "bbbb") + items, more, err = streamStore.List(ctx, prefix, "", "", storj.EncNull, false, 10, meta.None) + require.NoError(t, err) + require.False(t, more) + require.Equal(t, 3, len(items)) + + // should list only BBBB bucket after afile + items, more, err = streamStore.List(ctx, prefix, "afile1", "", storj.EncNull, false, 10, meta.None) + require.NoError(t, err) + require.False(t, more) + require.Equal(t, 2, len(items)) + + // should list nothing + prefix = storj.JoinPaths(bucketName, "cccc") + items, more, err = streamStore.List(ctx, prefix, "", "", storj.EncNull, true, 10, meta.None) + require.NoError(t, err) + require.False(t, more) + require.Equal(t, 0, len(items)) + }) +} + +func runTest(t *testing.T, test func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamsStore streams.Store)) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + // TODO move apikey creation to testplanet + projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx) + require.NoError(t, err) + + apiKey, err := macaroon.NewAPIKey([]byte("testSecret")) + require.NoError(t, err) + + apiKeyInfo := console.APIKeyInfo{ + ProjectID: projects[0].ID, + Name: "testKey", + Secret: []byte("testSecret"), + } + + // add api key to db + _, err = planet.Satellites[0].DB.Console().APIKeys().Create(context.Background(), apiKey.Head(), apiKeyInfo) + require.NoError(t, err) + + TestAPIKey := apiKey.Serialize() + + metainfo, err := planet.Uplinks[0].DialMetainfo(context.Background(), planet.Satellites[0], TestAPIKey) + require.NoError(t, err) + defer ctx.Check(metainfo.Close) + + ec := ecclient.NewClient(planet.Uplinks[0].Log.Named("ecclient"), planet.Uplinks[0].Transport, 0) + + cfg := planet.Uplinks[0].GetConfig(planet.Satellites[0]) + rs, err := eestream.NewRedundancyStrategyFromStorj(cfg.GetRedundancyScheme()) + require.NoError(t, err) + + segmentStore := segments.NewSegmentStore(metainfo, ec, rs, 4*memory.KiB.Int(), 8*memory.MiB.Int64()) + assert.NotNil(t, segmentStore) + + key := new(storj.Key) + copy(key[:], TestEncKey) + + encStore := encryption.NewStore() + encStore.SetDefaultKey(key) + + const stripesPerBlock = 2 + blockSize := stripesPerBlock * rs.StripeSize() + inlineThreshold := 8 * memory.KiB.Int() + streamStore, err := streams.NewStreamStore(metainfo, segmentStore, 64*memory.MiB.Int64(), encStore, blockSize, storj.EncNull, inlineThreshold) + require.NoError(t, err) + + test(t, ctx, planet, streamStore) + }) +}