satellite/metainfo: fix metainfo loop

ListAllBuckets could skip buckets when the total number of buckets
exceeds the list limit. Replace listing buckets with iterating
directly over the objects table.

Change-Id: I43da2fdf51e83915a7854b782f0e9ec32c373018
Egon Elbre 2021-02-18 13:37:49 +02:00
parent c860b74a37
commit 8e95e76c35
3 changed files with 50 additions and 47 deletions
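
Note: the commit message does not spell out how ListAllBuckets skipped entries. One classic way a cursor-paged listing ordered by (project_id, bucket_name) drops rows is a mis-built compound-key comparison. The self-contained Go illustration below is hypothetical (it is not the actual Storj query), but it reproduces the symptom the new test provokes: same-named buckets in later projects vanish once the cursor passes that name in an earlier project.

package main

import "fmt"

// Hypothetical illustration, not Storj code: rows are ordered by
// (project, name). A correct keyset cursor uses the tuple comparison
// (project, name) > (curProject, curName). A buggy variant such as
// project >= curProject AND name > curName silently skips rows in
// later projects whose names sort at or before curName.
type bucket struct{ project, name string }

func listPage(rows []bucket, curProject, curName string, limit int, buggy bool) []bucket {
	var page []bucket
	for _, r := range rows {
		var afterCursor bool
		if buggy {
			afterCursor = r.project >= curProject && r.name > curName
		} else {
			afterCursor = r.project > curProject ||
				(r.project == curProject && r.name > curName)
		}
		if afterCursor {
			page = append(page, r)
			if len(page) == limit {
				break
			}
		}
	}
	return page
}

func main() {
	rows := []bucket{
		{"p1", "zzza"}, {"p1", "zzzb"},
		{"p2", "zzza"}, {"p2", "zzzb"},
	}
	// Cursor sits at the last bucket of p1.
	fmt.Println(listPage(rows, "p1", "zzzb", 2, false)) // [{p2 zzza} {p2 zzzb}]
	fmt.Println(listPage(rows, "p1", "zzzb", 2, true))  // []; p2's buckets are skipped
}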


@@ -207,6 +207,8 @@ type MetabaseDB interface {
 	IterateObjectsAllVersionsWithStatus(ctx context.Context, opts metabase.IterateObjectsWithStatus, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
 	// IteratePendingObjectsByKey iterates through all StreamID for a given ObjectKey.
 	IteratePendingObjectsByKey(ctx context.Context, opts metabase.IteratePendingObjectsByKey, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
+	// FullIterateObjects iterates through all objects in metabase.
+	FullIterateObjects(ctx context.Context, opts metabase.FullIterateObjects, fn func(context.Context, metabase.FullObjectsIterator) error) (err error)
 	// BucketEmpty returns true if bucket does not contain objects (pending or committed).
 	// This method doesn't check bucket existence.
 	BucketEmpty(ctx context.Context, opts metabase.BucketEmpty) (empty bool, err error)
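
For illustration, a minimal caller sketch of the new method, using only the names visible in this diff; the helper visitAllObjects and its signature are hypothetical:

// Hypothetical helper: walks every object in the metabase via the
// FullIterateObjects method declared above.
func visitAllObjects(ctx context.Context, db MetabaseDB, batchSize int, visit func(metabase.FullObjectEntry) error) error {
	return db.FullIterateObjects(ctx, metabase.FullIterateObjects{
		BatchSize: batchSize,
	}, func(ctx context.Context, it metabase.FullObjectsIterator) error {
		var entry metabase.FullObjectEntry
		for it.Next(ctx, &entry) {
			// Each entry carries its own project and bucket identity,
			// so no outer per-bucket loop is needed.
			if err := visit(entry); err != nil {
				return err
			}
		}
		return nil
	})
}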


@@ -291,50 +291,22 @@ func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, met
 		finishObservers(observers)
 	}()
 
-	more := true
-	bucketsCursor := ListAllBucketsCursor{}
-	for more {
-		buckets, err := bucketsDB.ListAllBuckets(ctx, ListAllBucketsOptions{
-			Cursor: bucketsCursor,
-			Limit:  limit,
-		})
-		if err != nil {
-			return LoopError.Wrap(err)
-		}
-
-		for _, bucket := range buckets.Items {
-			observers, err = iterateObjects(ctx, bucket.ProjectID, bucket.Name, metabaseDB, observers, limit, rateLimiter)
-			if err != nil {
-				return LoopError.Wrap(err)
-			}
-		}
-
-		// if context has been canceled exit. Otherwise, continue
-		if err := ctx.Err(); err != nil {
-			return err
-		}
-
-		more = buckets.More
-		if more {
-			lastBucket := buckets.Items[len(buckets.Items)-1]
-			bucketsCursor.ProjectID = lastBucket.ProjectID
-			bucketsCursor.BucketName = []byte(lastBucket.Name)
-		}
-	}
+	observers, err = iterateObjects(ctx, metabaseDB, observers, limit, rateLimiter)
+	if err != nil {
+		return LoopError.Wrap(err)
+	}
 
 	return err
 }
 
-func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) {
+func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	// TODO we should improve performance here, this is just most straightforward solution
-	err = metabaseDB.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
-		ProjectID:  projectID,
-		BucketName: bucket,
-		BatchSize:  limit,
-	}, func(ctx context.Context, it metabase.ObjectsIterator) error {
-		var entry metabase.ObjectEntry
+	err = metabaseDB.FullIterateObjects(ctx, metabase.FullIterateObjects{
+		BatchSize: limit,
+	}, func(ctx context.Context, it metabase.FullObjectsIterator) error {
+		var entry metabase.FullObjectEntry
 		for it.Next(ctx, &entry) {
 			if err := rateLimiter.Wait(ctx); err != nil {
 				// We don't really execute concurrent batches so we should never
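
Design note: the rewrite removes bucket-level pagination entirely. FullIterateObjects presumably performs a single batched scan over the objects table in primary-key order, so there is only one cursor to maintain, and the rate limiter now throttles per object across the whole scan rather than per bucket.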
@@ -345,12 +317,7 @@ func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, met
 			nextObservers := observers[:0]
 			for _, observer := range observers {
-				location := metabase.ObjectLocation{
-					ProjectID:  projectID,
-					BucketName: bucket,
-					ObjectKey:  entry.ObjectKey,
-				}
-
-				keepObserver := handleObject(ctx, observer, location, entry)
+				keepObserver := handleObject(ctx, observer, entry)
 				if keepObserver {
 					nextObservers = append(nextObservers, observer)
 				}
@@ -388,8 +355,8 @@ func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, met
 			for _, segment := range segments.Segments {
 				nextObservers := observers[:0]
 				location := metabase.SegmentLocation{
-					ProjectID:  projectID,
-					BucketName: bucket,
+					ProjectID:  entry.ProjectID,
+					BucketName: entry.BucketName,
 					ObjectKey:  entry.ObjectKey,
 					Position:   segment.Position,
 				}
@@ -424,14 +391,14 @@ func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, met
 	return observers, err
 }
 
-func handleObject(ctx context.Context, observer *observerContext, location metabase.ObjectLocation, object metabase.ObjectEntry) bool {
+func handleObject(ctx context.Context, observer *observerContext, object metabase.FullObjectEntry) bool {
 	expirationDate := time.Time{}
 	if object.ExpiresAt != nil {
 		expirationDate = *object.ExpiresAt
 	}
 
 	if observer.HandleError(observer.Object(ctx, &Object{
-		Location:     location,
+		Location:     object.Location(),
 		StreamID:     object.StreamID,
 		SegmentCount: int(object.SegmentCount),
 		MetadataSize: len(object.EncryptedMetadata),
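
handleObject now derives the location from the entry itself. Location() is not shown in this diff; since the segment hunk above reads entry.ProjectID and entry.BucketName, it plausibly just bundles those fields. A sketch under that assumption, not the verified metabase implementation:

// Sketch only: assumes FullObjectEntry embeds the object's identity fields.
func (entry FullObjectEntry) Location() ObjectLocation {
	return ObjectLocation{
		ProjectID:  entry.ProjectID,
		BucketName: entry.BucketName,
		ObjectKey:  entry.ObjectKey,
	}
}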


@@ -109,6 +109,40 @@ func TestLoop(t *testing.T) {
 	})
 }
 
+func TestLoop_AllData(t *testing.T) {
+	segmentSize := 8 * memory.KiB
+
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 4,
+		UplinkCount:      3,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Metainfo.Loop.CoalesceDuration = 1 * time.Second
+				config.Metainfo.Loop.ListLimit = 2
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		bucketNames := strings.Split("abc", "")
+
+		data := testrand.Bytes(segmentSize)
+		for _, up := range planet.Uplinks {
+			for _, bucketName := range bucketNames {
+				err := up.Upload(ctx, planet.Satellites[0], "zzz"+bucketName, "1", data)
+				require.NoError(t, err)
+			}
+		}
+
+		metaLoop := planet.Satellites[0].Metainfo.Loop
+
+		obs := newTestObserver(nil)
+		err := metaLoop.Join(ctx, obs)
+		require.NoError(t, err)
+
+		gotItems := len(obs.uniquePaths)
+		require.Equal(t, len(bucketNames)*len(planet.Uplinks), gotItems)
+	})
+}
 
 // TestLoopObserverCancel does the following:
 //   * upload 3 remote segments
 //   * hook three observers up to metainfo loop
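
The new test drops Loop.ListLimit to 2 while creating nine buckets (three uplinks, each with buckets zzza, zzzb, and zzzc), so any bucket-listing traversal would be forced to paginate across projects; under the old code path that pagination could skip buckets, and the observer would report fewer than len(bucketNames)*len(planet.Uplinks) unique paths.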