satellite/metainfo: Improve metainfo ListSegments (#2882)

This commit is contained in:
Michal Niewrzal 2019-08-30 23:30:18 +02:00 committed by GitHub
parent 9a1b9f8431
commit a6721ba92f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 173 additions and 17 deletions

View File

@ -210,6 +210,26 @@ func (client *Uplink) UploadWithExpirationAndConfig(ctx context.Context, satelli
return nil
}
// UploadWithClientConfig uploads data to specific satellite with custom client configuration
func (client *Uplink) UploadWithClientConfig(ctx context.Context, satellite *satellite.Peer, clientConfig uplink.Config, bucketName string, path storj.Path, data []byte) (err error) {
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, clientConfig)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, bucket.Close(), project.Close()) }()
opts := &libuplink.UploadOptions{}
opts.Volatile.RedundancyScheme = clientConfig.GetRedundancyScheme()
opts.Volatile.EncryptionParameters = clientConfig.GetEncryptionParameters()
reader := bytes.NewReader(data)
if err := bucket.UploadObject(ctx, path, reader, opts); err != nil {
return err
}
return nil
}
// Download data from specific satellite
func (client *Uplink) Download(ctx context.Context, satellite *satellite.Peer, bucketName string, path storj.Path) ([]byte, error) {
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))

View File

@ -1630,27 +1630,92 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListR
limit = listLimit
}
index := int64(req.CursorPosition.Index)
more := false
path, err := CreatePath(ctx, keyInfo.ProjectID, lastSegment, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
pointer, err := endpoint.metainfo.Get(ctx, path)
if err != nil {
if storage.ErrKeyNotFound.Has(err) {
return nil, status.Error(codes.NotFound, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
streamMeta := &pb.StreamMeta{}
err = proto.Unmarshal(pointer.Metadata, streamMeta)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if streamMeta.NumberOfSegments > 0 {
// use unencrypted number of segments
// TODO cleanup int32 vs int64
return endpoint.listSegmentsFromNumberOfSegment(ctx, int32(streamMeta.NumberOfSegments), req.CursorPosition.Index, limit)
}
// list segments by requesting each segment from cursor index to n until n segment is not found
return endpoint.listSegmentsManually(ctx, keyInfo.ProjectID, streamID, req.CursorPosition.Index, limit)
}
func (endpoint *Endpoint) listSegmentsFromNumberOfSegment(ctx context.Context, numberOfSegments, cursorIndex int32, limit int32) (resp *pb.SegmentListResponse, err error) {
numberOfSegments -= cursorIndex
segmentItems := make([]*pb.SegmentListItem, 0)
// TODO think about better implementation
more := false
if numberOfSegments > 0 {
if numberOfSegments > limit {
more = true
numberOfSegments = limit
} else {
// remove last segment to avoid if statements in loop to detect last segment,
// last segment will be added manually at the end of this block
numberOfSegments--
}
for index := int32(0); index < numberOfSegments; index++ {
segmentItems = append(segmentItems, &pb.SegmentListItem{
Position: &pb.SegmentPosition{
Index: index + cursorIndex,
},
})
}
if !more {
// last segment is always the last one
segmentItems = append(segmentItems, &pb.SegmentListItem{
Position: &pb.SegmentPosition{
Index: lastSegment,
},
})
}
}
return &pb.SegmentListResponse{
Items: segmentItems,
More: more,
}, nil
}
func (endpoint *Endpoint) listSegmentsManually(ctx context.Context, projectID uuid.UUID, streamID *pb.SatStreamID, cursorIndex int32, limit int32) (resp *pb.SegmentListResponse, err error) {
index := int64(cursorIndex)
segmentItems := make([]*pb.SegmentListItem, 0)
more := false
for {
path, err := CreatePath(ctx, keyInfo.ProjectID, index, streamID.Bucket, streamID.EncryptedPath)
path, err := CreatePath(ctx, projectID, index, streamID.Bucket, streamID.EncryptedPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
_, err = endpoint.metainfo.Get(ctx, path)
if err != nil {
if storage.ErrKeyNotFound.Has(err) {
if index == lastSegment {
break
}
index = lastSegment
continue
break
}
return nil, status.Error(codes.Internal, err.Error())
}
if limit == 0 {
if limit == int32(len(segmentItems)) {
more = true
break
}
@ -1660,11 +1725,17 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListR
},
})
if index == lastSegment {
break
}
index++
limit--
}
if limit > int32(len(segmentItems)) {
segmentItems = append(segmentItems, &pb.SegmentListItem{
Position: &pb.SegmentPosition{
Index: lastSegment,
},
})
} else {
more = true
}
return &pb.SegmentListResponse{

View File

@ -1022,6 +1022,68 @@ func TestBeginCommitListSegment(t *testing.T) {
})
}
func TestListSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
uplink := planet.Uplinks[0]
data := testrand.Bytes(15 * memory.KiB)
config := uplink.GetConfig(planet.Satellites[0])
config.Client.SegmentSize = memory.KiB
err := uplink.UploadWithClientConfig(ctx, planet.Satellites[0], config, "testbucket", "test-path", data)
require.NoError(t, err)
// 15KiB + encryption should be uploaded into 16 segments with SegmentSize == 1KiB
numberOfSegments := 16
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfoClient.Close)
items, _, err := metainfoClient.ListObjects(ctx, metainfo.ListObjectsParams{
Bucket: []byte("testbucket"),
Limit: 1,
})
require.NoError(t, err)
object, err := metainfoClient.GetObject(ctx, metainfo.GetObjectParams{
Bucket: []byte("testbucket"),
EncryptedPath: items[0].EncryptedPath,
})
require.NoError(t, err)
for _, test := range []struct {
Index int32
Limit int32
Result int
More bool
}{
{Index: 0, Result: numberOfSegments},
{Index: 0, Result: numberOfSegments, Limit: int32(numberOfSegments), More: false},
{Index: 0, Result: 5, Limit: 5, More: true},
{Index: 16, Result: 0, More: false},
{Index: 11, Result: 5, Limit: 5, More: false},
{Index: 15, Result: 1, More: false},
} {
segments, more, err := metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{
StreamID: object.StreamID,
Limit: test.Limit,
CursorPosition: storj.SegmentPosition{
Index: test.Index,
},
})
require.NoError(t, err)
require.Len(t, segments, test.Result)
require.Equal(t, test.More, more)
if !more && test.Result > 0 {
require.Equal(t, int32(-1), segments[test.Result-1].Position.Index)
}
}
})
}
func TestInlineSegment(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
@ -1183,11 +1245,11 @@ func TestInlineSegment(t *testing.T) {
require.NoError(t, err)
}
items, _, err = metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{
_, _, err = metainfoClient.ListSegmentsNew(ctx, metainfo.ListSegmentsParams{
StreamID: streamID,
})
require.NoError(t, err)
require.Empty(t, items)
require.Error(t, err)
require.True(t, storj.ErrObjectNotFound.Has(err))
err = metainfoClient.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{
StreamID: streamID,

View File

@ -1200,6 +1200,9 @@ func (client *Client) ListSegmentsNew(ctx context.Context, params ListSegmentsPa
response, err := client.client.ListSegments(ctx, params.toRequest())
if err != nil {
if status.Code(err) == codes.NotFound {
return []storj.SegmentListItem{}, false, storj.ErrObjectNotFound.Wrap(err)
}
return []storj.SegmentListItem{}, false, Error.Wrap(err)
}