From 90eded4d99b3714e5394e914e17375e64c5960e2 Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Thu, 1 Sep 2022 10:53:09 +0200
Subject: [PATCH] satellite/gc/bloomfilter: take CreationDate from latest segment

Bloom filter CreationDate is used to avoid deleting pieces that were
not processed by GC. Every piece created after that timestamp won't be
deleted. The current GC process uses the start of bloom filter creation
as the CreationDate. This approach avoids issues with an inconsistent
view of the DB, since we are currently using the live DB to create
bloom filters.

With the approach where we use a DB snapshot with the segment loop, we
can take CreationDate from the latest created segment in the DB. Every
piece created after the latest created segment won't be touched by GC
on the storage node.

Updates https://github.com/storj/team-metainfo/issues/120

Change-Id: I6aaf64948ab7f60cfea62195689ad77c25ea772e
---
 satellite/gc/bloomfilter/piecetracker.go | 45 +++++++++++++++---------
 satellite/gc/bloomfilter/service.go      | 12 ++++---
 2 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/satellite/gc/bloomfilter/piecetracker.go b/satellite/gc/bloomfilter/piecetracker.go
index 0607d4e8f..387933420 100644
--- a/satellite/gc/bloomfilter/piecetracker.go
+++ b/satellite/gc/bloomfilter/piecetracker.go
@@ -20,31 +20,33 @@ var _ segmentloop.Observer = (*PieceTracker)(nil)
 
 // RetainInfo contains info needed for a storage node to retain important data and delete garbage data.
 type RetainInfo struct {
-	Filter       *bloomfilter.Filter
-	CreationDate time.Time
-	Count        int
+	Filter *bloomfilter.Filter
+	Count  int
 }
 
-// PieceTracker implements the metainfo loop observer interface for garbage collection.
+// PieceTracker implements the segments loop observer interface for garbage collection.
 //
 // architecture: Observer
 type PieceTracker struct {
-	log          *zap.Logger
-	config       Config
-	creationDate time.Time
+	log    *zap.Logger
+	config Config
 	// TODO: should we use int or int64 consistently for piece count (db type is int64)?
 	pieceCounts map[storj.NodeID]int
+	startTime   time.Time
 
 	RetainInfos map[storj.NodeID]*RetainInfo
+	// LatestCreationTime will be used to set bloom filter CreationDate.
+	// Because bloom filter service needs to be run against immutable database snapshot
+	// we can set CreationDate for bloom filters as a latest segment CreatedAt value.
+	LatestCreationTime time.Time
 }
 
-// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the metainfo loop.
+// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the segments loop.
func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int) *PieceTracker {
 	return &PieceTracker{
-		log:          log,
-		config:       config,
-		creationDate: time.Now().UTC(),
-		pieceCounts:  pieceCounts,
+		log:         log,
+		config:      config,
+		pieceCounts: pieceCounts,
 
 		RetainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
 	}
@@ -52,9 +54,7 @@ func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeI
 
 // LoopStarted is called at each start of a loop.
 func (pieceTracker *PieceTracker) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
-	if pieceTracker.creationDate.After(info.Started) {
-		return errs.New("Creation date after loop starting time.")
-	}
+	pieceTracker.startTime = info.Started
 	return nil
 }
 
@@ -62,6 +62,18 @@ func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
 	// we are expliticy not adding monitoring here as we are tracking loop observers separately
 
+	// sanity check to detect if loop is not running against live database
+	if segment.CreatedAt.After(pieceTracker.startTime) {
+		pieceTracker.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID),
+			zap.Time("loop started", pieceTracker.startTime),
+			zap.Time("segment created", segment.CreatedAt))
+		return errs.New("segment created after loop started")
+	}
+
+	if pieceTracker.LatestCreationTime.Before(segment.CreatedAt) {
+		pieceTracker.LatestCreationTime = segment.CreatedAt
+	}
+
 	deriver := segment.RootPieceID.Deriver()
 	for _, piece := range segment.Pieces {
 		pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
@@ -83,8 +95,7 @@ func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID
 		// limit size of bloom filter to ensure we are under the limit for RPC
 		filter := bloomfilter.NewOptimalMaxSize(numPieces, pieceTracker.config.FalsePositiveRate, 2*memory.MiB)
 		info = &RetainInfo{
-			Filter:       filter,
-			CreationDate: pieceTracker.creationDate,
+			Filter: filter,
 		}
 		pieceTracker.RetainInfos[nodeID] = info
 	}
diff --git a/satellite/gc/bloomfilter/service.go b/satellite/gc/bloomfilter/service.go
index 98704f7aa..3090f5bb4 100644
--- a/satellite/gc/bloomfilter/service.go
+++ b/satellite/gc/bloomfilter/service.go
@@ -40,7 +40,7 @@ type Config struct {
 	ExpireIn time.Duration `help:"how quickly uploaded bloom filters will be automatically deleted" default:"336h"`
 }
 
-// Service implements the garbage collection service.
+// Service implements service to collect bloom filters for the garbage collection.
 //
 // architecture: Chore
 type Service struct {
@@ -106,7 +106,7 @@ func (service *Service) RunOnce(ctx context.Context) (err error) {
 		return nil
 	}
 
-	err = service.uploadBloomFilters(ctx, pieceTracker.RetainInfos)
+	err = service.uploadBloomFilters(ctx, pieceTracker.LatestCreationTime, pieceTracker.RetainInfos)
 	if err != nil {
 		return err
 	}
@@ -116,7 +116,7 @@ func (service *Service) RunOnce(ctx context.Context) (err error) {
 	return nil
 }
 
-func (service *Service) uploadBloomFilters(ctx context.Context, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
+func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	if len(retainInfos) == 0 {
@@ -163,8 +163,10 @@ func (service *Service) uploadBloomFilters(ctx context.Context, retainInfos map[
 	batchNumber := 0
 	for nodeID, info := range retainInfos {
 		infos = append(infos, internalpb.RetainInfo{
-			Filter:       info.Filter.Bytes(),
-			CreationDate: info.CreationDate,
+			Filter: info.Filter.Bytes(),
+			// because bloom filters should be created from immutable database
+			// snapshot we are using latest segment creation date
+			CreationDate:  latestCreationDate,
 			PieceCount:    int64(info.Count),
 			StorageNodeId: nodeID,
 		})
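
Editor's note, not part of the patch: the CreationDate carried in RetainInfo only matters on the
storage node side, where pieces newer than that date must never be garbage collected. The sketch
below is a minimal illustration of that retain rule under the assumptions of this change; the
package, pieceInfo type and garbageCandidates helper are hypothetical and do not correspond to
actual storagenode code, while bloomfilter.Filter and storj.PieceID are the storj.io/common types
already used in the diff above.

// Illustrative sketch only: how a storage node is expected to apply the
// bloom filter CreationDate during retain. Helper names are hypothetical.
package gcsketch

import (
	"time"

	"storj.io/common/bloomfilter"
	"storj.io/common/storj"
)

type pieceInfo struct {
	ID        storj.PieceID
	CreatedAt time.Time
}

// garbageCandidates returns pieces that may be deleted: a piece is a candidate
// only if it was created before the filter's CreationDate and is not present in
// the bloom filter. With this change CreationDate is the CreatedAt of the latest
// segment seen by the loop, so any piece uploaded after that point is always kept.
func garbageCandidates(pieces []pieceInfo, filter *bloomfilter.Filter, creationDate time.Time) []pieceInfo {
	var garbage []pieceInfo
	for _, piece := range pieces {
		if piece.CreatedAt.After(creationDate) {
			// newer than the filter's view of the database; keep it
			continue
		}
		if filter.Contains(piece.ID) {
			// probably still referenced; bloom filters can produce false positives,
			// so some garbage survives, but referenced pieces are never deleted
			continue
		}
		garbage = append(garbage, piece)
	}
	return garbage
}

Taking CreationDate from the latest observed segment rather than from the start of the GC run keeps
the date consistent with the snapshot the filters were built from, so the rule above stays safe even
if the loop runs long after the snapshot was taken.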