satellite/metainfo: adjust ListObjects to use pending_objects table

Adjust the metainfo.ListObjects method to use IteratePendingObjects to
support the new pending_objects table. The new method will be used only
when listing pending objects.

Until the objects table is free of pending objects, results can exist in
both tables, so we merge the listing results. This also means that in
some (rare?) cases we may return more results than the specified listing
limit. This situation is temporary.

Part of https://github.com/storj/storj/issues/6047

Change-Id: I06389145e5d916c532dfdbd3dcc9ef68ef70e515
This commit is contained in:
Michal Niewrzal 2023-08-07 12:52:13 +02:00 committed by Storj Robot
parent 273ebd61d7
commit c010e37374
2 changed files with 272 additions and 16 deletions

View File

@ -844,12 +844,11 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
status = metabase.ObjectStatus(req.Status)
}
cursor := metabase.IterateCursor{
Key: metabase.ObjectKey(req.EncryptedCursor),
// TODO: set to a the version from the protobuf request when it supports this
}
if len(cursor.Key) != 0 {
cursor.Key = prefix + cursor.Key
cursorKey := metabase.ObjectKey(req.EncryptedCursor)
cursorVersion := metabase.Version(0)
cursorStreamID := uuid.UUID{}
if len(cursorKey) != 0 {
cursorKey = prefix + cursorKey
// TODO this is a workaround to avoid duplicates while listing objects by libuplink.
// because version is not part of cursor yet and we can have object with version higher
@ -858,7 +857,9 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
// fix as we still want to avoid this problem for older libuplink versions.
//
// it should be set in case of pending and committed objects
cursor.Version = metabase.MaxVersion
cursorVersion = metabase.MaxVersion
// for the same reasons as above we need to set maximum UUID as a cursor stream id
cursorStreamID = uuid.Max()
}
includeCustomMetadata := true
@ -875,10 +876,13 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
if endpoint.config.TestListingQuery {
result, err := endpoint.metabase.ListObjects(ctx,
metabase.ListObjects{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
Prefix: prefix,
Cursor: metabase.ListObjectsCursor(cursor),
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
Prefix: prefix,
Cursor: metabase.ListObjectsCursor{
Key: cursorKey,
Version: cursorVersion,
},
Recursive: req.Recursive,
Limit: limit,
Status: status,
@ -898,12 +902,46 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
}
resp.More = result.More
} else {
if status == metabase.Pending && endpoint.config.UsePendingObjectsTable {
err = endpoint.metabase.IteratePendingObjects(ctx, metabase.IteratePendingObjects{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
Prefix: prefix,
Cursor: metabase.PendingObjectsCursor{
Key: cursorKey,
StreamID: cursorStreamID,
},
Recursive: req.Recursive,
BatchSize: limit + 1,
IncludeCustomMetadata: includeCustomMetadata,
IncludeSystemMetadata: includeSystemMetadata,
}, func(ctx context.Context, it metabase.PendingObjectsIterator) error {
entry := metabase.PendingObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) {
item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement)
if err != nil {
return err
}
resp.Items = append(resp.Items, item)
}
resp.More = it.Next(ctx, &entry)
return nil
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
}
// we always need results from both tables for now
err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx,
metabase.IterateObjectsWithStatus{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
Prefix: prefix,
Cursor: cursor,
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
Prefix: prefix,
Cursor: metabase.IterateCursor{
Key: cursorKey,
Version: cursorVersion,
},
Recursive: req.Recursive,
BatchSize: limit + 1,
Status: status,
@ -918,7 +956,9 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
}
resp.Items = append(resp.Items, item)
}
resp.More = it.Next(ctx, &entry)
// we need to take into account also potential results from IteratePendingObjects
resp.More = resp.More || it.Next(ctx, &entry)
return nil
},
)
@ -1461,6 +1501,87 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket
return item, nil
}
// pendingObjectEntryToProtoListItem converts a metabase.PendingObjectEntry into
// the protobuf list item returned to clients. System metadata (creation and
// expiration times) is attached only when includeSystem is set, and custom
// metadata only when includeMetadata is set. A satellite stream ID is always
// packed into the item because clients need it to build MultipartInfo for
// pending objects.
func (endpoint *Endpoint) pendingObjectEntryToProtoListItem(ctx context.Context, bucket []byte,
	entry metabase.PendingObjectEntry, prefixToPrependInSatStreamID metabase.ObjectKey,
	includeSystem, includeMetadata bool, placement storj.PlacementConstraint) (item *pb.ObjectListItem, err error) {

	item = &pb.ObjectListItem{
		EncryptedObjectKey: []byte(entry.ObjectKey),
		Status:             pb.Object_UPLOADING,
	}

	// Expiration is optional on the metabase entry; fall back to the zero time.
	var expiration time.Time
	if entry.ExpiresAt != nil {
		expiration = *entry.ExpiresAt
	}

	if includeSystem {
		item.ExpiresAt = expiration
		item.CreatedAt = entry.CreatedAt
	}

	if includeMetadata {
		var metadataNonce storj.Nonce
		if len(entry.EncryptedMetadataNonce) > 0 {
			metadataNonce, err = storj.NonceFromBytes(entry.EncryptedMetadataNonce)
			if err != nil {
				return nil, err
			}
		}

		meta := &pb.StreamMeta{}
		if err = pb.Unmarshal(entry.EncryptedMetadata, meta); err != nil {
			return nil, err
		}

		// Overlay the entry's encryption parameters when they are set.
		if entry.Encryption != (storj.EncryptionParameters{}) {
			meta.EncryptionType = int32(entry.Encryption.CipherSuite)
			meta.EncryptionBlockSize = entry.Encryption.BlockSize
		}
		if entry.EncryptedMetadataEncryptedKey != nil {
			meta.LastSegmentMeta = &pb.SegmentMeta{
				EncryptedKey: entry.EncryptedMetadataEncryptedKey,
				KeyNonce:     entry.EncryptedMetadataNonce,
			}
		}

		encoded, marshalErr := pb.Marshal(meta)
		if marshalErr != nil {
			return nil, marshalErr
		}

		item.EncryptedMetadata = encoded
		item.EncryptedMetadataNonce = metadataNonce
		item.EncryptedMetadataEncryptedKey = entry.EncryptedMetadataEncryptedKey
	}

	// Add the stream ID to the list item; the client requires it to build
	// the MultipartInfo for pending objects.
	streamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
		Bucket:             bucket,
		EncryptedObjectKey: append([]byte(prefixToPrependInSatStreamID), []byte(entry.ObjectKey)...),
		Version:            1,
		CreationDate:       entry.CreatedAt,
		ExpirationDate:     expiration,
		StreamId:           entry.StreamID[:],
		MultipartObject:    true,
		EncryptionParameters: &pb.EncryptionParameters{
			CipherSuite: pb.CipherSuite(entry.Encryption.CipherSuite),
			BlockSize:   int64(entry.Encryption.BlockSize),
		},
		Placement:              int32(placement),
		UsePendingObjectsTable: true,
	})
	if err != nil {
		return nil, err
	}
	item.StreamId = &streamID

	return item, nil
}
// DeleteCommittedObject deletes all the pieces of the storage nodes that belongs
// to the specified object.
//

View File

@ -38,6 +38,7 @@ import (
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
@ -660,6 +661,7 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
iterator := project.ListUploads(ctx, bucketName, &tt.options)
require.True(t, iterator.Next())
require.Equal(t, uploadInfo.UploadID, iterator.Item().UploadID)
require.NoError(t, iterator.Err())
err = project.AbortUpload(ctx, bucketName, "multipart-object", iterator.Item().UploadID)
require.NoError(t, err)
@ -670,6 +672,139 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
})
}
// TestEndpoint_Object_No_StorageNodes_UsePendingObjectsTable verifies pending
// object listing when the satellite is configured to use the pending_objects
// table, including the transition period where pending objects may live in
// both the objects table and the pending_objects table.
func TestEndpoint_Object_No_StorageNodes_UsePendingObjectsTable(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
// Enable the new pending_objects table code path under test.
config.Metainfo.UsePendingObjectsTable = true
},
),
Uplink: func(log *zap.Logger, index int, config *testplanet.UplinkConfig) {
// we need to not encrypt paths because one of tests is creating object
// manually in DB directly. With path encryption listing would skip such object.
config.DefaultPathCipher = storj.EncNull
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
projectID := planet.Uplinks[0].Projects[0].ID
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)
bucketName := "testbucket"
// deleteBucket removes the shared bucket (and its objects) so each
// sub-test starts from a clean state.
deleteBucket := func() error {
_, err := project.DeleteBucketWithObjects(ctx, bucketName)
return err
}
t.Run("UploadID check", func(t *testing.T) {
defer ctx.Check(deleteBucket)
_, err = project.CreateBucket(ctx, bucketName)
require.NoError(t, err)
// Exercise every combination of system/custom metadata inclusion,
// with and without an expiration time on the pending upload.
for _, tt := range []struct {
expires time.Time
options uplink.ListUploadsOptions
}{
{
options: uplink.ListUploadsOptions{System: false, Custom: false},
},
{
options: uplink.ListUploadsOptions{System: true, Custom: false},
},
{
options: uplink.ListUploadsOptions{System: true, Custom: true},
},
{
options: uplink.ListUploadsOptions{System: false, Custom: true},
},
{
expires: time.Now().Add(24 * time.Hour),
options: uplink.ListUploadsOptions{System: false, Custom: false},
},
{
expires: time.Now().Add(24 * time.Hour),
options: uplink.ListUploadsOptions{System: true, Custom: false},
},
{
expires: time.Now().Add(24 * time.Hour),
options: uplink.ListUploadsOptions{System: true, Custom: true},
},
{
expires: time.Now().Add(24 * time.Hour),
options: uplink.ListUploadsOptions{System: false, Custom: true},
},
} {
t.Run(fmt.Sprintf("expires:%v;system:%v;custom:%v", !tt.expires.IsZero(), tt.options.System, tt.options.Custom), func(t *testing.T) {
objectKey := "multipart-object"
uploadInfo, err := project.BeginUpload(ctx, bucketName, objectKey, &uplink.UploadOptions{
Expires: tt.expires,
})
require.NoError(t, err)
// Listing must return the pending upload with the same UploadID
// that BeginUpload reported.
iterator := project.ListUploads(ctx, bucketName, &tt.options)
require.True(t, iterator.Next())
require.Equal(t, uploadInfo.UploadID, iterator.Item().UploadID)
require.NoError(t, iterator.Err())
err = project.AbortUpload(ctx, bucketName, objectKey, iterator.Item().UploadID)
require.NoError(t, err)
})
}
})
t.Run("object in pending_object and object tables", func(t *testing.T) {
defer ctx.Check(deleteBucket)
_, err = project.CreateBucket(ctx, bucketName)
require.NoError(t, err)
// pending object in objects table
// (created directly in the DB to simulate a legacy pending object)
_, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: metabase.ObjectKey("objects_table"),
StreamID: testrand.UUID(),
Version: metabase.NextVersion,
},
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: false,
})
require.NoError(t, err)
// pending object in pending_objects table
_, err = project.BeginUpload(ctx, bucketName, "pending_object_table", nil)
require.NoError(t, err)
// Listing must merge results from both tables and return both keys.
keys := []string{}
iterator := project.ListUploads(ctx, bucketName, nil)
for iterator.Next() {
keys = append(keys, iterator.Item().Key)
}
require.NoError(t, iterator.Err())
require.ElementsMatch(t, []string{
"objects_table",
"pending_object_table",
}, keys)
// Aborting every listed upload must work regardless of which table
// the pending object lives in.
iterator = project.ListUploads(ctx, bucketName, nil)
for iterator.Next() {
require.NoError(t, project.AbortUpload(ctx, bucketName, iterator.Item().Key, iterator.Item().UploadID))
}
require.NoError(t, iterator.Err())
// After aborting, the listing must be empty.
iterator = project.ListUploads(ctx, bucketName, nil)
require.False(t, iterator.Next())
require.NoError(t, iterator.Err())
})
})
}
func TestEndpoint_Object_UploadLimit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, UplinkCount: 1,