satellite/metainfo: fix metainfo loop

This fixes issues with passing observers between iteration methods.

It's not the best implementation, but I think we will need to optimize it
soon one way or another.

Change-Id: I574599bfd10822d84e2d2f1800bcd88e176a76ea
Michal Niewrzal 2020-12-15 22:44:57 +01:00 committed by Egon Elbre
parent ce20db9f68
commit 311b082838

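For review context: the core of the change is replacing the swap-delete observers.Remove helper with in-place filtering, and threading the surviving observers between iterateObjects and the now-inlined segment loop via return values. Below is a minimal, self-contained Go sketch of that pattern; the observer and handle names here are simplified, hypothetical stand-ins for *observerContext and handleObject/handleSegment, not the actual satellite code.

package main

import "fmt"

// observer is a hypothetical stand-in for *observerContext.
type observer struct{ name string }

// handle stands in for handleObject/handleSegment; returning false means
// the observer should stop receiving further entries.
func handle(o *observer, item int) bool {
	return o.name != "flaky" || item == 0
}

// filterObservers keeps only the observers that want to continue, reusing
// the slice's backing array (the observers[:0] trick from this diff) and
// returning the filtered slice so the caller can keep passing it along.
func filterObservers(observers []*observer, item int) []*observer {
	next := observers[:0]
	for _, o := range observers {
		if handle(o, item) {
			next = append(next, o)
		}
	}
	return next
}

func main() {
	observers := []*observer{{"audit"}, {"flaky"}, {"gc"}}
	for item := 0; item < 3 && len(observers) > 0; item++ {
		// Mirrors `observers, err = iterateObjects(...)` in the commit:
		// the surviving observers are handed back to the outer loop.
		observers = filterObservers(observers, item)
	}
	fmt.Println(len(observers), "observers still active") // prints: 2 observers still active
}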

@@ -154,25 +154,6 @@ func (observer *observerContext) Wait() error {
return <-observer.done
}
-type observers []*observerContext
-func (o *observers) Remove(toRemove *observerContext) {
-list := *o
-for i, observer := range list {
-if observer == toRemove {
-list[len(list)-1], list[i] = list[i], list[len(list)-1]
-*o = list[:len(list)-1]
-return
-}
-}
-}
-func (o *observers) Finish() {
-for _, observer := range *o {
-observer.Finish()
-}
-}
// LoopConfig contains configurable values for the metainfo loop.
type LoopConfig struct {
CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s"`
@@ -299,7 +280,7 @@ func (loop *Loop) Wait() {
<-loop.done
}
-func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers observers, limit int, rateLimiter *rate.Limiter) (err error) {
+func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (err error) {
defer func() {
if err != nil {
for _, observer := range observers {
@@ -307,7 +288,7 @@ func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, met
}
return
}
-observers.Finish()
+finishObservers(observers)
}()
more := true
@@ -322,12 +303,17 @@ func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, met
}
for _, bucket := range buckets.Items {
-err := iterateObjects(ctx, bucket.ProjectID, bucket.Name, metabaseDB, observers, limit, rateLimiter)
+observers, err = iterateObjects(ctx, bucket.ProjectID, bucket.Name, metabaseDB, observers, limit, rateLimiter)
if err != nil {
return LoopError.Wrap(err)
}
}
+// if context has been canceled exit. Otherwise, continue
+if err := ctx.Err(); err != nil {
+return err
+}
more = buckets.More
if more {
lastBucket := buckets.Items[len(buckets.Items)-1]
@@ -338,7 +324,7 @@ func iterateDatabase(ctx context.Context, db PointerDB, bucketsDB BucketsDB, met
return err
}
-func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, metabaseDB MetabaseDB, observers observers, limit int, rateLimiter *rate.Limiter) (err error) {
+func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, metabaseDB MetabaseDB, observers []*observerContext, limit int, rateLimiter *rate.Limiter) (_ []*observerContext, err error) {
defer mon.Task()(&ctx)(&err)
// TODO we should improve performance here, this is just most straightforward solution
@@ -359,6 +345,7 @@ func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, met
return err
}
+nextObservers := observers[:0]
for _, observer := range observers {
location := metabase.ObjectLocation{
ProjectID: projectID,
@@ -366,89 +353,77 @@ func iterateObjects(ctx context.Context, projectID uuid.UUID, bucket string, met
ObjectKey: entry.ObjectKey,
}
keepObserver := handleObject(ctx, observer, location, entry)
-if !keepObserver {
-observers.Remove(observer)
+if keepObserver {
+nextObservers = append(nextObservers, observer)
}
}
+observers = nextObservers
if len(observers) == 0 {
return nil
}
// if context has been canceled exit. Otherwise, continue
-select {
-case <-ctx.Done():
-return ctx.Err()
-default:
+if err := ctx.Err(); err != nil {
+return err
}
-err = iterateSegments(ctx, entry.StreamID, projectID, bucket, entry.ObjectKey, metabaseDB, observers, limit, rateLimiter, entry.ExpiresAt)
-if err != nil {
-return err
+more := true
+cursor := metabase.SegmentPosition{}
+for more {
+if err := rateLimiter.Wait(ctx); err != nil {
+// We don't really execute concurrent batches so we should never
+// exceed the burst size of 1 and this should never happen.
+// We can also enter here if the context is cancelled.
+return err
+}
+segments, err := metabaseDB.ListSegments(ctx, metabase.ListSegments{
+StreamID: entry.StreamID,
+Cursor: cursor,
+Limit: limit,
+})
+if err != nil {
+return err
+}
+for _, segment := range segments.Segments {
+nextObservers := observers[:0]
+location := metabase.SegmentLocation{
+ProjectID: projectID,
+BucketName: bucket,
+ObjectKey: entry.ObjectKey,
+Position: segment.Position,
+}
+for _, observer := range observers {
+keepObserver := handleSegment(ctx, observer, location, segment, entry.ExpiresAt)
+if keepObserver {
+nextObservers = append(nextObservers, observer)
+}
+}
+observers = nextObservers
+if len(observers) == 0 {
+return nil
+}
+// if context has been canceled exit. Otherwise, continue
+if err := ctx.Err(); err != nil {
+return err
+}
+}
+more = segments.More
+if more {
+lastSegment := segments.Segments[len(segments.Segments)-1]
+cursor = lastSegment.Position
+}
+}
+}
}
return nil
})
-return err
-}
-func iterateSegments(ctx context.Context, streamID uuid.UUID, projectID uuid.UUID, bucket string, objectKey metabase.ObjectKey, metabaseDB MetabaseDB, observers observers, limit int, rateLimiter *rate.Limiter, expireAt *time.Time) (err error) {
-defer mon.Task()(&ctx)(&err)
-more := true
-cursor := metabase.SegmentPosition{}
-for more {
-if err := rateLimiter.Wait(ctx); err != nil {
-// We don't really execute concurrent batches so we should never
-// exceed the burst size of 1 and this should never happen.
-// We can also enter here if the context is cancelled.
-return err
-}
-segments, err := metabaseDB.ListSegments(ctx, metabase.ListSegments{
-StreamID: streamID,
-Cursor: cursor,
-Limit: limit,
-})
-if err != nil {
-return err
-}
-for _, segment := range segments.Segments {
-for _, observer := range observers {
-location := metabase.SegmentLocation{
-ProjectID: projectID,
-BucketName: bucket,
-ObjectKey: objectKey,
-Position: segment.Position,
-}
-keepObserver := handleSegment(ctx, observer, location, segment, expireAt)
-if !keepObserver {
-observers.Remove(observer)
-}
-}
-if len(observers) == 0 {
-return nil
-}
-// if context has been canceled exit. Otherwise, continue
-select {
-case <-ctx.Done():
-return ctx.Err()
-default:
-}
-}
-more = segments.More
-if more {
-lastSegment := segments.Segments[len(segments.Segments)-1]
-cursor = lastSegment.Position
-}
-}
-return nil
+return observers, err
}
func handleObject(ctx context.Context, observer *observerContext, location metabase.ObjectLocation, object metabase.ObjectEntry) bool {
@@ -486,7 +461,7 @@ func handleSegment(ctx context.Context, observer *observerContext, location meta
}
loopSegment.StreamID = segment.StreamID
-loopSegment.DataSize = int(segment.EncryptedSize) // TODO should this be plain or enrypted size
+loopSegment.DataSize = int(segment.EncryptedSize)
if segment.Inline() {
loopSegment.Inline = true
if observer.HandleError(observer.InlineSegment(ctx, loopSegment)) {