diff --git a/satellite/gc/bloomfilter/piecetracker.go b/satellite/gc/bloomfilter/piecetracker.go
index 0607d4e8f..387933420 100644
--- a/satellite/gc/bloomfilter/piecetracker.go
+++ b/satellite/gc/bloomfilter/piecetracker.go
@@ -20,31 +20,33 @@ var _ segmentloop.Observer = (*PieceTracker)(nil)
 
 // RetainInfo contains info needed for a storage node to retain important data and delete garbage data.
 type RetainInfo struct {
-	Filter       *bloomfilter.Filter
-	CreationDate time.Time
-	Count        int
+	Filter *bloomfilter.Filter
+	Count  int
 }
 
-// PieceTracker implements the metainfo loop observer interface for garbage collection.
+// PieceTracker implements the segments loop observer interface for garbage collection.
 //
 // architecture: Observer
 type PieceTracker struct {
-	log          *zap.Logger
-	config       Config
-	creationDate time.Time
+	log    *zap.Logger
+	config Config
 	// TODO: should we use int or int64 consistently for piece count (db type is int64)?
 	pieceCounts map[storj.NodeID]int
+	startTime   time.Time
 
 	RetainInfos map[storj.NodeID]*RetainInfo
+	// LatestCreationTime will be used to set the bloom filter CreationDate.
+	// Because the bloom filter service needs to run against an immutable database snapshot,
+	// we can set the CreationDate for bloom filters to the latest segment CreatedAt value.
+	LatestCreationTime time.Time
 }
 
-// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the metainfo loop.
+// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the segments loop.
 func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int) *PieceTracker {
 	return &PieceTracker{
-		log:          log,
-		config:       config,
-		creationDate: time.Now().UTC(),
-		pieceCounts:  pieceCounts,
+		log:         log,
+		config:      config,
+		pieceCounts: pieceCounts,
 
 		RetainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
 	}
@@ -52,9 +54,7 @@ func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeI
 
 // LoopStarted is called at each start of a loop.
 func (pieceTracker *PieceTracker) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
-	if pieceTracker.creationDate.After(info.Started) {
-		return errs.New("Creation date after loop starting time.")
-	}
+	pieceTracker.startTime = info.Started
 	return nil
 }
 
@@ -62,6 +62,18 @@ func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *segmentl
 func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
 	// we are expliticy not adding monitoring here as we are tracking loop observers separately
 
+	// sanity check to verify the loop is not running against a live database
+	if segment.CreatedAt.After(pieceTracker.startTime) {
+		pieceTracker.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID),
+			zap.Time("loop started", pieceTracker.startTime),
+			zap.Time("segment created", segment.CreatedAt))
+		return errs.New("segment created after loop started")
+	}
+
+	if pieceTracker.LatestCreationTime.Before(segment.CreatedAt) {
+		pieceTracker.LatestCreationTime = segment.CreatedAt
+	}
+
 	deriver := segment.RootPieceID.Deriver()
 	for _, piece := range segment.Pieces {
 		pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
@@ -83,8 +95,7 @@ func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID
 		// limit size of bloom filter to ensure we are under the limit for RPC
 		filter := bloomfilter.NewOptimalMaxSize(numPieces, pieceTracker.config.FalsePositiveRate, 2*memory.MiB)
 		info = &RetainInfo{
-			Filter:       filter,
-			CreationDate: pieceTracker.creationDate,
+			Filter: filter,
 		}
 		pieceTracker.RetainInfos[nodeID] = info
 	}
diff --git a/satellite/gc/bloomfilter/service.go b/satellite/gc/bloomfilter/service.go
index 98704f7aa..3090f5bb4 100644
--- a/satellite/gc/bloomfilter/service.go
+++ b/satellite/gc/bloomfilter/service.go
@@ -40,7 +40,7 @@ type Config struct {
 	ExpireIn time.Duration `help:"how quickly uploaded bloom filters will be automatically deleted" default:"336h"`
 }
 
-// Service implements the garbage collection service.
+// Service implements a service that collects bloom filters for garbage collection.
 //
 // architecture: Chore
 type Service struct {
@@ -106,7 +106,7 @@ func (service *Service) RunOnce(ctx context.Context) (err error) {
 		return nil
 	}
 
-	err = service.uploadBloomFilters(ctx, pieceTracker.RetainInfos)
+	err = service.uploadBloomFilters(ctx, pieceTracker.LatestCreationTime, pieceTracker.RetainInfos)
 	if err != nil {
 		return err
 	}
@@ -116,7 +116,7 @@ func (service *Service) RunOnce(ctx context.Context) (err error) {
 	return nil
 }
 
-func (service *Service) uploadBloomFilters(ctx context.Context, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
+func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	if len(retainInfos) == 0 {
@@ -163,8 +163,10 @@ func (service *Service) uploadBloomFilters(ctx context.Context, retainInfos map[
 	batchNumber := 0
 	for nodeID, info := range retainInfos {
 		infos = append(infos, internalpb.RetainInfo{
-			Filter:       info.Filter.Bytes(),
-			CreationDate: info.CreationDate,
+			Filter: info.Filter.Bytes(),
+			// because bloom filters should be created from an immutable database
+			// snapshot, we use the latest segment creation date
+			CreationDate:  latestCreationDate,
 			PieceCount:    int64(info.Count),
 			StorageNodeId: nodeID,
 		})
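For reviewers, a minimal, self-contained sketch of the invariant this change relies on (the `trackLatest` helper is hypothetical, for illustration only, not the satellite's actual API): while scanning a snapshot, every segment's CreatedAt must be at or before the loop start, and the running maximum of CreatedAt becomes the CreationDate stamped on the uploaded bloom filters, so it can never exceed the snapshot time.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// trackLatest mirrors the RemoteSegment logic above: it rejects segments
// created after the loop started (a sign of a live, non-snapshot database)
// and otherwise folds CreatedAt into the running maximum that later becomes
// the bloom filter CreationDate. Hypothetical helper for illustration only.
func trackLatest(loopStart, latest, createdAt time.Time) (time.Time, error) {
	if createdAt.After(loopStart) {
		return latest, errors.New("segment created after loop started")
	}
	if latest.Before(createdAt) {
		latest = createdAt
	}
	return latest, nil
}

func main() {
	loopStart := time.Now().UTC()
	var latest time.Time

	// Segments read from a snapshot all predate the loop start.
	for _, age := range []time.Duration{3 * time.Hour, 1 * time.Hour, 2 * time.Hour} {
		var err error
		if latest, err = trackLatest(loopStart, latest, loopStart.Add(-age)); err != nil {
			fmt.Println("unexpected:", err)
		}
	}
	fmt.Println("CreationDate for filters:", latest) // one hour before loopStart

	// A newer segment means the loop ran against a live database.
	if _, err := trackLatest(loopStart, latest, loopStart.Add(time.Minute)); err != nil {
		fmt.Println("sanity check tripped:", err)
	}
}
```

Stamping filters with the maximum observed CreatedAt rather than `time.Now()` keeps the CreationDate within the snapshot, which is the point of dropping the old `creationDate` field.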