satellite/metainfo: temporary feature flag for listing query testing
Fixes: https://github.com/storj/storj/issues/5144 Change-Id: I7650f4d5dd0378e2246339e79710a695996a845c
This commit is contained in:
parent
802ff18bd8
commit
02924d0ded
@ -139,4 +139,6 @@ type Config struct {
|
||||
ServerSideCopy bool `help:"enable code for server-side copy, deprecated. please leave this to true." default:"true"`
|
||||
ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`
|
||||
MultipleVersions bool `help:"feature flag to enable using multple objects versions in the system internally" default:"false"`
|
||||
// TODO remove when we benchmarking are done and decision is made.
|
||||
TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
|
||||
}
|
||||
|
@ -753,38 +753,67 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
}
|
||||
|
||||
resp = &pb.ObjectListResponse{}
|
||||
// TODO: Replace with IterateObjectsLatestVersion when ready
|
||||
err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx,
|
||||
metabase.IterateObjectsWithStatus{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
Prefix: prefix,
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: metabase.ObjectKey(cursor),
|
||||
Version: metabase.DefaultVersion, // TODO: set to a the version from the protobuf request when it supports this
|
||||
},
|
||||
Recursive: req.Recursive,
|
||||
BatchSize: limit + 1,
|
||||
Status: status,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
entry := metabase.ObjectEntry{}
|
||||
for len(resp.Items) < limit && it.Next(ctx, &entry) {
|
||||
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeCustomMetadata, placement)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Items = append(resp.Items, item)
|
||||
}
|
||||
resp.More = it.Next(ctx, &entry)
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
if endpoint.config.TestListingQuery {
|
||||
result, err := endpoint.metabase.ListObjects(ctx,
|
||||
metabase.ListObjects{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
Prefix: prefix,
|
||||
Cursor: metabase.ListObjectsCursor{
|
||||
Key: metabase.ObjectKey(cursor),
|
||||
Version: metabase.DefaultVersion, // TODO: set to a the version from the protobuf request when it supports this
|
||||
},
|
||||
Recursive: req.Recursive,
|
||||
Limit: limit,
|
||||
Status: status,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
|
||||
for _, entry := range result.Objects {
|
||||
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeCustomMetadata, placement)
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
resp.Items = append(resp.Items, item)
|
||||
}
|
||||
resp.More = result.More
|
||||
} else {
|
||||
// TODO: Replace with IterateObjectsLatestVersion when ready
|
||||
err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx,
|
||||
metabase.IterateObjectsWithStatus{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
Prefix: prefix,
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: metabase.ObjectKey(cursor),
|
||||
Version: metabase.DefaultVersion, // TODO: set to a the version from the protobuf request when it supports this
|
||||
},
|
||||
Recursive: req.Recursive,
|
||||
BatchSize: limit + 1,
|
||||
Status: status,
|
||||
IncludeCustomMetadata: includeCustomMetadata,
|
||||
IncludeSystemMetadata: includeSystemMetadata,
|
||||
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
|
||||
entry := metabase.ObjectEntry{}
|
||||
for len(resp.Items) < limit && it.Next(ctx, &entry) {
|
||||
item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeCustomMetadata, placement)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp.Items = append(resp.Items, item)
|
||||
}
|
||||
resp.More = it.Next(ctx, &entry)
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
}
|
||||
endpoint.log.Info("Object List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object"))
|
||||
mon.Meter("req_list_object").Mark(1)
|
||||
|
||||
|
@ -595,9 +595,120 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
|
||||
require.Contains(t, err.Error(), "Invalid expiration time")
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.InvalidArgument))
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// TODO remove when listing query tests feature flag is removed.
|
||||
func TestEndpoint_Object_No_StorageNodes_TestListingQuery(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: testplanet.Combine(testplanet.MaxObjectKeyLength(1024), func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Metainfo.TestListingQuery = true
|
||||
}),
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||
|
||||
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(metainfoClient.Close)
|
||||
|
||||
bucketName := "testbucket"
|
||||
deleteBucket := func() error {
|
||||
_, err := metainfoClient.DeleteBucket(ctx, metaclient.DeleteBucketParams{
|
||||
Name: []byte(bucketName),
|
||||
DeleteAll: true,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
t.Run("list service with listing query test", func(t *testing.T) {
|
||||
defer ctx.Check(deleteBucket)
|
||||
|
||||
items := []struct {
|
||||
Key string
|
||||
Value []byte
|
||||
}{
|
||||
{Key: "sample.😶", Value: []byte{1}},
|
||||
{Key: "müsic", Value: []byte{2}},
|
||||
{Key: "müsic/söng1.mp3", Value: []byte{3}},
|
||||
{Key: "müsic/söng2.mp3", Value: []byte{4}},
|
||||
{Key: "müsic/album/söng3.mp3", Value: []byte{5}},
|
||||
{Key: "müsic/söng4.mp3", Value: []byte{6}},
|
||||
{Key: "ビデオ/movie.mkv", Value: []byte{7}},
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, item.Key, item.Value)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
project, err := planet.Uplinks[0].GetProject(ctx, planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(project.Close)
|
||||
|
||||
objects := project.ListObjects(ctx, "testbucket", &uplink.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
})
|
||||
|
||||
listItems := make([]*uplink.Object, 0)
|
||||
for objects.Next() {
|
||||
listItems = append(listItems, objects.Item())
|
||||
}
|
||||
require.NoError(t, objects.Err())
|
||||
|
||||
expected := []storj.Object{
|
||||
{Path: "müsic"},
|
||||
{Path: "müsic/album/söng3.mp3"},
|
||||
{Path: "müsic/söng1.mp3"},
|
||||
{Path: "müsic/söng2.mp3"},
|
||||
{Path: "müsic/söng4.mp3"},
|
||||
{Path: "sample.😶"},
|
||||
{Path: "ビデオ/movie.mkv"},
|
||||
}
|
||||
|
||||
require.Equal(t, len(expected), len(listItems))
|
||||
sort.Slice(listItems, func(i, k int) bool {
|
||||
return listItems[i].Key < listItems[k].Key
|
||||
})
|
||||
for i, item := range expected {
|
||||
require.Equal(t, item.Path, listItems[i].Key)
|
||||
require.Equal(t, item.IsPrefix, listItems[i].IsPrefix)
|
||||
}
|
||||
|
||||
objects = project.ListObjects(ctx, bucketName, &uplink.ListObjectsOptions{
|
||||
Recursive: false,
|
||||
})
|
||||
|
||||
listItems = make([]*uplink.Object, 0)
|
||||
for objects.Next() {
|
||||
listItems = append(listItems, objects.Item())
|
||||
}
|
||||
require.NoError(t, objects.Err())
|
||||
|
||||
expected = []storj.Object{
|
||||
{Path: "müsic"},
|
||||
{Path: "müsic/", IsPrefix: true},
|
||||
{Path: "sample.😶"},
|
||||
{Path: "ビデオ/", IsPrefix: true},
|
||||
}
|
||||
|
||||
require.Equal(t, len(expected), len(listItems))
|
||||
sort.Slice(listItems, func(i, k int) bool {
|
||||
return listItems[i].Key < listItems[k].Key
|
||||
})
|
||||
for i, item := range expected {
|
||||
t.Log(item.Path, listItems[i].Key)
|
||||
require.Equal(t, item.Path, listItems[i].Key)
|
||||
require.Equal(t, item.IsPrefix, listItems[i].IsPrefix)
|
||||
}
|
||||
})
|
||||
|
||||
})
|
||||
}
|
||||
func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -616,6 +616,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy
|
||||
# metainfo.server-side-copy-disabled: false
|
||||
|
||||
# test the new query for non-recursive listing
|
||||
# metainfo.test-listing-query: false
|
||||
|
||||
# address(es) to send telemetry to (comma-separated)
|
||||
# metrics.addr: collectora.storj.io:9000
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user