From 86c41790ce6bf4cbd204302d04126665779b4cd7 Mon Sep 17 00:00:00 2001 From: JT Olio Date: Tue, 30 Mar 2021 14:24:35 -0600 Subject: [PATCH] satellite/metainfo/metaloop: add observability we want to know a lot more about what's going on during the operation of the metainfo loop. this patchset adds more instrumentation to previously unmonitored but interesting functions, and adds metrics that keep track of how far through a specific loop we are. it also adds mon:lock annotations, especially to the metainfo loop run task, which recently changed, silently broke some queries, and thus failed to alert us to spiking run time issues. Change-Id: I4358e2f2293d8ebe30eef497ba4e423ece929041 --- monkit.lock | 4 ++ satellite/metainfo/metaloop/service.go | 86 +++++++++++++++----------- 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/monkit.lock b/monkit.lock index f5f9062ac..fb6ab0a91 100644 --- a/monkit.lock +++ b/monkit.lock @@ -62,6 +62,10 @@ storj.io/storj/satellite/gracefulexit."graceful_exit_successful_pieces_transfer_ storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter storj.io/storj/satellite/metainfo."metainfo_rate_limit_exceeded" Event +storj.io/storj/satellite/metainfo/metaloop."objectsIterated" IntVal +storj.io/storj/satellite/metainfo/metaloop."objectsProcessed" IntVal +storj.io/storj/satellite/metainfo/metaloop."segmentsProcessed" IntVal +storj.io/storj/satellite/metainfo/metaloop.*Service.RunOnce Task storj.io/storj/satellite/metainfo/piecedeletion."delete_batch_size" IntVal storj.io/storj/satellite/metainfo/piecedeletion."deletion_pieces_unhandled_count" IntVal storj.io/storj/satellite/orders."download_failed_not_enough_pieces_uplink" Meter diff --git a/satellite/metainfo/metaloop/service.go b/satellite/metainfo/metaloop/service.go index ed7dff6d6..bdf02e54f 100644 --- a/satellite/metainfo/metaloop/service.go +++ 
b/satellite/metainfo/metaloop/service.go @@ -234,7 +234,7 @@ func (loop *Service) Close() (err error) { // // It is not safe to call this concurrently with Run. func (loop *Service) RunOnce(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) + defer mon.Task()(&ctx)(&err) //mon:locked var observers []*observerContext @@ -304,15 +304,21 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs objectsMap := make(map[uuid.UUID]metabase.LoopObjectEntry) ids := make([]uuid.UUID, 0, limit) - processBatch := func() error { + var objectsProcessed, segmentsProcessed int64 + + processBatch := func(ctx context.Context) (err error) { + defer mon.TaskNamed("processBatch")(&ctx)(&err) + if len(objectsMap) == 0 { return nil } - err := metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{ + err = metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{ StreamIDs: ids, AsOfSystemTime: startingTime, - }, func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error { + }, func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) (err error) { + defer mon.TaskNamed("iterateLoopStreamsCB")(&ctx, "objs", objectsProcessed, "segs", segmentsProcessed)(&err) + if err := ctx.Err(); err != nil { return err } @@ -323,14 +329,17 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs } delete(objectsMap, streamID) - observers = withObservers(observers, func(observer *observerContext) bool { + observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool { object := Object(obj) - return handleObject(ctx, observer, &object) + return !observer.HandleError(handleObject(ctx, observer, &object)) }) if len(observers) == 0 { return noObserversErr } + objectsProcessed++ + mon.IntVal("objectsProcessed").Observe(objectsProcessed) //mon:locked + for { // if context has been canceled exit. 
Otherwise, continue if err := ctx.Err(); err != nil { @@ -349,12 +358,16 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs Position: segment.Position, } - observers = withObservers(observers, func(observer *observerContext) bool { - return handleSegment(ctx, observer, location, segment, obj.ExpiresAt) + observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool { + return !observer.HandleError(handleSegment(ctx, observer, location, segment, obj.ExpiresAt)) }) if len(observers) == 0 { return noObserversErr } + + segmentsProcessed++ + mon.IntVal("segmentsProcessed").Observe(segmentsProcessed) //mon:locked + } return nil @@ -370,19 +383,28 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs return nil } + var objectsIterated int64 + segmentsInBatch := int32(0) err = metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{ BatchSize: limit, AsOfSystemTime: startingTime, - }, func(ctx context.Context, it metabase.LoopObjectsIterator) error { + }, func(ctx context.Context, it metabase.LoopObjectsIterator) (err error) { + defer mon.TaskNamed("iterateLoopObjectsCB")(&ctx)(&err) var entry metabase.LoopObjectEntry for it.Next(ctx, &entry) { + timer := mon.Timer("iterateLoopObjectsRateLimit").Start() if err := rateLimiter.Wait(ctx); err != nil { // We don't really execute concurrent batches so we should never // exceed the burst size of 1 and this should never happen. // We can also enter here if the context is cancelled. 
+ timer.Stop() return err } + timer.Stop() + + objectsIterated++ + mon.IntVal("objectsIterated").Observe(objectsIterated) //mon:locked objectsMap[entry.StreamID] = entry ids = append(ids, entry.StreamID) @@ -391,7 +413,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs segmentsInBatch += entry.SegmentCount + 1 if segmentsInBatch >= int32(limit) { - err := processBatch() + err := processBatch(ctx) if err != nil { if errors.Is(err, noObserversErr) { return nil @@ -407,7 +429,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs segmentsInBatch = 0 } } - err = processBatch() + err = processBatch(ctx) if errors.Is(err, noObserversErr) { return nil } @@ -417,10 +439,11 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs return observers, err } -func withObservers(observers []*observerContext, handleObserver func(observer *observerContext) bool) []*observerContext { +func withObservers(ctx context.Context, observers []*observerContext, handleObserver func(ctx context.Context, observer *observerContext) bool) []*observerContext { + defer mon.Task()(&ctx)(nil) nextObservers := observers[:0] for _, observer := range observers { - keepObserver := handleObserver(observer) + keepObserver := handleObserver(ctx, observer) if keepObserver { nextObservers = append(nextObservers, observer) } @@ -428,22 +451,18 @@ func withObservers(observers []*observerContext, handleObserver func(observer *o return nextObservers } -func handleObject(ctx context.Context, observer *observerContext, object *Object) bool { - if observer.HandleError(observer.Object(ctx, object)) { - return false +func handleObject(ctx context.Context, observer *observerContext, object *Object) (err error) { + defer mon.Task()(&ctx)(&err) + + if err := observer.Object(ctx, object); err != nil { + return err } - select { - case <-observer.ctx.Done(): - observer.HandleError(observer.ctx.Err()) - return false - default: - }
- - return true + return observer.ctx.Err() } -func handleSegment(ctx context.Context, observer *observerContext, location metabase.SegmentLocation, segment metabase.LoopSegmentEntry, expirationDate *time.Time) bool { +func handleSegment(ctx context.Context, observer *observerContext, location metabase.SegmentLocation, segment metabase.LoopSegmentEntry, expirationDate *time.Time) (err error) { + defer mon.Task()(&ctx)(&err) loopSegment := &Segment{ Location: location, // TODO we are not setting this since multipart-upload branch, we need to @@ -461,23 +480,16 @@ func handleSegment(ctx context.Context, observer *observerContext, location meta } if loopSegment.Inline() { - if observer.HandleError(observer.InlineSegment(ctx, loopSegment)) { - return false + if err := observer.InlineSegment(ctx, loopSegment); err != nil { + return err } } else { - if observer.HandleError(observer.RemoteSegment(ctx, loopSegment)) { - return false + if err := observer.RemoteSegment(ctx, loopSegment); err != nil { + return err } } - select { - case <-observer.ctx.Done(): - observer.HandleError(observer.ctx.Err()) - return false - default: - } - - return true + return observer.ctx.Err() } func finishObservers(observers []*observerContext) {