satellite/repair/checker: allow for multipart objects
We have multipart objects so we may get multiple inline segments sequences or no segments at all for objects. Change-Id: Ie46ee777a2db8f18f7154e3443bb9e07ecb170f7
This commit is contained in:
parent
8093c666a6
commit
c860b74a37
@ -16,7 +16,6 @@ import (
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
@ -276,7 +275,9 @@ type checkerObserver struct {
|
||||
nodeFailureRate float64
|
||||
getNodesEstimate func(ctx context.Context) (int, error)
|
||||
log *zap.Logger
|
||||
streamIDCursor uuid.UUID
|
||||
|
||||
// we need to delay counting objects to ensure they get associated with the correct redundancy only once
|
||||
objectCounted bool
|
||||
}
|
||||
|
||||
func (obs *checkerObserver) getStatsByRS(redundancy storj.RedundancyScheme) *stats {
|
||||
@ -296,24 +297,18 @@ func (obs *checkerObserver) loadRedundancy(redundancy storj.RedundancyScheme) (i
|
||||
func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
stats := obs.getStatsByRS(segment.Redundancy)
|
||||
|
||||
if !obs.streamIDCursor.IsZero() {
|
||||
if obs.streamIDCursor != segment.StreamID {
|
||||
return Error.New("unexpected cursor: wants %s, got %s", segment.StreamID.String(), obs.streamIDCursor.String())
|
||||
}
|
||||
|
||||
stats.iterationAggregates.objectsChecked++
|
||||
|
||||
// reset the cursor to ensure we don't count multi-segment objects more than once.
|
||||
obs.streamIDCursor = uuid.UUID{}
|
||||
}
|
||||
|
||||
// ignore segment if expired
|
||||
if segment.Expired(time.Now()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
stats := obs.getStatsByRS(segment.Redundancy)
|
||||
|
||||
if !obs.objectCounted {
|
||||
obs.objectCounted = true
|
||||
stats.iterationAggregates.objectsChecked++
|
||||
}
|
||||
|
||||
obs.monStats.remoteSegmentsChecked++
|
||||
stats.iterationAggregates.remoteSegmentsChecked++
|
||||
|
||||
@ -463,17 +458,14 @@ func (obs *checkerObserver) Object(ctx context.Context, object *metainfo.Object)
|
||||
|
||||
obs.monStats.objectsChecked++
|
||||
|
||||
// TODO: check for expired objects
|
||||
|
||||
if object.SegmentCount == 0 {
|
||||
stats := obs.getStatsByRS(storj.RedundancyScheme{})
|
||||
stats.iterationAggregates.objectsChecked++
|
||||
return nil
|
||||
}
|
||||
|
||||
if !obs.streamIDCursor.IsZero() {
|
||||
return Error.New("unexpected cursor: wants zero, got %s", obs.streamIDCursor.String())
|
||||
}
|
||||
|
||||
obs.streamIDCursor = object.StreamID
|
||||
obs.objectCounted = false
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -481,20 +473,15 @@ func (obs *checkerObserver) Object(ctx context.Context, object *metainfo.Object)
|
||||
func (obs *checkerObserver) InlineSegment(ctx context.Context, segment *metainfo.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if obs.streamIDCursor.IsZero() {
|
||||
return nil
|
||||
// TODO: check for expired segments
|
||||
|
||||
if !obs.objectCounted {
|
||||
// Note: this may give may give false stats when an object starts with a inline segment.
|
||||
obs.objectCounted = true
|
||||
stats := obs.getStatsByRS(storj.RedundancyScheme{})
|
||||
stats.iterationAggregates.objectsChecked++
|
||||
}
|
||||
|
||||
if obs.streamIDCursor != segment.StreamID {
|
||||
return Error.New("unexpected cursor: wants %s, got %s", segment.StreamID.String(), obs.streamIDCursor.String())
|
||||
}
|
||||
|
||||
stats := obs.getStatsByRS(storj.RedundancyScheme{})
|
||||
stats.iterationAggregates.objectsChecked++
|
||||
|
||||
// reset the cursor to ensure we don't count multi-segment objects more than once.
|
||||
obs.streamIDCursor = uuid.UUID{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user