satellite/metainfo: fix duplicates while listing committed objects
We have an issue where object can appear in two different listing pages. It's because protobuf listing cursor doesn't have version included and now we can have internally versions higher than 1. On satellite side version 1 was always used as a default cursor version. As a workaround for existing implementation of libuplink library we will use always maximum version for listing cursor on satellite side. Fixing protobuf and libuplink implementation will happen later. https://github.com/storj/storj/issues/5570 Change-Id: Ibd27b174556c9d8b8bd60fab8cff7862fd11e994
This commit is contained in:
parent
4a67b57103
commit
41bcc6bb62
@ -319,6 +319,10 @@ const NextVersion = Version(0)
|
||||
// DefaultVersion represents default version 1.
|
||||
const DefaultVersion = Version(1)
|
||||
|
||||
// MaxVersion represents maximum version.
|
||||
// Version in DB is represented as INT4.
|
||||
const MaxVersion = Version(math.MaxInt32)
|
||||
|
||||
// ObjectStatus defines the statuses that the object might be in.
|
||||
type ObjectStatus byte
|
||||
|
||||
|
@ -14,10 +14,7 @@ import (
|
||||
)
|
||||
|
||||
// ListObjectsCursor is a cursor used during iteration through objects.
|
||||
type ListObjectsCursor struct {
|
||||
Key ObjectKey
|
||||
Version Version
|
||||
}
|
||||
type ListObjectsCursor IterateCursor
|
||||
|
||||
// ListObjects contains arguments necessary for listing objects.
|
||||
type ListObjects struct {
|
||||
|
@ -798,16 +798,31 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
status = metabase.ObjectStatus(req.Status)
|
||||
}
|
||||
|
||||
cursor := string(req.EncryptedCursor)
|
||||
if len(cursor) != 0 {
|
||||
cursor = string(prefix) + cursor
|
||||
cursor := metabase.IterateCursor{
|
||||
Key: metabase.ObjectKey(req.EncryptedCursor),
|
||||
// TODO: set to a the version from the protobuf request when it supports this
|
||||
}
|
||||
if len(cursor.Key) != 0 {
|
||||
cursor.Key = prefix + cursor.Key
|
||||
|
||||
if status == metabase.Committed {
|
||||
// TODO this is a workaround to avoid duplicates while listing objects by libuplink.
|
||||
// because version is not part of cursor yet and we can have object with version higher
|
||||
// than 1 we cannot use hardcoded version 1 as default.
|
||||
// This workaround should be in place for a longer time even if metainfo protocol will be
|
||||
// fix as we still want to avoid this problem for older libuplink versions.
|
||||
cursor.Version = metabase.MaxVersion
|
||||
}
|
||||
}
|
||||
|
||||
includeCustomMetadata := true
|
||||
includeSystemMetadata := true
|
||||
if req.UseObjectIncludes {
|
||||
includeCustomMetadata = req.ObjectIncludes.Metadata
|
||||
includeSystemMetadata = !req.ObjectIncludes.ExcludeSystemMetadata
|
||||
// because multipart upload UploadID depends on some System metadata fields we need
|
||||
// to force reading it for listing pending object when its not included in options.
|
||||
// This is used by libuplink ListUploads method.
|
||||
includeSystemMetadata = status == metabase.Pending || !req.ObjectIncludes.ExcludeSystemMetadata
|
||||
}
|
||||
|
||||
resp = &pb.ObjectListResponse{}
|
||||
@ -817,18 +832,12 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
Prefix: prefix,
|
||||
Cursor: metabase.ListObjectsCursor{
|
||||
Key: metabase.ObjectKey(cursor),
|
||||
Version: metabase.DefaultVersion, // TODO: set to a the version from the protobuf request when it supports this
|
||||
},
|
||||
Cursor: metabase.ListObjectsCursor(cursor),
|
||||
Recursive: req.Recursive,
|
||||
Limit: limit,
|
||||
Status: status,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
// because multipart upload UploadID depends on some System metadata fields we need
|
||||
// to force reading it for listing pending object when its not included in options.
|
||||
// This is used by libuplink ListUploads method.
|
||||
IncludeSystemMetadata: status == metabase.Pending || includeSystemMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
@ -843,21 +852,17 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
}
|
||||
resp.More = result.More
|
||||
} else {
|
||||
// TODO: Replace with IterateObjectsLatestVersion when ready
|
||||
err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx,
|
||||
metabase.IterateObjectsWithStatus{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
Prefix: prefix,
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: metabase.ObjectKey(cursor),
|
||||
Version: metabase.DefaultVersion, // TODO: set to a the version from the protobuf request when it supports this
|
||||
},
|
||||
Cursor: cursor,
|
||||
Recursive: req.Recursive,
|
||||
BatchSize: limit + 1,
|
||||
Status: status,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
IncludeSystemMetadata: status == metabase.Pending || includeSystemMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
entry := metabase.ObjectEntry{}
|
||||
for len(resp.Items) < limit && it.Next(ctx, &entry) {
|
||||
|
@ -2340,3 +2340,69 @@ func TestEndpoint_Object_MoveObject_MultipleVersions(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestListObjectDuplicates(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
u := planet.Uplinks[0]
|
||||
s := planet.Satellites[0]
|
||||
|
||||
const amount = 23
|
||||
|
||||
require.NoError(t, u.CreateBucket(ctx, s, "test"))
|
||||
|
||||
prefixes := []string{"", "aprefix/"}
|
||||
|
||||
// reupload some objects many times to force different
|
||||
// object versions internally
|
||||
for _, prefix := range prefixes {
|
||||
for i := 0; i < amount; i++ {
|
||||
version := 1
|
||||
if i%2 == 0 {
|
||||
version = 2
|
||||
} else if i%3 == 0 {
|
||||
version = 3
|
||||
}
|
||||
|
||||
for v := 0; v < version; v++ {
|
||||
require.NoError(t, u.Upload(ctx, s, "test", prefix+fmt.Sprintf("file-%d", i), nil))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
project, err := u.GetProject(ctx, s)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(project.Close)
|
||||
|
||||
for _, prefix := range prefixes {
|
||||
prefixLabel := prefix
|
||||
if prefixLabel == "" {
|
||||
prefixLabel = "empty"
|
||||
}
|
||||
|
||||
for _, listLimit := range []int{0, 1, 2, 3, 7, amount - 1, amount} {
|
||||
t.Run(fmt.Sprintf("prefix %s limit %d", prefixLabel, listLimit), func(t *testing.T) {
|
||||
limitCtx := testuplink.WithListLimit(ctx, listLimit)
|
||||
|
||||
keys := make(map[string]struct{})
|
||||
iter := project.ListObjects(limitCtx, "test", &uplink.ListObjectsOptions{
|
||||
Prefix: prefix,
|
||||
})
|
||||
for iter.Next() {
|
||||
if iter.Item().IsPrefix {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := keys[iter.Item().Key]; ok {
|
||||
t.Fatal("duplicate", iter.Item().Key, len(keys))
|
||||
}
|
||||
keys[iter.Item().Key] = struct{}{}
|
||||
}
|
||||
require.NoError(t, iter.Err())
|
||||
require.Equal(t, amount, len(keys))
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user