satellite/gc: check on bloom filter creation date
Check that the bloom filter creation date is earlier than the metainfo loop system time used for db scanning. Change-Id: Ib0f47c124f5651deae0fd7e7996abcdcaac98fb4
This commit is contained in:
parent
2f39fc0f84
commit
bde367ae73
@ -175,3 +175,8 @@ func (progress *ProgressObserver) InlineSegment(context.Context, *metaloop.Segme
|
||||
progress.InlineSegmentCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (progress *ProgressObserver) LoopStarted(ctx context.Context, info metaloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
@ -273,6 +273,11 @@ func (observer *Observer) ensureBucket(ctx context.Context, location metabase.Ob
|
||||
return bucket
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (observer *Observer) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Object is called for each object once.
|
||||
func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) (err error) {
|
||||
if object.Expired(observer.Now) {
|
||||
|
@ -29,6 +29,11 @@ func NewCollector(reservoirSlots int, r *rand.Rand) *Collector {
|
||||
}
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (collector *Collector) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.
|
||||
func (collector *Collector) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
for _, piece := range segment.Pieces {
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/bloomfilter"
|
||||
@ -59,6 +60,14 @@ func (pieceTracker *PieceTracker) Object(ctx context.Context, object *metaloop.O
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (pieceTracker *PieceTracker) LoopStarted(ctx context.Context, info metaloop.LoopInfo) (err error) {
|
||||
if pieceTracker.creationDate.After(info.Started) {
|
||||
return errs.New("Creation date after metaloop starting time.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// InlineSegment returns nil because we're only doing gc for storage nodes for now.
|
||||
func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
return nil
|
||||
|
@ -49,6 +49,11 @@ func NewPathCollector(db DB, nodeIDs storj.NodeIDList, log *zap.Logger, batchSiz
|
||||
return collector
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (collector *PathCollector) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush persists the current buffer items to the database.
|
||||
func (collector *PathCollector) Flush(ctx context.Context) (err error) {
|
||||
return collector.flush(ctx, 1)
|
||||
|
@ -56,6 +56,12 @@ type Observer interface {
|
||||
Object(context.Context, *Object) error
|
||||
RemoteSegment(context.Context, *Segment) error
|
||||
InlineSegment(context.Context, *Segment) error
|
||||
LoopStarted(context.Context, LoopInfo) error
|
||||
}
|
||||
|
||||
// LoopInfo contains information about the current loop.
|
||||
type LoopInfo struct {
|
||||
Started time.Time
|
||||
}
|
||||
|
||||
// NullObserver is an observer that does nothing. This is useful for joining
|
||||
@ -77,6 +83,11 @@ func (NullObserver) InlineSegment(context.Context, *Segment) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoopStarted is called at each loop start.
|
||||
func (NullObserver) LoopStarted(context.Context, LoopInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type observerContext struct {
|
||||
observer Observer
|
||||
|
||||
@ -297,6 +308,14 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
|
||||
|
||||
noObserversErr := errs.New("no observers")
|
||||
|
||||
observers = withObservers(ctx, observers, func(ctx context.Context, observer *observerContext) bool {
|
||||
err := observer.observer.LoopStarted(ctx, LoopInfo{Started: startingTime})
|
||||
return !observer.HandleError(err)
|
||||
})
|
||||
|
||||
if len(observers) == 0 {
|
||||
return observers, noObserversErr
|
||||
}
|
||||
// TODO we may consider keeping only expiration time as its
|
||||
// only thing we need to handle segments
|
||||
objectsMap := make(map[uuid.UUID]metabase.LoopObjectEntry)
|
||||
|
@ -399,6 +399,11 @@ func newTestObserver(onSegment func(context.Context) error) *testObserver {
|
||||
}
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (obs *testObserver) LoopStarted(ctx context.Context, info metaloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (obs *testObserver) RemoteSegment(ctx context.Context, segment *metaloop.Segment) error {
|
||||
obs.remoteSegCount++
|
||||
|
||||
|
@ -25,6 +25,11 @@ func NewCounter() *Counter {
|
||||
return &Counter{}
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (counter *Counter) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Object increments the count for total objects and for inline objects in case the object has no segments.
|
||||
func (counter *Counter) Object(ctx context.Context, object *metaloop.Object) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
@ -295,6 +295,11 @@ func (obs *checkerObserver) loadRedundancy(redundancy storj.RedundancyScheme) (i
|
||||
return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (obs *checkerObserver) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (obs *checkerObserver) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user