diff --git a/satellite/metainfo/loop.go b/satellite/metainfo/loop.go index 442a960dc..8997110d8 100644 --- a/satellite/metainfo/loop.go +++ b/satellite/metainfo/loop.go @@ -154,25 +154,6 @@ func (observer *observerContext) Wait() error { return <-observer.done } -type observers []*observerContext - -func (o *observers) Remove(toRemove *observerContext) { - list := *o - for i, observer := range list { - if observer == toRemove { - list[len(list)-1], list[i] = list[i], list[len(list)-1] - *o = list[:len(list)-1] - return - } - } -} - -func (o *observers) Finish() { - for _, observer := range *o { - observer.Finish() - } -} - // LoopConfig contains configurable values for the metainfo loop. type LoopConfig struct { CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s"` @@ -299,7 +280,7 @@ func (loop *Loop) Wait() { <-loop.done } -func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers observers, limit int, rateLimiter *rate.Limiter) (err error) { +func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) { defer func() { if err != nil { for _, observer := range observers { @@ -307,7 +288,7 @@ func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, met } return } - observers.Finish() + finishObservers(observers) }() more := true @@ -322,12 +303,17 @@ func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, met } for _, bucket := range buckets.Items { - err := iterateObjects(ctx, bucket.ProjectID, bucket.Name, metabaseDB, observers, limit, rateLimiter) + observers, err = iterateObjects(ctx, bucket.ProjectID, bucket.Name, metabaseDB, observers, limit, rateLimiter) if err != nil { return LoopError.Wrap(err) } } + // if context has been canceled exit. Otherwise, continue + if err := ctx.Err(); err != nil { + return err + } + more = buckets.More if more { lastBucket := buckets.Items[len(buckets.Items)-1] @@ -338,7 +324,7 @@ func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, met return err } -func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, metabaseDB MetabaseDB, observers observers, limit int, rateLimiter *rate.Limiter) (err error) { +func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) { defer mon.Task()(&ctx)(&err) // TODO we should improve performance here, this is just most straightforward solution @@ -359,6 +345,7 @@ func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, met return err } + nextObservers := observers[:0] for _, observer := range observers { location := metabase.ObjectLocation{ ProjectID: projectID, @@ -366,89 +353,77 @@ func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, met ObjectKey: entry.ObjectKey, } keepObserver := handleObject(ctx, observer, location, entry) - if !keepObserver { - observers.Remove(observer) + if keepObserver { + nextObservers = append(nextObservers, observer) } } + observers = nextObservers if len(observers) == 0 { return nil } // if context has been canceled exit. Otherwise, continue - select { - case <-ctx.Done(): - return ctx.Err() - default: + if err := ctx.Err(); err != nil { + return err } - err = iterateSegments(ctx, entry.StreamID, projectID, bucket, entry.ObjectKey, metabaseDB, observers, limit, rateLimiter, entry.ExpiresAt) - if err != nil { - return err + more := true + cursor := metabase.SegmentPosition{} + for more { + if err := rateLimiter.Wait(ctx); err != nil { + // We don't really execute concurrent batches so we should never + // exceed the burst size of 1 and this should never happen. + // We can also enter here if the context is cancelled. + return err + } + + segments, err := metabaseDB.ListSegments(ctx, metabase.ListSegments{ + StreamID: entry.StreamID, + Cursor: cursor, + Limit: limit, + }) + if err != nil { + return err + } + + for _, segment := range segments.Segments { + nextObservers := observers[:0] + location := metabase.SegmentLocation{ + ProjectID: projectID, + BucketName: bucket, + ObjectKey: entry.ObjectKey, + Position: segment.Position, + } + for _, observer := range observers { + keepObserver := handleSegment(ctx, observer, location, segment, entry.ExpiresAt) + if keepObserver { + nextObservers = append(nextObservers, observer) + } + } + + observers = nextObservers + if len(observers) == 0 { + return nil + } + + // if context has been canceled exit. Otherwise, continue + if err := ctx.Err(); err != nil { + return err + } + } + + more = segments.More + if more { + lastSegment := segments.Segments[len(segments.Segments)-1] + cursor = lastSegment.Position + } } } return nil }) - return err -} - -func iterateSegments(ctx context.Context, streamID uuid.UUID, projectID uuid.UUID, bucket string, objectKey metabase.ObjectKey, metabaseDB MetabaseDB, observers observers, limit int, rateLimiter *rate.Limiter, expireAt *time.Time) (err error) { - defer mon.Task()(&ctx)(&err) - - more := true - cursor := metabase.SegmentPosition{} - for more { - if err := rateLimiter.Wait(ctx); err != nil { - // We don't really execute concurrent batches so we should never - // exceed the burst size of 1 and this should never happen. - // We can also enter here if the context is cancelled. - return err - } - - segments, err := metabaseDB.ListSegments(ctx, metabase.ListSegments{ - StreamID: streamID, - Cursor: cursor, - Limit: limit, - }) - if err != nil { - return err - } - - for _, segment := range segments.Segments { - for _, observer := range observers { - location := metabase.SegmentLocation{ - ProjectID: projectID, - BucketName: bucket, - ObjectKey: objectKey, - Position: segment.Position, - } - keepObserver := handleSegment(ctx, observer, location, segment, expireAt) - if !keepObserver { - observers.Remove(observer) - } - } - - if len(observers) == 0 { - return nil - } - - // if context has been canceled exit. Otherwise, continue - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - } - - more = segments.More - if more { - lastSegment := segments.Segments[len(segments.Segments)-1] - cursor = lastSegment.Position - } - } - - return nil + return observers, err } func handleObject(ctx context.Context, observer *observerContext, location metabase.ObjectLocation, object metabase.ObjectEntry) bool { @@ -486,7 +461,7 @@ func handleSegment(ctx context.Context, observer *observerContext, location meta } loopSegment.StreamID = segment.StreamID - loopSegment.DataSize = int(segment.EncryptedSize) // TODO should this be plain or enrypted size + loopSegment.DataSize = int(segment.EncryptedSize) if segment.Inline() { loopSegment.Inline = true if observer.HandleError(observer.InlineSegment(ctx, loopSegment)) {