satellite/metaloop: missing monitoring on observers
Change-Id: I630fbb0448c8d08b426486b3e49abfbca03332a6
This commit is contained in:
parent
ce87652a8c
commit
6949dc0bac
@ -280,6 +280,8 @@ func (observer *Observer) LoopStarted(context.Context, metaloop.LoopInfo) (err e
|
||||
|
||||
// Object is called for each object once.
|
||||
func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if object.Expired(observer.Now) {
|
||||
return nil
|
||||
}
|
||||
@ -293,6 +295,8 @@ func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) (
|
||||
|
||||
// InlineSegment is called for each inline segment.
|
||||
func (observer *Observer) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if segment.Expired(observer.Now) {
|
||||
return nil
|
||||
}
|
||||
@ -306,6 +310,8 @@ func (observer *Observer) InlineSegment(ctx context.Context, segment *metaloop.S
|
||||
|
||||
// RemoteSegment is called for each remote segment.
|
||||
func (observer *Observer) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if segment.Expired(observer.Now) {
|
||||
return nil
|
||||
}
|
||||
|
@ -36,6 +36,8 @@ func (collector *Collector) LoopStarted(context.Context, metaloop.LoopInfo) (err
|
||||
|
||||
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.
|
||||
func (collector *Collector) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
for _, piece := range segment.Pieces {
|
||||
if _, ok := collector.Reservoirs[piece.StorageNode]; !ok {
|
||||
collector.Reservoirs[piece.StorageNode] = NewReservoir(collector.slotCount)
|
||||
|
@ -56,11 +56,14 @@ func (collector *PathCollector) LoopStarted(context.Context, metaloop.LoopInfo)
|
||||
|
||||
// Flush persists the current buffer items to the database.
|
||||
func (collector *PathCollector) Flush(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return collector.flush(ctx, 1)
|
||||
}
|
||||
|
||||
// RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already.
|
||||
func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(collector.nodeIDStorage) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -114,6 +117,8 @@ func (collector *PathCollector) InlineSegment(ctx context.Context, segment *meta
|
||||
}
|
||||
|
||||
func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(collector.buffer) >= limit {
|
||||
err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize)
|
||||
collector.buffer = collector.buffer[:0]
|
||||
|
@ -200,7 +200,7 @@ type IterateLoopStreams struct {
|
||||
}
|
||||
|
||||
// SegmentIterator returns the next segment.
|
||||
type SegmentIterator func(segment *LoopSegmentEntry) bool
|
||||
type SegmentIterator func(ctx context.Context, segment *LoopSegmentEntry) bool
|
||||
|
||||
// LoopSegmentEntry contains information about segment metadata needed by metainfo loop.
|
||||
type LoopSegmentEntry struct {
|
||||
@ -270,7 +270,8 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
|
||||
for _, streamID := range opts.StreamIDs {
|
||||
streamID := streamID
|
||||
var internalError error
|
||||
err := handleStream(ctx, streamID, func(output *LoopSegmentEntry) bool {
|
||||
err := handleStream(ctx, streamID, func(ctx context.Context, output *LoopSegmentEntry) bool {
|
||||
mon.TaskNamed("handleStreamCB-SegmentIterator")(&ctx)(nil)
|
||||
if nextSegment != nil {
|
||||
if nextSegment.StreamID != streamID {
|
||||
return false
|
||||
|
@ -366,7 +366,7 @@ func (step IterateLoopStreams) Check(ctx *testcontext.Context, t testing.TB, db
|
||||
var segments []metabase.LoopSegmentEntry
|
||||
for {
|
||||
var segment metabase.LoopSegmentEntry
|
||||
if !next(&segment) {
|
||||
if !next(ctx, &segment) {
|
||||
break
|
||||
}
|
||||
segments = append(segments, segment)
|
||||
|
@ -446,7 +446,7 @@ func (loop *Service) iterateObjects(ctx context.Context, observers []*observerCo
|
||||
}
|
||||
|
||||
var segment metabase.LoopSegmentEntry
|
||||
if !next(&segment) {
|
||||
if !next(ctx, &segment) {
|
||||
break
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user