diff --git a/lib/uplink/project.go b/lib/uplink/project.go index 66f3b8e17..fa5b9b4eb 100644 --- a/lib/uplink/project.go +++ b/lib/uplink/project.go @@ -251,7 +251,10 @@ func (p *Project) checkBucketAttribution(ctx context.Context, bucketName string) return Error.Wrap(err) } - return p.metainfo.SetAttribution(ctx, bucketName, *partnerID) + return p.metainfo.SetBucketAttribution(ctx, metainfo.SetBucketAttributionParams{ + Bucket: bucketName, + PartnerID: *partnerID, + }) } // updateBucket updates an existing bucket's attribution info. diff --git a/satellite/metainfo/metainfo_test.go b/satellite/metainfo/metainfo_test.go index c7d36dcc8..b0dbd84e9 100644 --- a/satellite/metainfo/metainfo_test.go +++ b/satellite/metainfo/metainfo_test.go @@ -52,22 +52,22 @@ func TestInvalidAPIKey(t *testing.T) { client.SetRawAPIKey([]byte(invalidAPIKey)) - _, _, _, err = client.CreateSegment(ctx, "hello", "world", 1, &pb.RedundancyScheme{}, 123, time.Now().Add(time.Hour)) + _, _, _, err = client.CreateSegmentOld(ctx, "hello", "world", 1, &pb.RedundancyScheme{}, 123, time.Now().Add(time.Hour)) assertUnauthenticated(t, err, false) - _, err = client.CommitSegment(ctx, "testbucket", "testpath", 0, &pb.Pointer{}, nil) + _, err = client.CommitSegmentOld(ctx, "testbucket", "testpath", 0, &pb.Pointer{}, nil) assertUnauthenticated(t, err, false) - _, err = client.SegmentInfo(ctx, "testbucket", "testpath", 0) + _, err = client.SegmentInfoOld(ctx, "testbucket", "testpath", 0) assertUnauthenticated(t, err, false) - _, _, _, err = client.ReadSegment(ctx, "testbucket", "testpath", 0) + _, _, _, err = client.ReadSegmentOld(ctx, "testbucket", "testpath", 0) assertUnauthenticated(t, err, false) - _, _, err = client.DeleteSegment(ctx, "testbucket", "testpath", 0) + _, _, err = client.DeleteSegmentOld(ctx, "testbucket", "testpath", 0) assertUnauthenticated(t, err, false) - _, _, err = client.ListSegments(ctx, "testbucket", "", "", "", true, 1, 0) + _, _, err = client.ListSegmentsOld(ctx, "testbucket", "", "", "", true, 1, 0) assertUnauthenticated(t, err, false) } } @@ -165,25 +165,25 @@ func TestRestrictedAPIKey(t *testing.T) { require.NoError(t, err) defer ctx.Check(client.Close) - _, _, _, err = client.CreateSegment(ctx, "testbucket", "testpath", 1, &pb.RedundancyScheme{}, 123, time.Now().Add(time.Hour)) + _, _, _, err = client.CreateSegmentOld(ctx, "testbucket", "testpath", 1, &pb.RedundancyScheme{}, 123, time.Now().Add(time.Hour)) assertUnauthenticated(t, err, test.CreateSegmentAllowed) - _, err = client.CommitSegment(ctx, "testbucket", "testpath", 0, &pb.Pointer{}, nil) + _, err = client.CommitSegmentOld(ctx, "testbucket", "testpath", 0, &pb.Pointer{}, nil) assertUnauthenticated(t, err, test.CommitSegmentAllowed) - _, err = client.SegmentInfo(ctx, "testbucket", "testpath", 0) + _, err = client.SegmentInfoOld(ctx, "testbucket", "testpath", 0) assertUnauthenticated(t, err, test.SegmentInfoAllowed) - _, _, _, err = client.ReadSegment(ctx, "testbucket", "testpath", 0) + _, _, _, err = client.ReadSegmentOld(ctx, "testbucket", "testpath", 0) assertUnauthenticated(t, err, test.ReadSegmentAllowed) - _, _, err = client.DeleteSegment(ctx, "testbucket", "testpath", 0) + _, _, err = client.DeleteSegmentOld(ctx, "testbucket", "testpath", 0) assertUnauthenticated(t, err, test.DeleteSegmentAllowed) - _, _, err = client.ListSegments(ctx, "testbucket", "testpath", "", "", true, 1, 0) + _, _, err = client.ListSegmentsOld(ctx, "testbucket", "testpath", "", "", true, 1, 0) assertUnauthenticated(t, err, test.ListSegmentsAllowed) - _, _, _, err = client.ReadSegment(ctx, "testbucket", "", -1) + _, _, _, err = client.ReadSegmentOld(ctx, "testbucket", "", -1) assertUnauthenticated(t, err, test.ReadBucketAllowed) } } @@ -291,7 +291,7 @@ func TestCommitSegment(t *testing.T) { { // error if pointer is nil - _, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, nil, []*pb.OrderLimit{}) + _, err = metainfo.CommitSegmentOld(ctx, "bucket", "path", -1, nil, []*pb.OrderLimit{}) require.Error(t, err) } { @@ -304,7 +304,7 @@ func TestCommitSegment(t *testing.T) { ErasureShareSize: 256, } expirationDate := time.Now().Add(time.Hour) - addressedLimits, rootPieceID, _, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, redundancy, 1000, expirationDate) + addressedLimits, rootPieceID, _, err := metainfo.CreateSegmentOld(ctx, "bucket", "path", -1, redundancy, 1000, expirationDate) require.NoError(t, err) // create number of pieces below repair threshold @@ -348,7 +348,7 @@ func TestCommitSegment(t *testing.T) { for i, addressedLimit := range addressedLimits { limits[i] = addressedLimit.Limit } - _, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, pointer, limits) + _, err = metainfo.CommitSegmentOld(ctx, "bucket", "path", -1, pointer, limits) require.Error(t, err) require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument)) require.Contains(t, err.Error(), "is less than or equal to the repair threshold") @@ -364,7 +364,7 @@ func TestCommitSegment(t *testing.T) { ErasureShareSize: 256, } expirationDate := time.Now().Add(time.Hour) - addressedLimits, rootPieceID, _, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, redundancy, 1000, expirationDate) + addressedLimits, rootPieceID, _, err := metainfo.CreateSegmentOld(ctx, "bucket", "path", -1, redundancy, 1000, expirationDate) require.NoError(t, err) // create number of pieces below success threshold @@ -408,7 +408,7 @@ func TestCommitSegment(t *testing.T) { for i, addressedLimit := range addressedLimits { limits[i] = addressedLimit.Limit } - _, err = metainfo.CommitSegment(ctx, "bucket", "path", -1, pointer, limits) + _, err = metainfo.CommitSegmentOld(ctx, "bucket", "path", -1, pointer, limits) require.Error(t, err) require.Contains(t, err.Error(), "is less than the success threshold") } @@ -505,7 +505,7 @@ func TestCreateSegment(t *testing.T) { fail: false, }, } { - _, _, _, err := metainfo.CreateSegment(ctx, "bucket", "path", -1, r.rs, 1000, time.Now().Add(time.Hour)) + _, _, _, err := metainfo.CreateSegmentOld(ctx, "bucket", "path", -1, r.rs, 1000, time.Now().Add(time.Hour)) if r.fail { require.Error(t, err) } else { @@ -555,7 +555,7 @@ func TestExpirationTimeSegment(t *testing.T) { }, } { - _, _, _, err := metainfo.CreateSegment(ctx, "my-bucket-name", "file/path", -1, rs, memory.MiB.Int64(), r.expirationDate) + _, _, _, err := metainfo.CreateSegmentOld(ctx, "my-bucket-name", "file/path", -1, rs, memory.MiB.Int64(), r.expirationDate) if err != nil { assert.True(t, r.errFlag) } else { @@ -587,7 +587,7 @@ func TestMaxCommitInterval(t *testing.T) { pointer, limits := runCreateSegment(ctx, t, metainfo, fullIDMap) - _, err = metainfo.CommitSegment(ctx, "my-bucket-name", "file/path", -1, pointer, limits) + _, err = metainfo.CommitSegmentOld(ctx, "my-bucket-name", "file/path", -1, pointer, limits) require.Error(t, err) require.Contains(t, err.Error(), "not committed before max commit interval") }) @@ -610,11 +610,11 @@ func TestDoubleCommitSegment(t *testing.T) { pointer, limits := runCreateSegment(ctx, t, metainfo, fullIDMap) - savedPointer, err := metainfo.CommitSegment(ctx, "my-bucket-name", "file/path", -1, pointer, limits) + savedPointer, err := metainfo.CommitSegmentOld(ctx, "my-bucket-name", "file/path", -1, pointer, limits) require.NoError(t, err) require.True(t, savedPointer.PieceHashesVerified) - _, err = metainfo.CommitSegment(ctx, "my-bucket-name", "file/path", -1, pointer, limits) + _, err = metainfo.CommitSegmentOld(ctx, "my-bucket-name", "file/path", -1, pointer, limits) require.Error(t, err) require.Contains(t, err.Error(), "missing create request or request expired") }) @@ -762,7 +762,7 @@ func TestCommitSegmentPointer(t *testing.T) { pointer, limits := runCreateSegment(ctx, t, metainfo, fullIDMap) test.Modify(pointer, fullIDMap) - _, err = metainfo.CommitSegment(ctx, "my-bucket-name", "file/path", -1, pointer, limits) + _, err = metainfo.CommitSegmentOld(ctx, "my-bucket-name", "file/path", -1, pointer, limits) require.Error(t, err, "Case #%v", i) require.Contains(t, err.Error(), test.ErrorMessage, "Case #%v", i) } @@ -855,7 +855,7 @@ func TestGetProjectInfo(t *testing.T) { func runCreateSegment(ctx context.Context, t *testing.T, metainfo *metainfo.Client, fullIDMap map[storj.NodeID]*identity.FullIdentity) (*pb.Pointer, []*pb.OrderLimit) { pointer := createTestPointer(t) - addressedLimits, rootPieceID, _, err := metainfo.CreateSegment(ctx, "my-bucket-name", "file/path", -1, pointer.Remote.Redundancy, memory.MiB.Int64(), pointer.ExpirationDate) + addressedLimits, rootPieceID, _, err := metainfo.CreateSegmentOld(ctx, "my-bucket-name", "file/path", -1, pointer.Remote.Redundancy, memory.MiB.Int64(), pointer.ExpirationDate) require.NoError(t, err) pointer.Remote.RootPieceId = rootPieceID @@ -959,7 +959,7 @@ func TestBucketNameValidation(t *testing.T) { "testbucket-63-0123456789012345678901234567890123456789012345abc", } for _, name := range validNames { - _, _, _, err = metainfo.CreateSegment(ctx, name, "", -1, rs, 1, time.Now().Add(time.Hour)) + _, _, _, err = metainfo.CreateSegmentOld(ctx, name, "", -1, rs, 1, time.Now().Add(time.Hour)) require.NoError(t, err, "bucket name: %v", name) } @@ -973,7 +973,7 @@ func TestBucketNameValidation(t *testing.T) { "testbucket-64-0123456789012345678901234567890123456789012345abcd", } for _, name := range invalidNames { - _, _, _, err = metainfo.CreateSegment(ctx, name, "", -1, rs, 1, time.Now().Add(time.Hour)) + _, _, _, err = metainfo.CreateSegmentOld(ctx, name, "", -1, rs, 1, time.Now().Add(time.Hour)) require.Error(t, err, "bucket name: %v", name) } }) @@ -1104,7 +1104,7 @@ func TestBeginCommitListSegment(t *testing.T) { Hash: signedHash, } } - err = metainfoClient.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{ + err = metainfoClient.CommitSegment(ctx, metainfo.CommitSegmentParams{ SegmentID: segmentID, SizeEncryptedData: memory.MiB.Int64(), @@ -1136,7 +1136,7 @@ func TestBeginCommitListSegment(t *testing.T) { }) require.NoError(t, err) - segments, _, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + segments, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: object.StreamID, }) require.NoError(t, err) @@ -1189,7 +1189,7 @@ func TestListSegment(t *testing.T) { {Index: 11, Result: 5, Limit: 5, More: false}, {Index: 15, Result: 1, More: false}, } { - segments, more, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + segments, more, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: object.StreamID, Limit: test.Limit, CursorPosition: storj.SegmentPosition{ @@ -1307,7 +1307,7 @@ func TestInlineSegment(t *testing.T) { {Index: 0, Result: len(segments), More: false, Limit: len(segments)}, {Index: 0, Result: len(segments) - 1, More: true, Limit: len(segments) - 1}, } { - items, more, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + items, more, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: object.StreamID, CursorPosition: storj.SegmentPosition{ Index: test.Index, @@ -1321,7 +1321,7 @@ func TestInlineSegment(t *testing.T) { } { // test download inline segments - items, _, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + items, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: object.StreamID, }) require.NoError(t, err) @@ -1347,7 +1347,7 @@ func TestInlineSegment(t *testing.T) { }) require.NoError(t, err) - items, _, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + items, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: streamID, }) require.NoError(t, err) @@ -1367,7 +1367,7 @@ func TestInlineSegment(t *testing.T) { require.NoError(t, err) } - _, _, err = metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + _, _, err = metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: streamID, }) require.Error(t, err) @@ -1413,7 +1413,7 @@ func TestRemoteSegment(t *testing.T) { }) require.NoError(t, err) - segments, _, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + segments, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: object.StreamID, }) require.NoError(t, err) @@ -1441,7 +1441,7 @@ func TestRemoteSegment(t *testing.T) { }) require.NoError(t, err) - segments, _, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + segments, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: streamID, }) require.NoError(t, err) @@ -1494,7 +1494,7 @@ func TestIDs(t *testing.T) { require.Error(t, err) // invalid streamID segmentID := testrand.SegmentID(512) - err = metainfoClient.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{ + err = metainfoClient.CommitSegment(ctx, metainfo.CommitSegmentParams{ SegmentID: segmentID, }) require.Error(t, err) // invalid segmentID @@ -1532,7 +1532,7 @@ func TestIDs(t *testing.T) { segmentID, err := storj.SegmentIDFromBytes(encodedSegmentID) require.NoError(t, err) - err = metainfoClient.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{ + err = metainfoClient.CommitSegment(ctx, metainfo.CommitSegmentParams{ SegmentID: segmentID, }) require.Error(t, err) diff --git a/uplink/metainfo/client.go b/uplink/metainfo/client.go index 2c07d1d53..a15fdd557 100644 --- a/uplink/metainfo/client.go +++ b/uplink/metainfo/client.go @@ -18,7 +18,6 @@ import ( "storj.io/storj/pkg/rpc" "storj.io/storj/pkg/rpc/rpcstatus" "storj.io/storj/pkg/storj" - "storj.io/storj/storage" ) var ( @@ -78,167 +77,6 @@ func (client *Client) header() *pb.RequestHeader { } } -// CreateSegment requests the order limits for creating a new segment -func (client *Client) CreateSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) (limits []*pb.AddressedOrderLimit, rootPieceID storj.PieceID, piecePrivateKey storj.PiecePrivateKey, err error) { - defer mon.Task()(&ctx)(&err) - - response, err := client.client.CreateSegmentOld(ctx, &pb.SegmentWriteRequestOld{ - Header: client.header(), - Bucket: []byte(bucket), - Path: []byte(path), - Segment: segmentIndex, - Redundancy: redundancy, - MaxEncryptedSegmentSize: maxEncryptedSegmentSize, - Expiration: expiration, - }) - if err != nil { - return nil, rootPieceID, piecePrivateKey, Error.Wrap(err) - } - - return response.GetAddressedLimits(), response.RootPieceId, response.PrivateKey, nil -} - -// CommitSegment requests to store the pointer for the segment -func (client *Client) CommitSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (savedPointer *pb.Pointer, err error) { - defer mon.Task()(&ctx)(&err) - - response, err := client.client.CommitSegmentOld(ctx, &pb.SegmentCommitRequestOld{ - Header: client.header(), - Bucket: []byte(bucket), - Path: []byte(path), - Segment: segmentIndex, - Pointer: pointer, - OriginalLimits: originalLimits, - }) - if err != nil { - return nil, Error.Wrap(err) - } - - return response.GetPointer(), nil -} - -// SegmentInfo requests the pointer of a segment -func (client *Client) SegmentInfo(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, err error) { - defer mon.Task()(&ctx)(&err) - - response, err := client.client.SegmentInfoOld(ctx, &pb.SegmentInfoRequestOld{ - Header: client.header(), - Bucket: []byte(bucket), - Path: []byte(path), - Segment: segmentIndex, - }) - if err != nil { - if errs2.IsRPC(err, rpcstatus.NotFound) { - return nil, storage.ErrKeyNotFound.Wrap(err) - } - return nil, Error.Wrap(err) - } - - return response.GetPointer(), nil -} - -// ReadSegment requests the order limits for reading a segment -func (client *Client) ReadSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, err error) { - defer mon.Task()(&ctx)(&err) - - response, err := client.client.DownloadSegmentOld(ctx, &pb.SegmentDownloadRequestOld{ - Header: client.header(), - Bucket: []byte(bucket), - Path: []byte(path), - Segment: segmentIndex, - }) - if err != nil { - if errs2.IsRPC(err, rpcstatus.NotFound) { - return nil, nil, piecePrivateKey, storage.ErrKeyNotFound.Wrap(err) - } - return nil, nil, piecePrivateKey, Error.Wrap(err) - } - - return response.GetPointer(), sortLimits(response.GetAddressedLimits(), response.GetPointer()), response.PrivateKey, nil -} - -// sortLimits sorts order limits and fill missing ones with nil values -func sortLimits(limits []*pb.AddressedOrderLimit, pointer *pb.Pointer) []*pb.AddressedOrderLimit { - sorted := make([]*pb.AddressedOrderLimit, pointer.GetRemote().GetRedundancy().GetTotal()) - for _, piece := range pointer.GetRemote().GetRemotePieces() { - sorted[piece.GetPieceNum()] = getLimitByStorageNodeID(limits, piece.NodeId) - } - return sorted -} - -func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID storj.NodeID) *pb.AddressedOrderLimit { - for _, limit := range limits { - if limit.GetLimit().StorageNodeId == storageNodeID { - return limit - } - } - return nil -} - -// DeleteSegment requests the order limits for deleting a segment -func (client *Client) DeleteSegment(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, err error) { - defer mon.Task()(&ctx)(&err) - - response, err := client.client.DeleteSegmentOld(ctx, &pb.SegmentDeleteRequestOld{ - Header: client.header(), - Bucket: []byte(bucket), - Path: []byte(path), - Segment: segmentIndex, - }) - if err != nil { - if errs2.IsRPC(err, rpcstatus.NotFound) { - return nil, piecePrivateKey, storage.ErrKeyNotFound.Wrap(err) - } - return nil, piecePrivateKey, Error.Wrap(err) - } - - return response.GetAddressedLimits(), response.PrivateKey, nil -} - -// ListSegments lists the available segments -func (client *Client) ListSegments(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) { - defer mon.Task()(&ctx)(&err) - - response, err := client.client.ListSegmentsOld(ctx, &pb.ListSegmentsRequestOld{ - Header: client.header(), - Bucket: []byte(bucket), - Prefix: []byte(prefix), - StartAfter: []byte(startAfter), - EndBefore: []byte(endBefore), - Recursive: recursive, - Limit: limit, - MetaFlags: metaFlags, - }) - if err != nil { - return nil, false, Error.Wrap(err) - } - - list := response.GetItems() - items = make([]ListItem, len(list)) - for i, item := range list { - items[i] = ListItem{ - Path: storj.Path(item.GetPath()), - Pointer: item.GetPointer(), - IsPrefix: item.IsPrefix, - } - } - - return items, response.GetMore(), nil -} - -// SetAttribution tries to set the attribution information on the bucket. -func (client *Client) SetAttribution(ctx context.Context, bucket string, partnerID uuid.UUID) (err error) { - defer mon.Task()(&ctx)(&err) - - _, err = client.client.SetAttributionOld(ctx, &pb.SetAttributionRequestOld{ - Header: client.header(), - PartnerId: partnerID[:], // TODO: implement storj.UUID that can be sent using pb - BucketName: []byte(bucket), - }) - - return Error.Wrap(err) -} - // GetProjectInfo gets the ProjectInfo for the api key associated with the metainfo client. func (client *Client) GetProjectInfo(ctx context.Context) (resp *pb.ProjectInfoResponse, err error) { defer mon.Task()(&ctx)(&err) @@ -969,8 +807,8 @@ func (params *CommitSegmentParams) BatchItem() *pb.BatchRequestItem { } } -// CommitSegmentNew commits segment after upload -func (client *Client) CommitSegmentNew(ctx context.Context, params CommitSegmentParams) (err error) { +// CommitSegment commits segment after upload +func (client *Client) CommitSegment(ctx context.Context, params CommitSegmentParams) (err error) { defer mon.Task()(&ctx)(&err) _, err = client.client.CommitSegment(ctx, params.toRequest(client.header())) @@ -1229,8 +1067,8 @@ func newListSegmentsResponse(response *pb.SegmentListResponse) ListSegmentsRespo } } -// ListSegmentsNew lists object segments -func (client *Client) ListSegmentsNew(ctx context.Context, params ListSegmentsParams) (_ []storj.SegmentListItem, more bool, err error) { +// ListSegments lists object segments +func (client *Client) ListSegments(ctx context.Context, params ListSegmentsParams) (_ []storj.SegmentListItem, more bool, err error) { defer mon.Task()(&ctx)(&err) response, err := client.client.ListSegments(ctx, params.toRequest(client.header())) diff --git a/uplink/metainfo/client_old.go b/uplink/metainfo/client_old.go new file mode 100644 index 000000000..bd071f6ad --- /dev/null +++ b/uplink/metainfo/client_old.go @@ -0,0 +1,178 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package metainfo + +import ( + "context" + "time" + + "github.com/skyrings/skyring-common/tools/uuid" + + "storj.io/storj/internal/errs2" + "storj.io/storj/pkg/pb" + "storj.io/storj/pkg/rpc/rpcstatus" + "storj.io/storj/pkg/storj" + "storj.io/storj/storage" +) + +// CreateSegmentOld requests the order limits for creating a new segment +func (client *Client) CreateSegmentOld(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, redundancy *pb.RedundancyScheme, maxEncryptedSegmentSize int64, expiration time.Time) (limits []*pb.AddressedOrderLimit, rootPieceID storj.PieceID, piecePrivateKey storj.PiecePrivateKey, err error) { + defer mon.Task()(&ctx)(&err) + + response, err := client.client.CreateSegmentOld(ctx, &pb.SegmentWriteRequestOld{ + Header: client.header(), + Bucket: []byte(bucket), + Path: []byte(path), + Segment: segmentIndex, + Redundancy: redundancy, + MaxEncryptedSegmentSize: maxEncryptedSegmentSize, + Expiration: expiration, + }) + if err != nil { + return nil, rootPieceID, piecePrivateKey, Error.Wrap(err) + } + + return response.GetAddressedLimits(), response.RootPieceId, response.PrivateKey, nil +} + +// CommitSegmentOld requests to store the pointer for the segment +func (client *Client) CommitSegmentOld(ctx context.Context, bucket string, path storj.Path, segmentIndex int64, pointer *pb.Pointer, originalLimits []*pb.OrderLimit) (savedPointer *pb.Pointer, err error) { + defer mon.Task()(&ctx)(&err) + + response, err := client.client.CommitSegmentOld(ctx, &pb.SegmentCommitRequestOld{ + Header: client.header(), + Bucket: []byte(bucket), + Path: []byte(path), + Segment: segmentIndex, + Pointer: pointer, + OriginalLimits: originalLimits, + }) + if err != nil { + return nil, Error.Wrap(err) + } + + return response.GetPointer(), nil +} + +// SegmentInfoOld requests the pointer of a segment +func (client *Client) SegmentInfoOld(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, err error) { + defer mon.Task()(&ctx)(&err) + + response, err := client.client.SegmentInfoOld(ctx, &pb.SegmentInfoRequestOld{ + Header: client.header(), + Bucket: []byte(bucket), + Path: []byte(path), + Segment: segmentIndex, + }) + if err != nil { + if errs2.IsRPC(err, rpcstatus.NotFound) { + return nil, storage.ErrKeyNotFound.Wrap(err) + } + return nil, Error.Wrap(err) + } + + return response.GetPointer(), nil +} + +// ReadSegmentOld requests the order limits for reading a segment +func (client *Client) ReadSegmentOld(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (pointer *pb.Pointer, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, err error) { + defer mon.Task()(&ctx)(&err) + + response, err := client.client.DownloadSegmentOld(ctx, &pb.SegmentDownloadRequestOld{ + Header: client.header(), + Bucket: []byte(bucket), + Path: []byte(path), + Segment: segmentIndex, + }) + if err != nil { + if errs2.IsRPC(err, rpcstatus.NotFound) { + return nil, nil, piecePrivateKey, storage.ErrKeyNotFound.Wrap(err) + } + return nil, nil, piecePrivateKey, Error.Wrap(err) + } + + return response.GetPointer(), sortLimits(response.GetAddressedLimits(), response.GetPointer()), response.PrivateKey, nil +} + +// sortLimits sorts order limits and fill missing ones with nil values +func sortLimits(limits []*pb.AddressedOrderLimit, pointer *pb.Pointer) []*pb.AddressedOrderLimit { + sorted := make([]*pb.AddressedOrderLimit, pointer.GetRemote().GetRedundancy().GetTotal()) + for _, piece := range pointer.GetRemote().GetRemotePieces() { + sorted[piece.GetPieceNum()] = getLimitByStorageNodeID(limits, piece.NodeId) + } + return sorted +} + +func getLimitByStorageNodeID(limits []*pb.AddressedOrderLimit, storageNodeID storj.NodeID) *pb.AddressedOrderLimit { + for _, limit := range limits { + if limit.GetLimit().StorageNodeId == storageNodeID { + return limit + } + } + return nil +} + +// DeleteSegmentOld requests the order limits for deleting a segment +func (client *Client) DeleteSegmentOld(ctx context.Context, bucket string, path storj.Path, segmentIndex int64) (limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, err error) { + defer mon.Task()(&ctx)(&err) + + response, err := client.client.DeleteSegmentOld(ctx, &pb.SegmentDeleteRequestOld{ + Header: client.header(), + Bucket: []byte(bucket), + Path: []byte(path), + Segment: segmentIndex, + }) + if err != nil { + if errs2.IsRPC(err, rpcstatus.NotFound) { + return nil, piecePrivateKey, storage.ErrKeyNotFound.Wrap(err) + } + return nil, piecePrivateKey, Error.Wrap(err) + } + + return response.GetAddressedLimits(), response.PrivateKey, nil +} + +// ListSegmentsOld lists the available segments +func (client *Client) ListSegmentsOld(ctx context.Context, bucket string, prefix, startAfter, endBefore storj.Path, recursive bool, limit int32, metaFlags uint32) (items []ListItem, more bool, err error) { + defer mon.Task()(&ctx)(&err) + + response, err := client.client.ListSegmentsOld(ctx, &pb.ListSegmentsRequestOld{ + Header: client.header(), + Bucket: []byte(bucket), + Prefix: []byte(prefix), + StartAfter: []byte(startAfter), + EndBefore: []byte(endBefore), + Recursive: recursive, + Limit: limit, + MetaFlags: metaFlags, + }) + if err != nil { + return nil, false, Error.Wrap(err) + } + + list := response.GetItems() + items = make([]ListItem, len(list)) + for i, item := range list { + items[i] = ListItem{ + Path: storj.Path(item.GetPath()), + Pointer: item.GetPointer(), + IsPrefix: item.IsPrefix, + } + } + + return items, response.GetMore(), nil +} + +// SetAttributionOld tries to set the attribution information on the bucket. +func (client *Client) SetAttributionOld(ctx context.Context, bucket string, partnerID uuid.UUID) (err error) { + defer mon.Task()(&ctx)(&err) + + _, err = client.client.SetAttributionOld(ctx, &pb.SetAttributionRequestOld{ + Header: client.header(), + PartnerId: partnerID[:], // TODO: implement storj.UUID that can be sent using pb + BucketName: []byte(bucket), + }) + + return Error.Wrap(err) +} diff --git a/uplink/metainfo/kvmetainfo/stream.go b/uplink/metainfo/kvmetainfo/stream.go index aab707114..8d98e809d 100644 --- a/uplink/metainfo/kvmetainfo/stream.go +++ b/uplink/metainfo/kvmetainfo/stream.go @@ -75,7 +75,7 @@ func (stream *readonlyStream) segment(ctx context.Context, index int64) (segment index = -1 } - pointer, err := stream.db.metainfo.SegmentInfo(ctx, stream.bucket, stream.encPath, index) + pointer, err := stream.db.metainfo.SegmentInfoOld(ctx, stream.bucket, stream.encPath, index) if err != nil { return segment, err } diff --git a/uplink/storage/segments/store.go b/uplink/storage/segments/store.go index 9e9a6e19e..449af5e55 100644 --- a/uplink/storage/segments/store.go +++ b/uplink/storage/segments/store.go @@ -138,7 +138,7 @@ func (s *segmentStore) Put(ctx context.Context, streamID storj.StreamID, data io return Meta{}, Error.New("uploaded results (%d) are below the optimal threshold (%d)", l, s.rs.OptimalThreshold()) } - err = s.metainfo.CommitSegmentNew(ctx, metainfo.CommitSegmentParams{ + err = s.metainfo.CommitSegment(ctx, metainfo.CommitSegmentParams{ SegmentID: segmentID, SizeEncryptedData: sizedReader.Size(), Encryption: encryption, diff --git a/uplink/storage/streams/store.go b/uplink/storage/streams/store.go index 26806096a..d59a40709 100644 --- a/uplink/storage/streams/store.go +++ b/uplink/storage/streams/store.go @@ -423,7 +423,7 @@ func (s *streamStore) Delete(ctx context.Context, path Path, pathCipher storj.Ci } // TODO handle `more` - items, _, err := s.metainfo.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + items, _, err := s.metainfo.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: streamID, CursorPosition: storj.SegmentPosition{ Index: 0, diff --git a/uplink/storage/streams/store_test.go b/uplink/storage/streams/store_test.go index d2731b8f2..78d9f3e75 100644 --- a/uplink/storage/streams/store_test.go +++ b/uplink/storage/streams/store_test.go @@ -142,7 +142,7 @@ func TestStreamsInterruptedDelete(t *testing.T) { }) require.NoError(t, err) - segmentItems, _, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + segmentItems, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{ StreamID: streamID, CursorPosition: storj.SegmentPosition{ Index: 0,