satellite/metainfo: adjust ListPendingObjectStreams to pending_objects table

The new method IteratePendingObjectsByKeyNew is used to provide results
for metainfo.ListPendingObjectStreams. This endpoint is used to list
pending objects that share the same object key. To support both tables
(objects, pending_objects) we need to do one query per table and merge
the results.

Because the existing metainfo protobuf API is missing some fields
needed for a proper listing cursor, we are not able to make
ListPendingObjectStreams return more than a single page correctly. We
need to fix that separately.

While making this change it also turned out that the approach used to
merge results from listing objects for the ListObjects method was
wrong; this change fixes that problem as well.

Handling both tables will be removed at some point, and only the
pending_objects table will be used to look up results.

Part of https://github.com/storj/storj/issues/6047

Change-Id: I8a88a6f885ad529704e6c032f1d97926123c2909
This commit is contained in:
Michal Niewrzal 2023-08-07 14:10:17 +02:00 committed by Michał Niewrzał
parent 6e3da022e0
commit 780c0e0b35
2 changed files with 273 additions and 46 deletions

View File

@ -7,6 +7,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"sort"
"time" "time"
"github.com/jtolio/eventkit" "github.com/jtolio/eventkit"
@ -903,6 +904,13 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
resp.More = result.More resp.More = result.More
} else { } else {
if status == metabase.Pending && endpoint.config.UsePendingObjectsTable { if status == metabase.Pending && endpoint.config.UsePendingObjectsTable {
type ObjectListItem struct {
Item *pb.ObjectListItem
StreamID uuid.UUID
}
pendingObjectsEntries := make([]ObjectListItem, 0, limit)
// TODO when objects table will be free from pending objects only this listing method will remain
err = endpoint.metabase.IteratePendingObjects(ctx, metabase.IteratePendingObjects{ err = endpoint.metabase.IteratePendingObjects(ctx, metabase.IteratePendingObjects{
ProjectID: keyInfo.ProjectID, ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket), BucketName: string(req.Bucket),
@ -917,53 +925,118 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
IncludeSystemMetadata: includeSystemMetadata, IncludeSystemMetadata: includeSystemMetadata,
}, func(ctx context.Context, it metabase.PendingObjectsIterator) error { }, func(ctx context.Context, it metabase.PendingObjectsIterator) error {
entry := metabase.PendingObjectEntry{} entry := metabase.PendingObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) { for it.Next(ctx, &entry) {
item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement)
if err != nil { if err != nil {
return err return err
} }
resp.Items = append(resp.Items, item) pendingObjectsEntries = append(pendingObjectsEntries, ObjectListItem{
Item: item,
StreamID: entry.StreamID,
})
} }
resp.More = it.Next(ctx, &entry)
return nil return nil
}) })
if err != nil { if err != nil {
return nil, endpoint.convertMetabaseErr(err) return nil, endpoint.convertMetabaseErr(err)
} }
}
// we always need results from both tables for now // we always need results from both tables for now
err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, objectsEntries := make([]ObjectListItem, 0, limit)
metabase.IterateObjectsWithStatus{ err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx,
ProjectID: keyInfo.ProjectID, metabase.IterateObjectsWithStatus{
BucketName: string(req.Bucket), ProjectID: keyInfo.ProjectID,
Prefix: prefix, BucketName: string(req.Bucket),
Cursor: metabase.IterateCursor{ Prefix: prefix,
Key: cursorKey, Cursor: metabase.IterateCursor{
Version: cursorVersion, Key: cursorKey,
}, Version: cursorVersion,
Recursive: req.Recursive, },
BatchSize: limit + 1, Recursive: req.Recursive,
Status: status, BatchSize: limit + 1,
IncludeCustomMetadata: includeCustomMetadata, Status: metabase.Pending,
IncludeSystemMetadata: includeSystemMetadata, IncludeCustomMetadata: includeCustomMetadata,
}, func(ctx context.Context, it metabase.ObjectsIterator) error { IncludeSystemMetadata: includeSystemMetadata,
entry := metabase.ObjectEntry{} }, func(ctx context.Context, it metabase.ObjectsIterator) error {
for len(resp.Items) < limit && it.Next(ctx, &entry) { entry := metabase.ObjectEntry{}
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) for it.Next(ctx, &entry) {
if err != nil { item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement)
return err if err != nil {
return err
}
objectsEntries = append(objectsEntries, ObjectListItem{
Item: item,
StreamID: entry.StreamID,
})
} }
resp.Items = append(resp.Items, item) return nil
} },
)
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
// we need to take into account also potential results from IteratePendingObjects // combine results from both tables and sort them by object key to be able to cut results to the limit
resp.More = resp.More || it.Next(ctx, &entry) allResults := make([]ObjectListItem, 0, len(pendingObjectsEntries)+len(objectsEntries))
return nil allResults = append(allResults, pendingObjectsEntries...)
}, allResults = append(allResults, objectsEntries...)
) sort.Slice(allResults, func(i, j int) bool {
if err != nil { keyCompare := bytes.Compare(allResults[i].Item.EncryptedObjectKey, allResults[j].Item.EncryptedObjectKey)
return nil, endpoint.convertMetabaseErr(err) switch {
case keyCompare == -1:
return true
case keyCompare == 1:
return false
case allResults[i].Item.Version < allResults[j].Item.Version:
return true
case allResults[i].Item.Version > allResults[j].Item.Version:
return false
default:
return allResults[i].StreamID.Less(allResults[j].StreamID)
}
})
if len(allResults) >= limit {
resp.More = len(allResults) > limit
allResults = allResults[:limit]
}
resp.Items = make([]*pb.ObjectListItem, len(allResults))
for i, objectListItem := range allResults {
resp.Items[i] = objectListItem.Item
}
} else {
// we always need results from both tables for now
err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx,
metabase.IterateObjectsWithStatus{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
Prefix: prefix,
Cursor: metabase.IterateCursor{
Key: cursorKey,
Version: cursorVersion,
},
Recursive: req.Recursive,
BatchSize: limit + 1,
Status: status,
IncludeCustomMetadata: includeCustomMetadata,
IncludeSystemMetadata: includeSystemMetadata,
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
entry := metabase.ObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement)
if err != nil {
return err
}
resp.Items = append(resp.Items, item)
}
// we need to take into account also potential results from IteratePendingObjects
resp.More = resp.More || it.Next(ctx, &entry)
return nil
},
)
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
} }
} }
endpoint.log.Info("Object List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object")) endpoint.log.Info("Object List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object"))
@ -1024,25 +1097,47 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
resp = &pb.ObjectListPendingStreamsResponse{} resp = &pb.ObjectListPendingStreamsResponse{}
resp.Items = []*pb.ObjectListItem{} resp.Items = []*pb.ObjectListItem{}
err = endpoint.metabase.IteratePendingObjectsByKey(ctx,
metabase.IteratePendingObjectsByKey{ options := metabase.IteratePendingObjectsByKey{
ObjectLocation: metabase.ObjectLocation{ ObjectLocation: metabase.ObjectLocation{
ProjectID: keyInfo.ProjectID, ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket), BucketName: string(req.Bucket),
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey), ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
},
BatchSize: limit + 1,
Cursor: cursor,
}
if endpoint.config.UsePendingObjectsTable {
err = endpoint.metabase.IteratePendingObjectsByKeyNew(ctx,
options, func(ctx context.Context, it metabase.PendingObjectsIterator) error {
entry := metabase.PendingObjectEntry{}
for it.Next(ctx, &entry) {
item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement)
if err != nil {
return err
}
resp.Items = append(resp.Items, item)
}
return nil
}, },
BatchSize: limit + 1, )
Cursor: cursor, if err != nil {
}, func(ctx context.Context, it metabase.ObjectsIterator) error { return nil, endpoint.convertMetabaseErr(err)
}
}
objectsEntries := make([]*pb.ObjectListItem, 0, limit)
err = endpoint.metabase.IteratePendingObjectsByKey(ctx,
options, func(ctx context.Context, it metabase.ObjectsIterator) error {
entry := metabase.ObjectEntry{} entry := metabase.ObjectEntry{}
for len(resp.Items) < limit && it.Next(ctx, &entry) { for it.Next(ctx, &entry) {
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement) item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement)
if err != nil { if err != nil {
return err return err
} }
resp.Items = append(resp.Items, item) objectsEntries = append(objectsEntries, item)
} }
resp.More = it.Next(ctx, &entry)
return nil return nil
}, },
) )
@ -1050,6 +1145,16 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb.
return nil, endpoint.convertMetabaseErr(err) return nil, endpoint.convertMetabaseErr(err)
} }
// TODO currently this request have a bug if we would like to list all pending objects
// with the same name if we have more than single page of them (1000) because protobuf
// cursor doesn't include additional things like StreamID so it's a bit useless to do
// anything else than just combine results
resp.Items = append(resp.Items, objectsEntries...)
if len(resp.Items) >= limit {
resp.More = len(resp.Items) > limit
resp.Items = resp.Items[:limit]
}
endpoint.log.Info("List pending object streams", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object")) endpoint.log.Info("List pending object streams", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object"))
mon.Meter("req_list_pending_object_streams").Mark(1) mon.Meter("req_list_pending_object_streams").Mark(1)

View File

@ -19,6 +19,7 @@ import (
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"storj.io/common/errs2" "storj.io/common/errs2"
"storj.io/common/identity" "storj.io/common/identity"
@ -802,6 +803,127 @@ func TestEndpoint_Object_No_StorageNodes_UsePendingObjectsTable(t *testing.T) {
require.False(t, iterator.Next()) require.False(t, iterator.Next())
require.NoError(t, iterator.Err()) require.NoError(t, iterator.Err())
}) })
t.Run("ListPendingObjectStreams", func(t *testing.T) {
defer ctx.Check(deleteBucket)
_, err = project.CreateBucket(ctx, bucketName)
require.NoError(t, err)
_, err = project.BeginUpload(ctx, bucketName, "pending_object", nil)
require.NoError(t, err)
iterator := project.ListUploads(ctx, bucketName, &uplink.ListUploadsOptions{
Prefix: "pending_object",
})
require.True(t, iterator.Next())
require.Equal(t, "pending_object", iterator.Item().Key)
require.NoError(t, iterator.Err())
// pending object in objects table
_, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: metabase.ObjectKey("pending_object"),
StreamID: testrand.UUID(),
Version: metabase.NextVersion,
},
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: false,
})
require.NoError(t, err)
keys := []string{}
iterator = project.ListUploads(ctx, bucketName, &uplink.ListUploadsOptions{
Prefix: "pending_object",
})
for iterator.Next() {
keys = append(keys, iterator.Item().Key)
}
require.NoError(t, iterator.Err())
// we should have two objects with the same name, one from pending_objects
// table and second from objects table
require.ElementsMatch(t, []string{
"pending_object",
"pending_object",
}, keys)
})
t.Run("mixed objects from both tables", func(t *testing.T) {
type TestCases struct {
PendingObjectsTable []string
ObjectsTable []string
}
for _, tc := range []TestCases{
{
PendingObjectsTable: []string{"A", "B", "C"},
ObjectsTable: []string{"X", "Y", "Z"},
},
{
PendingObjectsTable: []string{"A", "Y", "C"},
ObjectsTable: []string{"X", "B", "Z"},
},
{
PendingObjectsTable: []string{"X", "B", "Z"},
ObjectsTable: []string{"A", "Y", "C"},
},
{
PendingObjectsTable: []string{"A", "B", "C", "X", "Y", "Z"},
},
{
ObjectsTable: []string{"A", "B", "C", "X", "Y", "Z"},
},
} {
t.Run("", func(t *testing.T) {
defer ctx.Check(deleteBucket)
_, err = project.CreateBucket(ctx, bucketName)
require.NoError(t, err)
allKeys := []string{}
// create objects in pending_objects table
for _, key := range tc.PendingObjectsTable {
_, err = project.BeginUpload(ctx, bucketName, key, nil)
require.NoError(t, err)
allKeys = append(allKeys, key)
}
// create objects in objects table
for _, key := range tc.ObjectsTable {
_, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: metabase.ObjectKey(key),
StreamID: testrand.UUID(),
Version: metabase.NextVersion,
},
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: false,
})
require.NoError(t, err)
allKeys = append(allKeys, key)
}
slices.Sort(allKeys)
for _, limit := range []int{1, 2, 3, 10, 1000} {
ctx := testuplink.WithListLimit(ctx, limit)
resultKeys := []string{}
iterator := project.ListUploads(ctx, bucketName, nil)
for iterator.Next() {
resultKeys = append(resultKeys, iterator.Item().Key)
}
require.NoError(t, iterator.Err())
require.Equal(t, allKeys, resultKeys)
}
})
}
})
}) })
} }