satellite/metainfo: use bucket versioning state for listing
Currently we need to use different methods for listing objects depends on versioning state. Change-Id: I5747a699ba2dafcfc384216e4be0b662c8ae95fa
This commit is contained in:
parent
bae3f7f73d
commit
26a04a5929
@ -899,7 +899,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
}
|
||||
|
||||
// TODO this needs to be optimized to avoid DB call on each request
|
||||
placement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID)
|
||||
bucket, err := endpoint.buckets.GetBucket(ctx, req.Bucket, keyInfo.ProjectID)
|
||||
if err != nil {
|
||||
if buckets.ErrBucketNotFound.Has(err) {
|
||||
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket)
|
||||
@ -914,9 +914,16 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
}
|
||||
metabase.ListLimit.Ensure(&limit)
|
||||
|
||||
// TODO(ver): this is only temporary logic for testing, will be cleanup later
|
||||
// TODO(ver): this is only temporary logic for testing, may require cleanup later
|
||||
// Current logic is:
|
||||
// * VersioningUnsupported || Unversioned - use metabase.IterateObjectsAllVersionsWithStatus
|
||||
// * VersioningEnabled || VersioningSuspended
|
||||
// * IncludeAllVersions == true - use metabase.IterateObjectsAllVersionsWithStatus
|
||||
// * IncludeAllVersions == false - use metabase.ListObjects
|
||||
// For now we want to use metabase.ListObjects only for versioned and suspended buckets because
|
||||
// we need to verify performance of this method before we will use it globally
|
||||
useListObjects := false
|
||||
if endpoint.config.UseBucketLevelObjectVersioningByProject(keyInfo.ProjectID) && endpoint.config.TestEnableBucketVersioning {
|
||||
if bucket.Versioning == buckets.VersioningEnabled || bucket.Versioning == buckets.VersioningSuspended {
|
||||
useListObjects = !req.IncludeAllVersions
|
||||
}
|
||||
|
||||
@ -981,7 +988,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
}
|
||||
|
||||
for _, entry := range result.Objects {
|
||||
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement)
|
||||
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement)
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
@ -1006,7 +1013,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
entry := metabase.ObjectEntry{}
|
||||
for len(resp.Items) < limit && it.Next(ctx, &entry) {
|
||||
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement)
|
||||
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2770,7 +2770,6 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) {
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Metainfo.UseBucketLevelObjectVersioning = true
|
||||
config.Metainfo.TestEnableBucketVersioning = true
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
@ -2809,6 +2808,8 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) {
|
||||
|
||||
require.NoError(t, createBucket(bucketName))
|
||||
|
||||
require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID))
|
||||
|
||||
state, err := planet.Satellites[0].API.Buckets.Service.GetBucketVersioningState(ctx, []byte(bucketName), projectID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, buckets.VersioningEnabled, state)
|
||||
@ -2857,5 +2858,43 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) {
|
||||
require.Nil(t, getResponse.Object.RedundancyScheme)
|
||||
require.Equal(t, pb.Object_DELETE_MARKER_VERSIONED, getResponse.Object.Status)
|
||||
})
|
||||
|
||||
t.Run("listing objects, different versioning state", func(t *testing.T) {
|
||||
defer ctx.Check(deleteBucket(bucketName))
|
||||
|
||||
require.NoError(t, createBucket(bucketName))
|
||||
|
||||
err = planet.Uplinks[0].Upload(ctx, satelliteSys, bucketName, "objectA", testrand.Bytes(100))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = planet.Uplinks[0].Upload(ctx, satelliteSys, bucketName, "objectB", testrand.Bytes(100))
|
||||
require.NoError(t, err)
|
||||
|
||||
checkListing := func(expectedItems int, includeAllVersions bool) {
|
||||
response, err := satelliteSys.API.Metainfo.Endpoint.ListObjects(ctx, &pb.ListObjectsRequest{
|
||||
Header: &pb.RequestHeader{ApiKey: apiKey},
|
||||
Bucket: []byte(bucketName),
|
||||
IncludeAllVersions: includeAllVersions,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, response.Items, expectedItems)
|
||||
}
|
||||
|
||||
checkListing(2, false)
|
||||
|
||||
require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID))
|
||||
|
||||
// upload second version of objectA
|
||||
err = planet.Uplinks[0].Upload(ctx, satelliteSys, bucketName, "objectA", testrand.Bytes(100))
|
||||
require.NoError(t, err)
|
||||
|
||||
checkListing(2, false)
|
||||
checkListing(3, true)
|
||||
|
||||
require.NoError(t, planet.Satellites[0].API.Buckets.Service.SuspendBucketVersioning(ctx, []byte(bucketName), projectID))
|
||||
|
||||
checkListing(2, false)
|
||||
checkListing(3, true)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user