diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go
index 7f6fe021d..acc8743db 100644
--- a/satellite/metainfo/metainfo.go
+++ b/satellite/metainfo/metainfo.go
@@ -41,7 +41,6 @@ import (
 
 const (
 	satIDExpiration = 48 * time.Hour
-	listLimit       = 1000
 
 	deleteObjectPiecesSuccessThreshold = 0.75
 )
@@ -1054,36 +1053,8 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
 
 // FinishDeleteObject finishes object deletion.
 func (endpoint *Endpoint) FinishDeleteObject(ctx context.Context, req *pb.ObjectFinishDeleteRequest) (resp *pb.ObjectFinishDeleteResponse, err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	streamID := &pb.SatStreamID{}
-	err = pb.Unmarshal(req.StreamId, streamID)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
-	}
-
-	err = signing.VerifyStreamID(ctx, endpoint.satellite, streamID)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
-	}
-
-	if streamID.CreationDate.Before(time.Now().Add(-satIDExpiration)) {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "stream ID expired")
-	}
-
-	_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
-		Op:            macaroon.ActionDelete,
-		Bucket:        streamID.Bucket,
-		EncryptedPath: streamID.EncryptedPath,
-		Time:          time.Now(),
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	// we don't need to do anything for shim implementation
-
-	return &pb.ObjectFinishDeleteResponse{}, nil
+	// all logic for deleting is now in BeginDeleteObject
+	return nil, rpcstatus.Error(rpcstatus.Unimplemented, "not implemented")
 }
 
 // GetObjectIPs returns the IP addresses of the nodes holding the pieces for
@@ -1491,253 +1462,20 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
 
 // BeginDeleteSegment begins segment deletion process.
 func (endpoint *Endpoint) BeginDeleteSegment(ctx context.Context, req *pb.SegmentBeginDeleteRequest) (resp *pb.SegmentBeginDeleteResponse, err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
-	}
-
-	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
-		Op:            macaroon.ActionDelete,
-		Bucket:        streamID.Bucket,
-		EncryptedPath: streamID.EncryptedPath,
-		Time:          time.Now(),
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	pointer, location, err := endpoint.getPointer(ctx, keyInfo.ProjectID, int64(req.Position.Index), streamID.Bucket, streamID.EncryptedPath)
-	if err != nil {
-		return nil, err
-	}
-
-	var limits []*pb.AddressedOrderLimit
-	var privateKey storj.PiecePrivateKey
-	if pointer.Type == pb.Pointer_REMOTE && pointer.Remote != nil {
-		bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}
-		limits, privateKey, err = endpoint.orders.CreateDeleteOrderLimits(ctx, bucket, pointer)
-		if err != nil {
-			return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
-		}
-	}
-
-	// moved from FinishDeleteSegment to avoid inconsistency if someone will not
-	// call FinishDeleteSegment on uplink side
-	err = endpoint.metainfo.UnsynchronizedDelete(ctx, location.Encode())
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
-	}
-
-	segmentID, err := endpoint.packSegmentID(ctx, &pb.SatSegmentID{
-		StreamId:            streamID,
-		OriginalOrderLimits: limits,
-		Index:               req.Position.Index,
-		CreationDate:        time.Now(),
-	})
-
-	endpoint.log.Info("Segment Delete", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "delete"), zap.String("type", "segment"))
-	mon.Meter("req_delete_segment").Mark(1)
-
-	return &pb.SegmentBeginDeleteResponse{
-		SegmentId:       segmentID,
-		AddressedLimits: limits,
-		PrivateKey:      privateKey,
-	}, nil
+	// all logic for deleting is now in BeginDeleteObject
+	return nil, rpcstatus.Error(rpcstatus.Unimplemented, "not implemented")
 }
 
 // FinishDeleteSegment finishes segment deletion process.
 func (endpoint *Endpoint) FinishDeleteSegment(ctx context.Context, req *pb.SegmentFinishDeleteRequest) (resp *pb.SegmentFinishDeleteResponse, err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	segmentID, err := endpoint.unmarshalSatSegmentID(ctx, req.SegmentId)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
-	}
-
-	streamID := segmentID.StreamId
-
-	_, err = endpoint.validateAuth(ctx, req.Header, macaroon.Action{
-		Op:            macaroon.ActionDelete,
-		Bucket:        streamID.Bucket,
-		EncryptedPath: streamID.EncryptedPath,
-		Time:          time.Now(),
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	// at the moment logic is in BeginDeleteSegment
-
-	return &pb.SegmentFinishDeleteResponse{}, nil
+	// all logic for deleting is now in BeginDeleteObject
+	return nil, rpcstatus.Error(rpcstatus.Unimplemented, "not implemented")
 }
 
 // ListSegments list object segments.
 func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListRequest) (resp *pb.SegmentListResponse, err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
-	}
-
-	keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
-		Op:            macaroon.ActionList,
-		Bucket:        streamID.Bucket,
-		EncryptedPath: streamID.EncryptedPath,
-		Time:          time.Now(),
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	limit := req.Limit
-	if limit == 0 || limit > listLimit {
-		limit = listLimit
-	}
-
-	pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, metabase.LastSegmentIndex, streamID.Bucket, streamID.EncryptedPath)
-	if err != nil {
-		if rpcstatus.Code(err) == rpcstatus.NotFound {
-			return &pb.SegmentListResponse{}, nil
-		}
-		return nil, err
-	}
-
-	streamMeta := &pb.StreamMeta{}
-	err = pb.Unmarshal(pointer.Metadata, streamMeta)
-	if err != nil {
-		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
-	}
-
-	endpoint.log.Info("Segment List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "segment"))
-	mon.Meter("req_list_segment").Mark(1)
-
-	if streamMeta.NumberOfSegments > 0 {
-		// use unencrypted number of segments
-		// TODO cleanup int32 vs int64
-		return endpoint.listSegmentsFromNumberOfSegments(int32(streamMeta.NumberOfSegments), req.CursorPosition.Index, limit)
-	}
-
-	// list segments by requesting each segment from cursor index to n until n segment is not found
-	return endpoint.listSegmentsManually(ctx, keyInfo.ProjectID, streamID, req.CursorPosition.Index, limit)
-}
-
-func (endpoint *Endpoint) listSegmentsFromNumberOfSegments(numberOfSegments, cursorIndex, limit int32) (resp *pb.SegmentListResponse, err error) {
-	if numberOfSegments <= 0 {
-		endpoint.log.Error(
-			"Invalid number of segments; this function requires the value to be greater than 0",
-			zap.Int32("numberOfSegments", numberOfSegments),
-		)
-		return nil, rpcstatus.Error(rpcstatus.Internal, "unable to list segments")
-	}
-
-	if cursorIndex > numberOfSegments {
-		endpoint.log.Error(
-			"Invalid number cursor index; the index cannot be greater than the total number of segments",
-			zap.Int32("numberOfSegments", numberOfSegments),
-			zap.Int32("cursorIndex", cursorIndex),
-		)
-		return nil, rpcstatus.Error(rpcstatus.Internal, "unable to list segments")
-	}
-
-	numberOfSegments -= cursorIndex
-
-	var (
-		segmentItems = make([]*pb.SegmentListItem, 0)
-		more         = false
-	)
-	if numberOfSegments > 0 {
-		segmentItems = make([]*pb.SegmentListItem, 0, int(numberOfSegments))
-
-		if numberOfSegments > limit {
-			more = true
-			numberOfSegments = limit
-		} else {
-			// remove last segment to avoid if statements in loop to detect last segment,
-			// last segment will be added manually at the end of this block
-			numberOfSegments--
-		}
-
-		for index := int32(0); index < numberOfSegments; index++ {
-			segmentItems = append(segmentItems, &pb.SegmentListItem{
-				Position: &pb.SegmentPosition{
-					Index: index + cursorIndex,
-				},
-			})
-		}
-
-		if !more {
-			// last segment is always the last one
-			segmentItems = append(segmentItems, &pb.SegmentListItem{
-				Position: &pb.SegmentPosition{
-					Index: metabase.LastSegmentIndex,
-				},
-			})
-		}
-	}
-
-	return &pb.SegmentListResponse{
-		Items: segmentItems,
-		More:  more,
-	}, nil
-}
-
-// listSegmentManually lists the segments that belongs to projectID and streamID
-// from the cursorIndex up to the limit. It stops before the limit when
-// cursorIndex + n returns a not found pointer.
-//
-// limit must be greater than 0 and cursorIndex greater than or equal than 0,
-// otherwise an error is returned.
-func (endpoint *Endpoint) listSegmentsManually(ctx context.Context, projectID uuid.UUID, streamID *pb.SatStreamID, cursorIndex, limit int32) (resp *pb.SegmentListResponse, err error) {
-	if limit <= 0 {
-		return nil, rpcstatus.Errorf(
-			rpcstatus.InvalidArgument, "invalid limit, cannot be 0 or negative. Got %d", limit,
-		)
-	}
-
-	index := int64(cursorIndex)
-	segmentItems := make([]*pb.SegmentListItem, 0)
-	more := false
-
-	for {
-		_, _, err := endpoint.getPointer(ctx, projectID, index, streamID.Bucket, streamID.EncryptedPath)
-		if err != nil {
-			if rpcstatus.Code(err) != rpcstatus.NotFound {
-				return nil, err
-			}
-
-			break
-		}
-
-		if limit == int32(len(segmentItems)) {
-			more = true
-			break
-		}
-		segmentItems = append(segmentItems, &pb.SegmentListItem{
-			Position: &pb.SegmentPosition{
-				Index: int32(index),
-			},
-		})
-
-		index++
-	}
-
-	if limit > int32(len(segmentItems)) {
-		segmentItems = append(segmentItems, &pb.SegmentListItem{
-			Position: &pb.SegmentPosition{
-				Index: metabase.LastSegmentIndex,
-			},
-		})
-	} else {
-		more = true
-	}
-
-	return &pb.SegmentListResponse{
-		Items: segmentItems,
-		More:  more,
-	}, nil
+	// nothing is using this method
+	return nil, rpcstatus.Error(rpcstatus.Unimplemented, "not implemented")
 }
 
 // DownloadSegment returns data necessary to download segment.
diff --git a/satellite/metainfo/metainfo_test.go b/satellite/metainfo/metainfo_test.go
index 91b7d64e6..1c050775e 100644
--- a/satellite/metainfo/metainfo_test.go
+++ b/satellite/metainfo/metainfo_test.go
@@ -29,6 +29,7 @@ import (
 	"storj.io/common/testrand"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
+	satMetainfo "storj.io/storj/satellite/metainfo"
 	"storj.io/storj/satellite/metainfo/metabase"
 	"storj.io/uplink"
 	"storj.io/uplink/private/metainfo"
@@ -177,21 +178,12 @@ func TestRevokeMacaroon(t *testing.T) {
 		err = client.CommitObject(ctx, metainfo.CommitObjectParams{StreamID: encodedStreamID})
 		assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
 
-		err = client.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{StreamID: encodedStreamID})
-		assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
-
 		_, _, _, err = client.BeginSegment(ctx, metainfo.BeginSegmentParams{StreamID: encodedStreamID})
 		assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
 
-		_, _, _, err = client.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{StreamID: encodedStreamID})
-		assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
-
 		err = client.MakeInlineSegment(ctx, metainfo.MakeInlineSegmentParams{StreamID: encodedStreamID})
 		assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
 
-		_, _, err = client.ListSegments(ctx, metainfo.ListSegmentsParams{StreamID: encodedStreamID})
-		assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
-
 		_, _, err = client.DownloadSegment(ctx, metainfo.DownloadSegmentParams{StreamID: encodedStreamID})
 		assert.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
 
@@ -276,21 +268,12 @@ func TestInvalidAPIKey(t *testing.T) {
 			err = client.CommitObject(ctx, metainfo.CommitObjectParams{StreamID: streamID})
 			assertInvalidArgument(t, err, false)
 
-			err = client.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{StreamID: streamID})
-			assertInvalidArgument(t, err, false)
-
 			_, _, _, err = client.BeginSegment(ctx, metainfo.BeginSegmentParams{StreamID: streamID})
 			assertInvalidArgument(t, err, false)
 
-			_, _, _, err = client.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{StreamID: streamID})
-			assertInvalidArgument(t, err, false)
-
 			err = client.MakeInlineSegment(ctx, metainfo.MakeInlineSegmentParams{StreamID: streamID})
 			assertInvalidArgument(t, err, false)
 
-			_, _, err = client.ListSegments(ctx, metainfo.ListSegmentsParams{StreamID: streamID})
-			assertInvalidArgument(t, err, false)
-
 			_, _, err = client.DownloadSegment(ctx, metainfo.DownloadSegmentParams{StreamID: streamID})
 			assertInvalidArgument(t, err, false)
 
@@ -618,7 +601,7 @@ func TestBucketExistenceCheck(t *testing.T) {
 	})
 }
 
-func TestBeginCommitListSegment(t *testing.T) {
+func TestBeginCommit(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -724,74 +707,13 @@ func TestBeginCommitListSegment(t *testing.T) {
 		})
 		require.NoError(t, err)
 
-		segments, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
-			StreamID: object.StreamID,
-		})
-		require.NoError(t, err)
-		require.Len(t, segments, 1)
-	})
-}
-
-func TestListSegments(t *testing.T) {
-	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
-		Reconfigure: testplanet.Reconfigure{
-			Satellite: testplanet.MaxSegmentSize(memory.KiB),
-		},
-	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
-		uplink := planet.Uplinks[0]
-
-		data := testrand.Bytes(15 * memory.KiB)
-		err := uplink.Upload(ctx, planet.Satellites[0], "testbucket", "test-path", data)
+		project := planet.Uplinks[0].Projects[0]
+		location, err := satMetainfo.CreatePath(ctx, project.ID, metabase.LastSegmentIndex, []byte(object.Bucket), []byte{})
 		require.NoError(t, err)
 
-		// 15KiB + encryption should be uploaded into 16 segments with SegmentSize == 1KiB
-		numberOfSegments := 16
-
-		metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
+		items, _, err := planet.Satellites[0].Metainfo.Service.List(ctx, location.Encode(), "", false, 1, 0)
 		require.NoError(t, err)
-		defer ctx.Check(metainfoClient.Close)
-
-		items, _, err := metainfoClient.ListObjects(ctx, metainfo.ListObjectsParams{
-			Bucket: []byte("testbucket"),
-			Limit:  1,
-		})
-		require.NoError(t, err)
-
-		object, err := metainfoClient.GetObject(ctx, metainfo.GetObjectParams{
-			Bucket:        []byte("testbucket"),
-			EncryptedPath: items[0].EncryptedPath,
-		})
-		require.NoError(t, err)
-
-		for i, test := range []struct {
-			Index  int32
-			Limit  int32
-			Result int
-			More   bool
-		}{
-			{Index: 0, Result: numberOfSegments},
-			{Index: 0, Result: numberOfSegments, Limit: int32(numberOfSegments), More: false},
-			{Index: 0, Result: 5, Limit: 5, More: true},
-			{Index: 16, Result: 0, More: false},
-			{Index: 11, Result: 5, Limit: 5, More: false},
-			{Index: 15, Result: 1, More: false},
-		} {
-			segments, more, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
-				StreamID: object.StreamID,
-				Limit:    test.Limit,
-				CursorPosition: storj.SegmentPosition{
-					Index: test.Index,
-				},
-			})
-			require.NoErrorf(t, err, "test case: %d", i)
-			require.Lenf(t, segments, test.Result, "test case: %d", i)
-			require.Equalf(t, test.More, more, "test case: %d", i)
-			if !more && test.Result > 0 {
-				require.Equalf(t, int32(-1), segments[test.Result-1].Position.Index, "test case: %d", i)
-			}
-		}
+		require.Len(t, items, 1)
 	})
 }
 
@@ -898,44 +820,15 @@ func TestInlineSegment(t *testing.T) {
 			})
 			require.Error(t, err)
 		}
-		{ // test listing inline segments
-			for _, test := range []struct {
-				Index  int32
-				Limit  int
-				Result int
-				More   bool
-			}{
-				{Index: 0, Result: len(segments), More: false},
-				{Index: 2, Result: len(segments) - 2, More: false},
-				{Index: 0, Result: 3, More: true, Limit: 3},
-				{Index: 0, Result: len(segments), More: false, Limit: len(segments)},
-				{Index: 0, Result: len(segments) - 1, More: true, Limit: len(segments) - 1},
-			} {
-				items, more, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
-					StreamID: object.StreamID,
-					CursorPosition: storj.SegmentPosition{
-						Index: test.Index,
-					},
-					Limit: int32(test.Limit),
-				})
-				require.NoError(t, err)
-				require.Equal(t, test.Result, len(items))
-				require.Equal(t, test.More, more)
-			}
-		}
 
 		{ // test download inline segments
-			items, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
-				StreamID: object.StreamID,
-			})
-			require.NoError(t, err)
-			require.Equal(t, len(segments), len(items))
+			existingSegments := []int32{0, 1, 2, 3, 4, 5, -1}
 
-			for i, item := range items {
+			for i, index := range existingSegments {
 				info, limits, err := metainfoClient.DownloadSegment(ctx, metainfo.DownloadSegmentParams{
 					StreamID: object.StreamID,
 					Position: storj.SegmentPosition{
-						Index: item.Position.Index,
+						Index: index,
 					},
 				})
 				require.NoError(t, err)
@@ -983,7 +876,6 @@ func TestRemoteSegment(t *testing.T) {
 
 		{
 			// Get object
-			// List segments
 			// Download segment
 
 			object, err := metainfoClient.GetObject(ctx, metainfo.GetObjectParams{
@@ -992,16 +884,10 @@ func TestRemoteSegment(t *testing.T) {
 			})
 			require.NoError(t, err)
 
-			segments, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
-				StreamID: object.StreamID,
-			})
-			require.NoError(t, err)
-			require.Len(t, segments, 1)
-
 			_, limits, err := metainfoClient.DownloadSegment(ctx, metainfo.DownloadSegmentParams{
 				StreamID: object.StreamID,
 				Position: storj.SegmentPosition{
-					Index: segments[0].Position.Index,
+					Index: metabase.LastSegmentIndex,
 				},
 			})
 			require.NoError(t, err)
@@ -1185,29 +1071,32 @@ func TestBatch(t *testing.T) {
 			requests = append(requests, &metainfo.CommitObjectParams{
 				EncryptedMetadata: metadata,
 			})
-			requests = append(requests, &metainfo.ListSegmentsParams{})
 
 			responses, err := metainfoClient.Batch(ctx, requests...)
 			require.NoError(t, err)
-			require.Equal(t, numOfSegments+3, len(responses))
-
-			listResponse, err := responses[numOfSegments+2].ListSegment()
-			require.NoError(t, err)
-			require.Equal(t, numOfSegments, len(listResponse.Items))
+			require.Equal(t, numOfSegments+2, len(responses))
 
 			requests = make([]metainfo.BatchItem, 0)
 			requests = append(requests, &metainfo.GetObjectParams{
 				Bucket:        []byte("second-test-bucket"),
 				EncryptedPath: []byte("encrypted-path"),
 			})
-			for _, segment := range listResponse.Items {
+
+			for i := 0; i < numOfSegments-1; i++ {
 				requests = append(requests, &metainfo.DownloadSegmentParams{
-					Position: segment.Position,
+					Position: storj.SegmentPosition{
+						Index: int32(i),
+					},
 				})
 			}
+			requests = append(requests, &metainfo.DownloadSegmentParams{
+				Position: storj.SegmentPosition{
+					Index: -1,
+				},
+			})
 
 			responses, err = metainfoClient.Batch(ctx, requests...)
 			require.NoError(t, err)
-			require.Equal(t, len(listResponse.Items)+1, len(responses))
+			require.Equal(t, numOfSegments+1, len(responses))
 
 			for i, response := range responses[1:] {
 				downloadResponse, err := response.DownloadSegment()
@@ -1451,13 +1340,12 @@ func TestOverwriteZombieSegments(t *testing.T) {
 			require.NoError(t, err)
 
 			// delete some segments to leave only zombie segments
+			project := planet.Uplinks[0].Projects[0]
 			for _, segment := range tc.deletedSegments {
-				_, _, _, err = metainfoClient.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
-					StreamID: object.StreamID,
-					Position: storj.SegmentPosition{
-						Index: segment,
-					},
-				})
+				location, err := satMetainfo.CreatePath(ctx, project.ID, int64(segment), []byte(object.Bucket), items[0].EncryptedPath)
+				require.NoError(t, err)
+
+				err = planet.Satellites[0].Metainfo.Service.UnsynchronizedDelete(ctx, location.Encode())
 				require.NoError(t, err)
 			}