satellite/metainfo/metaloop: add observability

we want to know a lot more about what's going on during
the operation of the metainfo loop. this patchset adds
more instrumentation to previously unmonitored but
interesting functions, and adds metrics that keep track
of how far through a given loop run we are. it also
adds mon:locked annotations, especially to the metainfo
loop run task, which recently changed, silently breaking
some queries and thus failing to alert us to spiking
run time issues.

Change-Id: I4358e2f2293d8ebe30eef497ba4e423ece929041
Author: JT Olio
Date: 2021-03-30 14:24:35 -06:00
Parent: f3176adbce
Commit: 86c41790ce
2 changed files with 53 additions and 37 deletions
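
For readers unfamiliar with the instrumentation pattern used throughout the diff, it is the usual monkit one: a package-level scope, a deferred mon.Task() call that times a function and records its error, IntVal observations for progress counters, and //mon:locked comments that pin a metric's name into the locked-metrics list shown in the first hunk. Below is a minimal sketch of that pattern, not code from this change; the names processLoop and itemsProcessed are illustrative, and the monkit v3 import path is assumed.

package example

import (
	"context"

	"github.com/spacemonkeygo/monkit/v3"
)

var mon = monkit.Package()

func processLoop(ctx context.Context, items []int) (err error) {
	// Time the whole function and record whether it returned an error.
	// //mon:locked pins the metric name so a rename can't silently break
	// dashboards and alerts built on it.
	defer mon.Task()(&ctx)(&err) //mon:locked

	var itemsProcessed int64
	for range items {
		// Bail out promptly if the caller cancels.
		if err := ctx.Err(); err != nil {
			return err
		}
		itemsProcessed++
		// Report how far through this run we are.
		mon.IntVal("itemsProcessed").Observe(itemsProcessed) //mon:locked
	}
	return nil
}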


@@ -62,6 +62,10 @@ storj.io/storj/satellite/gracefulexit."graceful_exit_successful_pieces_transfer_
storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter
storj.io/storj/satellite/metainfo."metainfo_rate_limit_exceeded" Event
storj.io/storj/satellite/metainfo/metaloop."objectsIterated" IntVal
storj.io/storj/satellite/metainfo/metaloop."objectsProcessed" IntVal
storj.io/storj/satellite/metainfo/metaloop."segmentsProcessed" IntVal
storj.io/storj/satellite/metainfo/metaloop.*Service.RunOnce Task
storj.io/storj/satellite/metainfo/piecedeletion."delete_batch_size" IntVal
storj.io/storj/satellite/metainfo/piecedeletion."deletion_pieces_unhandled_count" IntVal
storj.io/storj/satellite/orders."download_failed_not_enough_pieces_uplink" Meter


@@ -234,7 +234,7 @@ func (loop *Service) Close() (err error) {
//
// It is not safe to call this concurrently with Run.
func (loop *Service) RunOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
defer mon.Task()(&ctx)(&err) //mon:locked
var observers []*observerContext
@@ -304,15 +304,21 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
objectsMap := make(map[uuid.UUID]metabase.LoopObjectEntry)
ids := make([]uuid.UUID, 0, limit)
processBatch := func() error {
var objectsProcessed, segmentsProcessed int64
processBatch := func(ctx context.Context) (err error) {
defer mon.TaskNamed("processBatch")(&ctx)(&err)
if len(objectsMap) == 0 {
return nil
}
err := metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{
err = metabaseDB.IterateLoopStreams(ctx, metabase.IterateLoopStreams{
StreamIDs: ids,
AsOfSystemTime: startingTime,
}, func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error {
}, func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) (err error) {
defer mon.TaskNamed("iterateLoopStreamsCB")(&ctx, "objs", objectsProcessed, "segs", segmentsProcessed)(&err)
if err := ctx.Err(); err != nil {
return err
}
@@ -323,14 +329,17 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
}
delete(objectsMap, streamID)
observers = withObservers(observers, func(observer *observerContext) bool {
observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool {
object := Object(obj)
return handleObject(ctx, observer, &object)
return !observer.HandleError(handleObject(ctx, observer, &object))
})
if len(observers) == 0 {
return noObserversErr
}
objectsProcessed++
mon.IntVal("objectsProcessed").Observe(objectsProcessed) //mon:locked
for {
// if context has been canceled exit. Otherwise, continue
if err := ctx.Err(); err != nil {
@@ -349,12 +358,16 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
Position: segment.Position,
}
observers = withObservers(observers, func(observer *observerContext) bool {
return handleSegment(ctx, observer, location, segment, obj.ExpiresAt)
observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool {
return !observer.HandleError(handleSegment(ctx, observer, location, segment, obj.ExpiresAt))
})
if len(observers) == 0 {
return noObserversErr
}
segmentsProcessed++
mon.IntVal("segmentsProcessed").Observe(segmentsProcessed) //mon:locked
}
return nil
@@ -370,19 +383,28 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
return nil
}
var objectsIterated int64
segmentsInBatch := int32(0)
err = metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
BatchSize: limit,
AsOfSystemTime: startingTime,
}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
}, func(ctx context.Context, it metabase.LoopObjectsIterator) (err error) {
defer mon.TaskNamed("iterateLoopObjectsCB")(&ctx)(&err)
var entry metabase.LoopObjectEntry
for it.Next(ctx, &entry) {
timer := mon.Timer("iterateLoopObjectsRateLimit").Start()
if err := rateLimiter.Wait(ctx); err != nil {
// We don't really execute concurrent batches so we should never
// exceed the burst size of 1 and this should never happen.
// We can also enter here if the context is cancelled.
timer.Stop()
return err
}
timer.Stop()
mon.IntVal("objectsIterated").Observe(objectsIterated) //mon:locked
objectsIterated++
objectsMap[entry.StreamID] = entry
ids = append(ids, entry.StreamID)
@@ -391,7 +413,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
segmentsInBatch += entry.SegmentCount + 1
if segmentsInBatch >= int32(limit) {
err := processBatch()
err := processBatch(ctx)
if err != nil {
if errors.Is(err, noObserversErr) {
return nil
@@ -407,7 +429,7 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
segmentsInBatch = 0
}
}
err = processBatch()
err = processBatch(ctx)
if errors.Is(err, noObserversErr) {
return nil
}
@@ -417,10 +439,11 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
return observers, err
}
func withObservers(observers []*observerContext, handleObserver func(observer *observerContext) bool) []*observerContext {
func withObservers(ctx context.Context, observers []*observerContext, handleObserver func(ctx context.Context, observer *observerContext) bool) []*observerContext {
defer mon.Task()(&ctx)(nil)
nextObservers := observers[:0]
for _, observer := range observers {
keepObserver := handleObserver(observer)
keepObserver := handleObserver(ctx, observer)
if keepObserver {
nextObservers = append(nextObservers, observer)
}
@@ -428,22 +451,18 @@ func withObservers(observers []*observerContext, handleObserver func(observer *o
return nextObservers
}
func handleObject(ctx context.Context, observer *observerContext, object *Object) bool {
if observer.HandleError(observer.Object(ctx, object)) {
return false
func handleObject(ctx context.Context, observer *observerContext, object *Object) (err error) {
defer mon.Task()(&ctx)(&err)
if err := observer.Object(ctx, object); err != nil {
return err
}
select {
case <-observer.ctx.Done():
observer.HandleError(observer.ctx.Err())
return false
default:
}
return true
return observer.ctx.Err()
}
func handleSegment(ctx context.Context, observer *observerContext, location metabase.SegmentLocation, segment metabase.LoopSegmentEntry, expirationDate *time.Time) bool {
func handleSegment(ctx context.Context, observer *observerContext, location metabase.SegmentLocation, segment metabase.LoopSegmentEntry, expirationDate *time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
loopSegment := &Segment{
Location: location,
// TODO we are not setting this since multipart-upload branch, we need to
@@ -461,23 +480,16 @@ func handleSegment(ctx context.Context, observer *observerContext, location meta
}
if loopSegment.Inline() {
if observer.HandleError(observer.InlineSegment(ctx, loopSegment)) {
return false
if err := observer.InlineSegment(ctx, loopSegment); err != nil {
return err
}
} else {
if observer.HandleError(observer.RemoteSegment(ctx, loopSegment)) {
return false
if err := observer.RemoteSegment(ctx, loopSegment); err != nil {
return err
}
}
select {
case <-observer.ctx.Done():
observer.HandleError(observer.ctx.Err())
return false
default:
}
return true
return observer.ctx.Err()
}
func finishObservers(observers []*observerContext) {