diff --git a/internal/testplanet/uplink.go b/internal/testplanet/uplink.go index cfed63c64..c53f3e5b0 100644 --- a/internal/testplanet/uplink.go +++ b/internal/testplanet/uplink.go @@ -210,6 +210,26 @@ func (client *Uplink) UploadWithExpirationAndConfig(ctx context.Context, satelli return nil } +// UploadWithClientConfig uploads data to specific satellite with custom client configuration +func (client *Uplink) UploadWithClientConfig(ctx context.Context, satellite *satellite.Peer, clientConfig uplink.Config, bucketName string, path storj.Path, data []byte) (err error) { + project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, clientConfig) + if err != nil { + return err + } + defer func() { err = errs.Combine(err, bucket.Close(), project.Close()) }() + + opts := &libuplink.UploadOptions{} + opts.Volatile.RedundancyScheme = clientConfig.GetRedundancyScheme() + opts.Volatile.EncryptionParameters = clientConfig.GetEncryptionParameters() + + reader := bytes.NewReader(data) + if err := bucket.UploadObject(ctx, path, reader, opts); err != nil { + return err + } + + return nil +} + // Download data from specific satellite func (client *Uplink) Download(ctx context.Context, satellite *satellite.Peer, bucketName string, path storj.Path) ([]byte, error) { project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite)) diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index b4705c0c2..73415d7dd 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -1630,27 +1630,92 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListR limit = listLimit } - index := int64(req.CursorPosition.Index) - more := false + path, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, streamID.Bucket, streamID.EncryptedPath) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + pointer, err := endpoint.metainfo.Get(ctx, path) + if err != nil { + if storage.ErrKeyNotFound.Has(err) { + return nil, status.Error(codes.NotFound, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + + streamMeta := &pb.StreamMeta{} + err = proto.Unmarshal(pointer.Metadata, streamMeta) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + if streamMeta.NumberOfSegments > 0 { + // use unencrypted number of segments + // TODO cleanup int32 vs int64 + return endpoint.listSegmentsFromNumberOfSegment(ctx, int32(streamMeta.NumberOfSegments), req.CursorPosition.Index, limit) + } + + // list segments by requesting each segment from cursor index to n until n segment is not found + return endpoint.listSegmentsManually(ctx, keyInfo.ProjectID, streamID, req.CursorPosition.Index, limit) +} + +func (endpoint *Endpoint) listSegmentsFromNumberOfSegment(ctx context.Context, numberOfSegments, cursorIndex int32, limit int32) (resp *pb.SegmentListResponse, err error) { + numberOfSegments -= cursorIndex + segmentItems := make([]*pb.SegmentListItem, 0) - // TODO think about better implementation + more := false + + if numberOfSegments > 0 { + if numberOfSegments > limit { + more = true + numberOfSegments = limit + } else { + // remove last segment to avoid if statements in loop to detect last segment, + // last segment will be added manually at the end of this block + numberOfSegments-- + } + for index := int32(0); index < numberOfSegments; index++ { + segmentItems = append(segmentItems, &pb.SegmentListItem{ + Position: &pb.SegmentPosition{ + Index: index + cursorIndex, + }, + }) + } + if !more { + // last segment is always the last one + segmentItems = append(segmentItems, &pb.SegmentListItem{ + Position: &pb.SegmentPosition{ + Index: lastSegment, + }, + }) + } + } + + return &pb.SegmentListResponse{ + Items: segmentItems, + More: more, + }, nil +} + +func (endpoint *Endpoint) listSegmentsManually(ctx context.Context, projectID uuid.UUID, streamID *pb.SatStreamID, cursorIndex int32, limit int32) (resp *pb.SegmentListResponse, err error) { + index := int64(cursorIndex) + + segmentItems := make([]*pb.SegmentListItem, 0) + more := false + for { - path, err := CreatePath(ctx, keyInfo.ProjectID, index, streamID.Bucket, streamID.EncryptedPath) + path, err := CreatePath(ctx, projectID, index, streamID.Bucket, streamID.EncryptedPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } _, err = endpoint.metainfo.Get(ctx, path) if err != nil { if storage.ErrKeyNotFound.Has(err) { - if index == lastSegment { - break - } - index = lastSegment - continue + break } return nil, status.Error(codes.Internal, err.Error()) } - if limit == 0 { + if limit == int32(len(segmentItems)) { more = true break } @@ -1660,11 +1725,17 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListR }, }) - if index == lastSegment { - break - } index++ - limit-- + } + + if limit > int32(len(segmentItems)) { + segmentItems = append(segmentItems, &pb.SegmentListItem{ + Position: &pb.SegmentPosition{ + Index: lastSegment, + }, + }) + } else { + more = true } return &pb.SegmentListResponse{ diff --git a/satellite/metainfo/metainfo_test.go b/satellite/metainfo/metainfo_test.go index e01529f94..e244af56a 100644 --- a/satellite/metainfo/metainfo_test.go +++ b/satellite/metainfo/metainfo_test.go @@ -1022,6 +1022,68 @@ func TestBeginCommitListSegment(t *testing.T) { }) } +func TestListSegment(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()] + uplink := planet.Uplinks[0] + + data := testrand.Bytes(15 * memory.KiB) + config := uplink.GetConfig(planet.Satellites[0]) + config.Client.SegmentSize = memory.KiB + err := uplink.UploadWithClientConfig(ctx, planet.Satellites[0], config, "testbucket", "test-path", data) + require.NoError(t, err) + + // 15KiB + encryption should be uploaded into 16 segments with SegmentSize == 1KiB + numberOfSegments := 16 + + metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey) + require.NoError(t, err) + defer ctx.Check(metainfoClient.Close) + + items, _, err := metainfoClient.ListObjects(ctx, metainfo.ListObjectsParams{ + Bucket: []byte("testbucket"), + Limit: 1, + }) + require.NoError(t, err) + + object, err := metainfoClient.GetObject(ctx, metainfo.GetObjectParams{ + Bucket: []byte("testbucket"), + EncryptedPath: items[0].EncryptedPath, + }) + require.NoError(t, err) + + for _, test := range []struct { + Index int32 + Limit int32 + Result int + More bool + }{ + {Index: 0, Result: numberOfSegments}, + {Index: 0, Result: numberOfSegments, Limit: int32(numberOfSegments), More: false}, + {Index: 0, Result: 5, Limit: 5, More: true}, + {Index: 16, Result: 0, More: false}, + {Index: 11, Result: 5, Limit: 5, More: false}, + {Index: 15, Result: 1, More: false}, + } { + segments, more, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + StreamID: object.StreamID, + Limit: test.Limit, + CursorPosition: storj.SegmentPosition{ + Index: test.Index, + }, + }) + require.NoError(t, err) + require.Len(t, segments, test.Result) + require.Equal(t, test.More, more) + if !more && test.Result > 0 { + require.Equal(t, int32(-1), segments[test.Result-1].Position.Index) + } + } + }) +} + func TestInlineSegment(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1, @@ -1183,11 +1245,11 @@ func TestInlineSegment(t *testing.T) { require.NoError(t, err) } - items, _, err = metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ + _, _, err = metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{ StreamID: streamID, }) - require.NoError(t, err) - require.Empty(t, items) + require.Error(t, err) + require.True(t, storj.ErrObjectNotFound.Has(err)) err = metainfoClient.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{ StreamID: streamID, diff --git a/uplink/metainfo/client.go b/uplink/metainfo/client.go index 6d580f3bd..facdee74f 100644 --- a/uplink/metainfo/client.go +++ b/uplink/metainfo/client.go @@ -1200,6 +1200,9 @@ func (client *Client) ListSegmentsNew(ctx context.Context, params ListSegmentsPa response, err := client.client.ListSegments(ctx, params.toRequest()) if err != nil { + if status.Code(err) == codes.NotFound { + return []storj.SegmentListItem{}, false, storj.ErrObjectNotFound.Wrap(err) + } return []storj.SegmentListItem{}, false, Error.Wrap(err) }