From 2592aaef9cfbf1d4a499340fd8eacf3c0fad88b8 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Tue, 9 May 2023 12:09:20 +0200 Subject: [PATCH] satellite/gc/bloomfilter: remove segments loop parts We are switching completely to ranged loop. https://github.com/storj/storj/issues/5368 Change-Id: I1a22ac4b242998e287b2b7d8167b64e850b61a0f --- private/testplanet/satellite.go | 5 +- satellite/gc-bf.go | 83 ++---- satellite/gc-bf_test.go | 16 -- satellite/gc/bloomfilter/config.go | 22 ++ satellite/gc/bloomfilter/doc.go | 16 +- satellite/gc/bloomfilter/observer.go | 115 +++++++- satellite/gc/bloomfilter/observer_test.go | 6 - satellite/gc/bloomfilter/piecetracker.go | 134 --------- satellite/gc/bloomfilter/service.go | 286 -------------------- satellite/gc/bloomfilter/service_test.go | 193 ------------- satellite/gc/bloomfilter/upload.go | 2 +- satellite/gc/gc_test.go | 58 ++-- satellite/gc/sender/service.go | 12 +- satellite/gc/sender/service_test.go | 22 +- satellite/metabase/rangedloop/service.go | 1 + satellite/rangedloop.go | 13 - scripts/testdata/satellite-config.yaml.lock | 9 - 17 files changed, 221 insertions(+), 772 deletions(-) create mode 100644 satellite/gc/bloomfilter/config.go delete mode 100644 satellite/gc/bloomfilter/piecetracker.go delete mode 100644 satellite/gc/bloomfilter/service.go delete mode 100644 satellite/gc/bloomfilter/service_test.go diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index c17ab95d7..5a4511d56 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -40,7 +40,6 @@ import ( "storj.io/storj/satellite/console/consoleweb" "storj.io/storj/satellite/console/userinfo" "storj.io/storj/satellite/contact" - "storj.io/storj/satellite/gc/bloomfilter" "storj.io/storj/satellite/gc/sender" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/inspector" @@ -152,8 +151,7 @@ type Satellite struct { } GarbageCollection struct { - Sender *sender.Service - BloomFilters *bloomfilter.Service + Sender *sender.Service } ExpiredDeletion struct { @@ -639,7 +637,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer system.Audit.ContainmentSyncChore = peer.Audit.ContainmentSyncChore system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender - system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore diff --git a/satellite/gc-bf.go b/satellite/gc-bf.go index 9f6e3632f..e2c286d87 100644 --- a/satellite/gc-bf.go +++ b/satellite/gc-bf.go @@ -49,8 +49,7 @@ type GarbageCollectionBF struct { } GarbageCollection struct { - Config bloomfilter.Config - Service *bloomfilter.Service + Config bloomfilter.Config } RangedLoop struct { @@ -95,65 +94,31 @@ func NewGarbageCollectionBF(log *zap.Logger, db DB, metabaseDB *metabase.DB, rev { // setup garbage collection bloom filters log := peer.Log.Named("garbage-collection-bf") peer.GarbageCollection.Config = config.GarbageCollectionBF - if config.GarbageCollectionBF.UseRangedLoop { - log.Info("using ranged loop") - var observer rangedloop.Observer - if config.GarbageCollectionBF.UseSyncObserver { - observer = bloomfilter.NewSyncObserver(log.Named("gc-bf"), - config.GarbageCollectionBF, - peer.Overlay.DB, - ) - } else { - observer = bloomfilter.NewObserver(log.Named("gc-bf"), - config.GarbageCollectionBF, - peer.Overlay.DB, - ) - } - - provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, 
config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize) - peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{observer}) - - if !config.GarbageCollectionBF.RunOnce { - peer.Services.Add(lifecycle.Item{ - Name: "garbage-collection-bf", - Run: peer.RangedLoop.Service.Run, - Close: peer.RangedLoop.Service.Close, - }) - peer.Debug.Server.Panel.Add( - debug.Cycle("Garbage Collection Bloom Filters", peer.RangedLoop.Service.Loop)) - } - } else { - log.Info("using segments loop") - - { // setup metainfo - peer.Metainfo.SegmentLoop = segmentloop.New( - log.Named("segmentloop"), - config.Metainfo.SegmentLoop, - metabaseDB, - ) - peer.Services.Add(lifecycle.Item{ - Name: "metainfo:segmentloop", - Run: peer.Metainfo.SegmentLoop.Run, - Close: peer.Metainfo.SegmentLoop.Close, - }) - } - - peer.GarbageCollection.Service = bloomfilter.NewService( - log, + var observer rangedloop.Observer + if config.GarbageCollectionBF.UseSyncObserver { + observer = bloomfilter.NewSyncObserver(log.Named("gc-bf"), config.GarbageCollectionBF, peer.Overlay.DB, - peer.Metainfo.SegmentLoop, ) + } else { + observer = bloomfilter.NewObserver(log.Named("gc-bf"), + config.GarbageCollectionBF, + peer.Overlay.DB, + ) + } - if !config.GarbageCollectionBF.RunOnce { - peer.Services.Add(lifecycle.Item{ - Name: "garbage-collection-bf", - Run: peer.GarbageCollection.Service.Run, - }) - peer.Debug.Server.Panel.Add( - debug.Cycle("Garbage Collection Bloom Filters", peer.GarbageCollection.Service.Loop)) - } + provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize) + peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{observer}) + + if !config.GarbageCollectionBF.RunOnce { + peer.Services.Add(lifecycle.Item{ + Name: "garbage-collection-bf", + Run: peer.RangedLoop.Service.Run, + Close: peer.RangedLoop.Service.Close, + }) + peer.Debug.Server.Panel.Add( + debug.Cycle("Garbage Collection Bloom Filters", peer.RangedLoop.Service.Loop)) } } @@ -175,11 +140,7 @@ func (peer *GarbageCollectionBF) Run(ctx context.Context) (err error) { if peer.GarbageCollection.Config.RunOnce { group.Go(func() error { - if peer.GarbageCollection.Config.UseRangedLoop { - _, err = peer.RangedLoop.Service.RunOnce(ctx) - } else { - err = peer.GarbageCollection.Service.RunOnce(ctx) - } + _, err = peer.RangedLoop.Service.RunOnce(ctx) cancel() return err }) diff --git a/satellite/gc-bf_test.go b/satellite/gc-bf_test.go index 6d8f73076..6c2a00bf4 100644 --- a/satellite/gc-bf_test.go +++ b/satellite/gc-bf_test.go @@ -20,22 +20,6 @@ func TestGCBFUseRangedLoop(t *testing.T) { Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.GarbageCollectionBF.RunOnce = true - config.GarbageCollectionBF.UseRangedLoop = true - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - err := planet.Satellites[0].GCBF.Run(ctx) - require.NoError(t, err) - }) -} - -func TestGCBFUseSegmentsLoop(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.GarbageCollectionBF.RunOnce = true - config.GarbageCollectionBF.UseRangedLoop = false }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { diff --git 
a/satellite/gc/bloomfilter/config.go b/satellite/gc/bloomfilter/config.go
new file mode 100644
index 000000000..d0ebbea76
--- /dev/null
+++ b/satellite/gc/bloomfilter/config.go
@@ -0,0 +1,22 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package bloomfilter
+
+import "time"
+
+// Config contains configurable values for garbage collection.
+type Config struct {
+	RunOnce bool `help:"set if garbage collection bloom filter process should only run once then exit" default:"false"`
+
+	UseSyncObserver bool `help:"whether to use test GC SyncObserver with ranged loop" default:"false"`
+
+	// value for InitialPieces currently based on average pieces per node
+	InitialPieces int64 `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
+	FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
+
+	AccessGrant string `help:"Access Grant which will be used to upload bloom filters to the bucket" default:""`
+	Bucket string `help:"Bucket which will be used to upload bloom filters" default:"" testDefault:"gc-queue"` // TODO do we need full location?
+	ZipBatchSize int `help:"how many bloom filters will be packed in a single zip" default:"500" testDefault:"2"`
+	ExpireIn time.Duration `help:"how long bloom filters will remain in the bucket for gc/sender to consume before being automatically deleted" default:"336h"`
+}
diff --git a/satellite/gc/bloomfilter/doc.go b/satellite/gc/bloomfilter/doc.go
index 11be0f5c4..1db0d07f9 100644
--- a/satellite/gc/bloomfilter/doc.go
+++ b/satellite/gc/bloomfilter/doc.go
@@ -5,18 +5,18 @@
 Package bloomfilter contains the functions needed to run part of garbage
 collection process.
 
-The bloomfilter.PieceTracker implements the segments loop Observer interface
+The bloomfilter.Observer implements the ranged loop Observer interface
 allowing us to subscribe to the loop to get information for every segment
 in the metabase db.
 
-The bloomfilter.PieceTracker handling functions are used by the bloomfilter.Service
-to periodically account for all existing pieces on storage nodes and create
-"retain requests" which contain a bloom filter of all pieces that possibly exist
-on a storage node.
+The bloomfilter.Observer is subscribed to the ranged loop instance to account for all
+existing segment pieces on storage nodes and to create "retain requests", which contain
+a bloom filter of all pieces that possibly exist on a storage node. With the ranged loop,
+segments can be processed in parallel to speed up the process.
 
-The bloomfilter.Service will send that requests to the Storj bucket after a full
-segments loop iteration. After that bloom filters will be downloaded and sent
-to the storage nodes with separate service from storj/satellite/gc package.
+The bloomfilter.Observer will send these requests to the Storj bucket after a full
+ranged loop iteration. After that, the bloom filters will be downloaded and sent
+to the storage nodes by a separate service from the storj/satellite/gc/sender package.
 
 This bloom filter service should be run only against immutable database snapshot.
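As an aside, a minimal sketch of the ranged loop Observer/Partial contract that the updated doc.go describes, assuming only the method signatures visible in this patch (Start, Fork, Join, Finish on the observer and Process on the fork). The countingObserver/countingFork names are hypothetical illustrations, not part of this change.

```go
package example

import (
	"context"
	"fmt"
	"time"

	"storj.io/storj/satellite/metabase/rangedloop"
	"storj.io/storj/satellite/metabase/segmentloop"
)

// countingObserver counts remote segments; it mirrors how bloomfilter.Observer
// splits work into forks and merges their results.
type countingObserver struct {
	remoteSegments int64
}

// countingFork processes one range of segments.
type countingFork struct {
	remoteSegments int64
}

var _ rangedloop.Observer = (*countingObserver)(nil)
var _ rangedloop.Partial = (*countingFork)(nil)

// Start is called once before the loop iterates over segment ranges.
func (o *countingObserver) Start(ctx context.Context, startTime time.Time) error {
	o.remoteSegments = 0
	return nil
}

// Fork creates a Partial that will process a single range of segments.
func (o *countingObserver) Fork(ctx context.Context) (rangedloop.Partial, error) {
	return &countingFork{}, nil
}

// Join merges the results gathered by a finished Partial.
func (o *countingObserver) Join(ctx context.Context, partial rangedloop.Partial) error {
	fork, ok := partial.(*countingFork)
	if !ok {
		return fmt.Errorf("expected *countingFork but got %T", partial)
	}
	o.remoteSegments += fork.remoteSegments
	return nil
}

// Finish is called once after all ranges have been processed.
func (o *countingObserver) Finish(ctx context.Context) error {
	return nil
}

// Process handles a batch of segments from the fork's range; inline segments
// are skipped, the same way the bloom filter fork skips them.
func (f *countingFork) Process(ctx context.Context, segments []segmentloop.Segment) error {
	for _, segment := range segments {
		if segment.Inline() {
			continue
		}
		f.remoteSegments++
	}
	return nil
}
```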
diff --git a/satellite/gc/bloomfilter/observer.go b/satellite/gc/bloomfilter/observer.go index c3301f6eb..78f10f2e7 100644 --- a/satellite/gc/bloomfilter/observer.go +++ b/satellite/gc/bloomfilter/observer.go @@ -7,16 +7,26 @@ import ( "context" "time" + "github.com/spacemonkeygo/monkit/v3" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/bloomfilter" + "storj.io/common/memory" "storj.io/common/storj" "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" ) +var mon = monkit.Package() + +// RetainInfo contains info needed for a storage node to retain important data and delete garbage data. +type RetainInfo struct { + Filter *bloomfilter.Filter + Count int +} + // Observer implements a rangedloop observer to collect bloom filters for the garbage collection. // // architecture: Observer @@ -35,6 +45,7 @@ type Observer struct { } var _ (rangedloop.Observer) = (*Observer)(nil) +var _ (rangedloop.Partial) = (*observerFork)(nil) // NewObserver creates a new instance of the gc rangedloop observer. func NewObserver(log *zap.Logger, config Config, overlay overlay.DB) *Observer { @@ -77,27 +88,20 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) // Fork creates a Partial to build bloom filters over a chunk of all the segments. func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) { defer mon.Task()(&ctx)(&err) - // TODO: refactor PieceTracker after the segmentloop has been removed to - // more closely match the rangedloop observer needs. - pieceTracker := NewPieceTrackerWithSeed(obs.log.Named("gc observer"), obs.config, obs.lastPieceCounts, obs.seed) - if err := pieceTracker.LoopStarted(ctx, segmentloop.LoopInfo{ - Started: obs.startTime, - }); err != nil { - return nil, err - } - return pieceTracker, nil + + return newObserverFork(obs.log.Named("gc observer"), obs.config, obs.lastPieceCounts, obs.seed, obs.startTime), nil } // Join merges the bloom filters gathered by each Partial. func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) { defer mon.Task()(&ctx)(&err) - pieceTracker, ok := partial.(*PieceTracker) + pieceTracker, ok := partial.(*observerFork) if !ok { return errs.New("expected %T but got %T", pieceTracker, partial) } // Update the count and merge the bloom filters for each node. - for nodeID, retainInfo := range pieceTracker.RetainInfos { + for nodeID, retainInfo := range pieceTracker.retainInfos { if existing, ok := obs.retainInfos[nodeID]; ok { existing.Count += retainInfo.Count if err := existing.Filter.AddFilter(retainInfo.Filter); err != nil { @@ -109,8 +113,8 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err } // Replace the latestCreationTime if the partial observed a later time. - if obs.latestCreationTime.IsZero() || obs.latestCreationTime.Before(pieceTracker.LatestCreationTime) { - obs.latestCreationTime = pieceTracker.LatestCreationTime + if obs.latestCreationTime.IsZero() || obs.latestCreationTime.Before(pieceTracker.latestCreationTime) { + obs.latestCreationTime = pieceTracker.latestCreationTime } return nil @@ -125,3 +129,88 @@ func (obs *Observer) Finish(ctx context.Context) (err error) { obs.log.Debug("collecting bloom filters finished") return nil } + +// TestingRetainInfos returns retain infos collected by observer. 
+func (obs *Observer) TestingRetainInfos() map[storj.NodeID]*RetainInfo { + return obs.retainInfos +} + +type observerFork struct { + log *zap.Logger + config Config + // TODO: should we use int or int64 consistently for piece count (db type is int64)? + pieceCounts map[storj.NodeID]int64 + seed byte + startTime time.Time + + retainInfos map[storj.NodeID]*RetainInfo + // latestCreationTime will be used to set bloom filter CreationDate. + // Because bloom filter service needs to be run against immutable database snapshot + // we can set CreationDate for bloom filters as a latest segment CreatedAt value. + latestCreationTime time.Time +} + +// newObserverFork instantiates a new observer fork to process different segment range. +// The seed is passed so that it can be shared among all parallel forks. +func newObserverFork(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int64, seed byte, startTime time.Time) *observerFork { + return &observerFork{ + log: log, + config: config, + pieceCounts: pieceCounts, + seed: seed, + startTime: startTime, + + retainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)), + } +} + +// Process adds pieces to the bloom filter from remote segments. +func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error { + for _, segment := range segments { + if segment.Inline() { + continue + } + + // sanity check to detect if loop is not running against live database + if segment.CreatedAt.After(fork.startTime) { + fork.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID), + zap.Time("loop started", fork.startTime), + zap.Time("segment created", segment.CreatedAt)) + return errs.New("segment created after loop started") + } + + if fork.latestCreationTime.Before(segment.CreatedAt) { + fork.latestCreationTime = segment.CreatedAt + } + + deriver := segment.RootPieceID.Deriver() + for _, piece := range segment.Pieces { + pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number)) + fork.add(piece.StorageNode, pieceID) + } + } + return nil +} + +// add adds a pieceID to the relevant node's RetainInfo. +func (fork *observerFork) add(nodeID storj.NodeID, pieceID storj.PieceID) { + info, ok := fork.retainInfos[nodeID] + if !ok { + // If we know how many pieces a node should be storing, use that number. Otherwise use default. + numPieces := fork.config.InitialPieces + if pieceCounts := fork.pieceCounts[nodeID]; pieceCounts > 0 { + numPieces = pieceCounts + } + + hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, fork.config.FalsePositiveRate, 2*memory.MiB) + // limit size of bloom filter to ensure we are under the limit for RPC + filter := bloomfilter.NewExplicit(fork.seed, hashCount, tableSize) + info = &RetainInfo{ + Filter: filter, + } + fork.retainInfos[nodeID] = info + } + + info.Filter.Add(pieceID) + info.Count++ +} diff --git a/satellite/gc/bloomfilter/observer_test.go b/satellite/gc/bloomfilter/observer_test.go index bcb40d259..7915850da 100644 --- a/satellite/gc/bloomfilter/observer_test.go +++ b/satellite/gc/bloomfilter/observer_test.go @@ -74,8 +74,6 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) { // directly, as is already done for the service. Maybe we can // improve this later. 
config := planet.Satellites[0].Config.GarbageCollectionBF - config.Enabled = true - config.UseRangedLoop = true config.AccessGrant = accessString config.Bucket = tc.Bucket config.ZipBatchSize = tc.ZipBatchSize @@ -201,10 +199,8 @@ func TestObserverGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T) // directly, as is already done for the service. Maybe we can // improve this later. config := planet.Satellites[0].Config.GarbageCollectionBF - config.Enabled = true config.AccessGrant = accessString config.Bucket = "bloomfilters" - config.UseRangedLoop = true observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB) // TODO: see comment above. ideally this should use the rangedloop @@ -273,10 +269,8 @@ func TestObserverGarbageCollection_MultipleRanges(t *testing.T) { // directly, as is already done for the service. Maybe we can // improve this later. config := planet.Satellites[0].Config.GarbageCollectionBF - config.Enabled = true config.AccessGrant = accessString config.Bucket = "bloomfilters" - config.UseRangedLoop = true observers := []rangedloop.Observer{ bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB), bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB), diff --git a/satellite/gc/bloomfilter/piecetracker.go b/satellite/gc/bloomfilter/piecetracker.go deleted file mode 100644 index cd49eb9c0..000000000 --- a/satellite/gc/bloomfilter/piecetracker.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (C) 2022 Storj Labs, Inc. -// See LICENSE for copying information. - -package bloomfilter - -import ( - "context" - "time" - - "github.com/zeebo/errs" - "go.uber.org/zap" - - "storj.io/common/bloomfilter" - "storj.io/common/memory" - "storj.io/common/storj" - "storj.io/storj/satellite/metabase/segmentloop" -) - -var _ segmentloop.Observer = (*PieceTracker)(nil) - -// RetainInfo contains info needed for a storage node to retain important data and delete garbage data. -type RetainInfo struct { - Filter *bloomfilter.Filter - Count int -} - -// PieceTracker implements the segments loop observer interface for garbage collection. -// -// architecture: Observer -type PieceTracker struct { - log *zap.Logger - config Config - // TODO: should we use int or int64 consistently for piece count (db type is int64)? - pieceCounts map[storj.NodeID]int64 - seed byte - startTime time.Time - - RetainInfos map[storj.NodeID]*RetainInfo - // LatestCreationTime will be used to set bloom filter CreationDate. - // Because bloom filter service needs to be run against immutable database snapshot - // we can set CreationDate for bloom filters as a latest segment CreatedAt value. - LatestCreationTime time.Time -} - -// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the segments loop. -func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int64) *PieceTracker { - return NewPieceTrackerWithSeed(log, config, pieceCounts, bloomfilter.GenerateSeed()) -} - -// NewPieceTrackerWithSeed instantiates a new gc piece tracker to be subscribed -// to the rangedloop. The seed is passed so that it can be shared among all -// parallel PieceTrackers handling each segment range. 
-func NewPieceTrackerWithSeed(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int64, seed byte) *PieceTracker { - return &PieceTracker{ - log: log, - config: config, - pieceCounts: pieceCounts, - seed: seed, - - RetainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)), - } -} - -// LoopStarted is called at each start of a loop. -func (pieceTracker *PieceTracker) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) { - pieceTracker.startTime = info.Started - return nil -} - -// RemoteSegment takes a remote segment found in metabase and adds pieces to bloom filters. -func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error { - // we are expliticy not adding monitoring here as we are tracking loop observers separately - - // sanity check to detect if loop is not running against live database - if segment.CreatedAt.After(pieceTracker.startTime) { - pieceTracker.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID), - zap.Time("loop started", pieceTracker.startTime), - zap.Time("segment created", segment.CreatedAt)) - return errs.New("segment created after loop started") - } - - if pieceTracker.LatestCreationTime.Before(segment.CreatedAt) { - pieceTracker.LatestCreationTime = segment.CreatedAt - } - - deriver := segment.RootPieceID.Deriver() - for _, piece := range segment.Pieces { - pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number)) - pieceTracker.add(piece.StorageNode, pieceID) - } - - return nil -} - -// add adds a pieceID to the relevant node's RetainInfo. -func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID) { - info, ok := pieceTracker.RetainInfos[nodeID] - if !ok { - // If we know how many pieces a node should be storing, use that number. Otherwise use default. - numPieces := pieceTracker.config.InitialPieces - if pieceCounts := pieceTracker.pieceCounts[nodeID]; pieceCounts > 0 { - numPieces = pieceCounts - } - - hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, pieceTracker.config.FalsePositiveRate, 2*memory.MiB) - // limit size of bloom filter to ensure we are under the limit for RPC - filter := bloomfilter.NewExplicit(pieceTracker.seed, hashCount, tableSize) - info = &RetainInfo{ - Filter: filter, - } - pieceTracker.RetainInfos[nodeID] = info - } - - info.Filter.Add(pieceID) - info.Count++ -} - -// InlineSegment returns nil because we're only doing gc for storage nodes for now. -func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) { - return nil -} - -// Process adds pieces to the bloom filter from remote segments. -func (pieceTracker *PieceTracker) Process(ctx context.Context, segments []segmentloop.Segment) error { - for _, segment := range segments { - if segment.Inline() { - continue - } - if err := pieceTracker.RemoteSegment(ctx, &segment); err != nil { - return err - } - } - return nil -} diff --git a/satellite/gc/bloomfilter/service.go b/satellite/gc/bloomfilter/service.go deleted file mode 100644 index def5eb3ee..000000000 --- a/satellite/gc/bloomfilter/service.go +++ /dev/null @@ -1,286 +0,0 @@ -// Copyright (C) 2022 Storj Labs, Inc. -// See LICENSE for copying information. 
- -package bloomfilter - -import ( - "archive/zip" - "context" - "strconv" - "time" - - "github.com/spacemonkeygo/monkit/v3" - "github.com/zeebo/errs" - "go.uber.org/zap" - - "storj.io/common/pb" - "storj.io/common/storj" - "storj.io/common/sync2" - "storj.io/storj/satellite/internalpb" - "storj.io/storj/satellite/metabase/segmentloop" - "storj.io/storj/satellite/overlay" - "storj.io/uplink" -) - -var mon = monkit.Package() - -// Config contains configurable values for garbage collection. -type Config struct { - Interval time.Duration `help:"the time between each garbage collection executions" releaseDefault:"120h" devDefault:"10m" testDefault:"$TESTINTERVAL"` - // TODO service is not enabled by default for testing until will be finished - Enabled bool `help:"set if garbage collection bloom filters is enabled or not" default:"true" testDefault:"false"` - - RunOnce bool `help:"set if garbage collection bloom filter process should only run once then exit" default:"false"` - - UseRangedLoop bool `help:"whether to use ranged loop instead of segment loop" default:"false"` - UseSyncObserver bool `help:"whether to use test GC SyncObserver with ranged loop" default:"false"` - - // value for InitialPieces currently based on average pieces per node - InitialPieces int64 `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"` - FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"` - - AccessGrant string `help:"Access Grant which will be used to upload bloom filters to the bucket" default:""` - Bucket string `help:"Bucket which will be used to upload bloom filters" default:"" testDefault:"gc-queue"` // TODO do we need full location? - ZipBatchSize int `help:"how many bloom filters will be packed in a single zip" default:"500" testDefault:"2"` - ExpireIn time.Duration `help:"how long bloom filters will remain in the bucket for gc/sender to consume before being automatically deleted" default:"336h"` -} - -// Service implements service to collect bloom filters for the garbage collection. -// -// architecture: Chore -type Service struct { - log *zap.Logger - config Config - Loop *sync2.Cycle - - overlay overlay.DB - segmentLoop *segmentloop.Service -} - -// NewService creates a new instance of the gc service. -func NewService(log *zap.Logger, config Config, overlay overlay.DB, loop *segmentloop.Service) *Service { - return &Service{ - log: log, - config: config, - Loop: sync2.NewCycle(config.Interval), - overlay: overlay, - segmentLoop: loop, - } -} - -// Run starts the gc loop service. -func (service *Service) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - if !service.config.Enabled { - return nil - } - - switch { - case service.config.AccessGrant == "": - return errs.New("Access Grant is not set") - case service.config.Bucket == "": - return errs.New("Bucket is not set") - } - - return service.Loop.Run(ctx, service.RunOnce) -} - -// RunOnce runs service only once. 
-func (service *Service) RunOnce(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - service.log.Debug("collecting bloom filters started") - - // load last piece counts from overlay db - lastPieceCounts, err := service.overlay.AllPieceCounts(ctx) - if err != nil { - service.log.Error("error getting last piece counts", zap.Error(err)) - err = nil - } - if lastPieceCounts == nil { - lastPieceCounts = make(map[storj.NodeID]int64) - } - - pieceTracker := NewPieceTracker(service.log.Named("gc observer"), service.config, lastPieceCounts) - - // collect things to retain - err = service.segmentLoop.Join(ctx, pieceTracker) - if err != nil { - service.log.Error("error joining metainfoloop", zap.Error(err)) - return nil - } - - err = service.uploadBloomFilters(ctx, pieceTracker.LatestCreationTime, pieceTracker.RetainInfos) - if err != nil { - return err - } - - service.log.Debug("collecting bloom filters finished") - - return nil -} - -// uploadBloomFilters stores a zipfile with multiple bloom filters in a bucket. -func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) { - defer mon.Task()(&ctx)(&err) - - if len(retainInfos) == 0 { - return nil - } - - prefix := time.Now().Format(time.RFC3339) - - expirationTime := time.Now().Add(service.config.ExpireIn) - - accessGrant, err := uplink.ParseAccess(service.config.AccessGrant) - if err != nil { - return err - } - - project, err := uplink.OpenProject(ctx, accessGrant) - if err != nil { - return err - } - defer func() { - // do cleanup in case of any error while uploading bloom filters - if err != nil { - // TODO should we drop whole bucket if cleanup will fail - err = errs.Combine(err, service.cleanup(ctx, project, prefix)) - } - err = errs.Combine(err, project.Close()) - }() - - _, err = project.EnsureBucket(ctx, service.config.Bucket) - if err != nil { - return err - } - - // TODO move it before segment loop is started - o := uplink.ListObjectsOptions{ - Prefix: prefix + "/", - } - iterator := project.ListObjects(ctx, service.config.Bucket, &o) - for iterator.Next() { - if iterator.Item().IsPrefix { - continue - } - - service.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", service.config.Bucket)) - return nil - } - - infos := make([]internalpb.RetainInfo, 0, service.config.ZipBatchSize) - batchNumber := 0 - for nodeID, info := range retainInfos { - infos = append(infos, internalpb.RetainInfo{ - Filter: info.Filter.Bytes(), - // because bloom filters should be created from immutable database - // snapshot we are using latest segment creation date - CreationDate: latestCreationDate, - PieceCount: int64(info.Count), - StorageNodeId: nodeID, - }) - - if len(infos) == service.config.ZipBatchSize { - err = service.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos) - if err != nil { - return err - } - - infos = infos[:0] - batchNumber++ - } - } - - // upload rest of infos if any - if err := service.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil { - return err - } - - // update LATEST file - upload, err := project.UploadObject(ctx, service.config.Bucket, LATEST, nil) - if err != nil { - return err - } - _, err = upload.Write([]byte(prefix)) - if err != nil { - return err - } - - return upload.Commit() -} - -// uploadPack uploads single zip pack with multiple bloom filters. 
-func (service *Service) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) { - defer mon.Task()(&ctx)(&err) - - if len(infos) == 0 { - return nil - } - - upload, err := project.UploadObject(ctx, service.config.Bucket, prefix+"/bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{ - Expires: expirationTime, - }) - if err != nil { - return err - } - - zipWriter := zip.NewWriter(upload) - defer func() { - err = errs.Combine(err, zipWriter.Close()) - if err != nil { - err = errs.Combine(err, upload.Abort()) - } else { - err = upload.Commit() - } - }() - - for _, info := range infos { - retainInfoBytes, err := pb.Marshal(&info) - if err != nil { - return err - } - - writer, err := zipWriter.Create(info.StorageNodeId.String()) - if err != nil { - return err - } - - write, err := writer.Write(retainInfoBytes) - if err != nil { - return err - } - if len(retainInfoBytes) != write { - return errs.New("not whole bloom filter was written") - } - } - - return nil -} - -// cleanup moves all objects from root location to unique prefix. Objects will be deleted -// automatically when expires. -func (service *Service) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) { - defer mon.Task()(&ctx)(&err) - - errPrefix := "upload-error-" + time.Now().Format(time.RFC3339) - o := uplink.ListObjectsOptions{ - Prefix: prefix + "/", - } - iterator := project.ListObjects(ctx, service.config.Bucket, &o) - - for iterator.Next() { - item := iterator.Item() - if item.IsPrefix { - continue - } - - err := project.MoveObject(ctx, service.config.Bucket, item.Key, service.config.Bucket, prefix+"/"+errPrefix+"/"+item.Key, nil) - if err != nil { - return err - } - } - - return iterator.Err() -} diff --git a/satellite/gc/bloomfilter/service_test.go b/satellite/gc/bloomfilter/service_test.go deleted file mode 100644 index 10301b912..000000000 --- a/satellite/gc/bloomfilter/service_test.go +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright (C) 2022 Storj Labs, Inc. -// See LICENSE for copying information. 
- -package bloomfilter_test - -import ( - "archive/zip" - "bytes" - "io" - "sort" - "strconv" - "testing" - - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" - - "storj.io/common/memory" - "storj.io/common/pb" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/private/testplanet" - "storj.io/storj/satellite" - "storj.io/storj/satellite/gc/bloomfilter" - "storj.io/storj/satellite/internalpb" - "storj.io/uplink" -) - -func TestServiceGarbageCollectionBloomFilters(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - StorageNodeCount: 7, - UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.SegmentLoop.AsOfSystemInterval = 1 - - testplanet.ReconfigureRS(2, 2, 7, 7)(log, index, config) - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB)) - require.NoError(t, err) - - access := planet.Uplinks[0].Access[planet.Satellites[0].ID()] - accessString, err := access.Serialize() - require.NoError(t, err) - - project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) - require.NoError(t, err) - defer ctx.Check(project.Close) - - type testCase struct { - Bucket string - ZipBatchSize int - ExpectedPacks int - } - - testCases := []testCase{ - {"bloomfilters-bucket-1", 1, 7}, - {"bloomfilters-bucket-2", 2, 4}, - {"bloomfilters-bucket-7", 7, 1}, - {"bloomfilters-bucket-100", 100, 1}, - } - - for _, tc := range testCases { - config := planet.Satellites[0].Config.GarbageCollectionBF - config.Enabled = true - config.AccessGrant = accessString - config.Bucket = tc.Bucket - config.ZipBatchSize = tc.ZipBatchSize - service := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop) - - err = service.RunOnce(ctx) - require.NoError(t, err) - - download, err := project.DownloadObject(ctx, tc.Bucket, bloomfilter.LATEST, nil) - require.NoError(t, err) - - value, err := io.ReadAll(download) - require.NoError(t, err) - - err = download.Close() - require.NoError(t, err) - - prefix := string(value) - iterator := project.ListObjects(ctx, tc.Bucket, &uplink.ListObjectsOptions{ - Prefix: prefix + "/", - }) - - count := 0 - nodeIds := []string{} - packNames := []string{} - for iterator.Next() { - packNames = append(packNames, iterator.Item().Key) - - data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], tc.Bucket, iterator.Item().Key) - require.NoError(t, err) - - zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) - require.NoError(t, err) - - for _, file := range zipReader.File { - bfReader, err := file.Open() - require.NoError(t, err) - - bloomfilter, err := io.ReadAll(bfReader) - require.NoError(t, err) - - var pbRetainInfo internalpb.RetainInfo - err = pb.Unmarshal(bloomfilter, &pbRetainInfo) - require.NoError(t, err) - - require.NotEmpty(t, pbRetainInfo.Filter) - require.NotZero(t, pbRetainInfo.PieceCount) - require.NotZero(t, pbRetainInfo.CreationDate) - require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String()) - - nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String()) - } - - count++ - } - require.NoError(t, iterator.Err()) - require.Equal(t, tc.ExpectedPacks, count) - - expectedPackNames := []string{} - for i := 0; i < tc.ExpectedPacks; i++ { - 
expectedPackNames = append(expectedPackNames, prefix+"/bloomfilters-"+strconv.Itoa(i)+".zip") - } - sort.Strings(expectedPackNames) - sort.Strings(packNames) - require.Equal(t, expectedPackNames, packNames) - - expectedNodeIds := []string{} - for _, node := range planet.StorageNodes { - expectedNodeIds = append(expectedNodeIds, node.ID().String()) - } - sort.Strings(expectedNodeIds) - sort.Strings(nodeIds) - require.Equal(t, expectedNodeIds, nodeIds) - } - }) -} - -func TestServiceGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, - StorageNodeCount: 4, - UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.SegmentLoop.AsOfSystemInterval = 1 - - testplanet.ReconfigureRS(2, 2, 4, 4)(log, index, config) - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB)) - require.NoError(t, err) - - access := planet.Uplinks[0].Access[planet.Satellites[0].ID()] - accessString, err := access.Serialize() - require.NoError(t, err) - - project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) - require.NoError(t, err) - defer ctx.Check(project.Close) - - err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bloomfilters", "some object", testrand.Bytes(1*memory.KiB)) - require.NoError(t, err) - - config := planet.Satellites[0].Config.GarbageCollectionBF - config.AccessGrant = accessString - config.Bucket = "bloomfilters" - service := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop) - - err = service.RunOnce(ctx) - require.NoError(t, err) - - // check that there are 2 objects and the names match - iterator := project.ListObjects(ctx, "bloomfilters", nil) - keys := []string{} - for iterator.Next() { - if !iterator.Item().IsPrefix { - keys = append(keys, iterator.Item().Key) - } - } - require.Len(t, keys, 2) - require.Contains(t, keys, "some object") - require.Contains(t, keys, bloomfilter.LATEST) - }) -} diff --git a/satellite/gc/bloomfilter/upload.go b/satellite/gc/bloomfilter/upload.go index 35bb512b1..be01af042 100644 --- a/satellite/gc/bloomfilter/upload.go +++ b/satellite/gc/bloomfilter/upload.go @@ -54,7 +54,7 @@ func (bfu *Upload) UploadBloomFilters(ctx context.Context, latestCreationDate ti return nil } - prefix := time.Now().Format(time.RFC3339) + prefix := time.Now().Format(time.RFC3339Nano) expirationTime := time.Now().Add(bfu.config.ExpireIn) diff --git a/satellite/gc/gc_test.go b/satellite/gc/gc_test.go index a797010f4..73e1cf56b 100644 --- a/satellite/gc/gc_test.go +++ b/satellite/gc/gc_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "storj.io/common/base58" @@ -23,6 +24,7 @@ import ( "storj.io/storj/private/testplanet" "storj.io/storj/satellite/gc/bloomfilter" "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/storagenode" "storj.io/storj/storagenode/blobstore" "storj.io/uplink/private/eestream" @@ -58,7 +60,6 @@ func TestGarbageCollection(t *testing.T) { // configure filter uploader config := planet.Satellites[0].Config.GarbageCollectionBF config.AccessGrant = accessString - bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, 
planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop) satellite := planet.Satellites[0] upl := planet.Uplinks[0] @@ -116,8 +117,15 @@ func TestGarbageCollection(t *testing.T) { // for a second. time.Sleep(1 * time.Second) - // Wait for next iteration of garbage collection to finish - err = bloomFilterService.RunOnce(ctx) + // Wait for bloom filter observer to finish + rangedloopConfig := planet.Satellites[0].Config.RangedLoop + + observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB) + segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize) + rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments, + []rangedloop.Observer{observer}) + + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) // send to storagenode @@ -166,7 +174,6 @@ func TestGarbageCollectionWithCopies(t *testing.T) { // configure filter uploader config := planet.Satellites[0].Config.GarbageCollectionBF config.AccessGrant = accessString - bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop) project, err := planet.Uplinks[0].OpenProject(ctx, satellite) require.NoError(t, err) @@ -211,8 +218,15 @@ func TestGarbageCollectionWithCopies(t *testing.T) { afterTotalUsedByNodes := allSpaceUsedForPieces() require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes) - // Wait for next iteration of garbage collection to finish - err = bloomFilterService.RunOnce(ctx) + // Wait for bloom filter observer to finish + rangedloopConfig := planet.Satellites[0].Config.RangedLoop + + observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB) + segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize) + rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments, + []rangedloop.Observer{observer}) + + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) // send to storagenode @@ -241,7 +255,7 @@ func TestGarbageCollectionWithCopies(t *testing.T) { planet.WaitForStorageNodeDeleters(ctx) // run GC - err = bloomFilterService.RunOnce(ctx) + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) // send to storagenode @@ -268,7 +282,7 @@ func TestGarbageCollectionWithCopies(t *testing.T) { planet.WaitForStorageNodeDeleters(ctx) // run GC - err = bloomFilterService.RunOnce(ctx) + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) // send to storagenode @@ -341,6 +355,10 @@ func TestGarbageCollection_PendingObject(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + access := planet.Uplinks[0].Access[planet.Satellites[0].ID()] + accessString, err := access.Serialize() + require.NoError(t, err) + satellite := planet.Satellites[0] upl := planet.Uplinks[0] @@ -350,19 +368,25 @@ func TestGarbageCollection_PendingObject(t *testing.T) { segments, err := satellite.Metabase.DB.TestingAllSegments(ctx) require.NoError(t, err) require.Len(t, segments, 1) - require.Len(t, segments[0].Pieces, 1) - lastPieceCounts := map[storj.NodeID]int64{} - pieceTracker := bloomfilter.NewPieceTracker(satellite.Log.Named("gc observer"), bloomfilter.Config{ - FalsePositiveRate: 
0.000000001, - InitialPieces: 10, - }, lastPieceCounts) + config := planet.Satellites[0].Config.GarbageCollectionBF + config.AccessGrant = accessString + config.Bucket = "bucket" + config.FalsePositiveRate = 0.000000001 + config.InitialPieces = 10 - err = satellite.Metabase.SegmentLoop.Join(ctx, pieceTracker) + observer := bloomfilter.NewObserver(satellite.Log.Named("gc observer"), config, satellite.Overlay.DB) + + rangedloopConfig := planet.Satellites[0].Config.RangedLoop + provider := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize) + rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, provider, + []rangedloop.Observer{observer}) + + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) - require.NotEmpty(t, pieceTracker.RetainInfos) - info := pieceTracker.RetainInfos[planet.StorageNodes[0].ID()] + require.NotEmpty(t, observer.TestingRetainInfos()) + info := observer.TestingRetainInfos()[planet.StorageNodes[0].ID()] require.NotNil(t, info) require.Equal(t, 1, info.Count) diff --git a/satellite/gc/sender/service.go b/satellite/gc/sender/service.go index a2659b9c1..895e18450 100644 --- a/satellite/gc/sender/service.go +++ b/satellite/gc/sender/service.go @@ -89,8 +89,6 @@ func (service *Service) Run(ctx context.Context) (err error) { func (service *Service) RunOnce(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) - loopStartTime := time.Now() - switch { case service.Config.AccessGrant == "": return errs.New("Access Grant is not set") @@ -150,10 +148,10 @@ func (service *Service) RunOnce(ctx context.Context) (err error) { if err != nil { // We store the error in the bucket and then continue with the next zip file. - return service.moveToErrorPrefix(ctx, project, objectKey, err, loopStartTime) + return service.moveToErrorPrefix(ctx, project, objectKey, err) } - return service.moveToSentPrefix(ctx, project, objectKey, loopStartTime) + return service.moveToSentPrefix(ctx, project, objectKey) }) } @@ -198,7 +196,7 @@ func (service *Service) sendRetainRequest(ctx context.Context, retainInfo *inter // moveToErrorPrefix moves an object to prefix "error" and attaches the error to the metadata. func (service *Service) moveToErrorPrefix( - ctx context.Context, project *uplink.Project, objectKey string, previousErr error, timeStamp time.Time, + ctx context.Context, project *uplink.Project, objectKey string, previousErr error, ) error { newObjectKey := "error-" + objectKey @@ -235,9 +233,9 @@ func (service *Service) uploadError( return upload.Commit() } -// moveToErrorPrefix moves an object to prefix "sent-[timestamp]". +// moveToSentPrefix moves an object to prefix "sent". 
func (service *Service) moveToSentPrefix( - ctx context.Context, project *uplink.Project, objectKey string, timeStamp time.Time, + ctx context.Context, project *uplink.Project, objectKey string, ) error { newObjectKey := "sent-" + objectKey diff --git a/satellite/gc/sender/service_test.go b/satellite/gc/sender/service_test.go index 9f534d05f..0f7caf5f1 100644 --- a/satellite/gc/sender/service_test.go +++ b/satellite/gc/sender/service_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "storj.io/common/memory" @@ -17,6 +18,7 @@ import ( "storj.io/common/testrand" "storj.io/storj/private/testplanet" "storj.io/storj/satellite/gc/bloomfilter" + "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/overlay" "storj.io/storj/storagenode" "storj.io/uplink" @@ -54,8 +56,14 @@ func TestSendRetainFilters(t *testing.T) { config.AccessGrant = accessString config.ZipBatchSize = 2 - bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop) - err = bloomFilterService.RunOnce(ctx) + rangedloopConfig := planet.Satellites[0].Config.RangedLoop + + observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB) + segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize) + rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments, + []rangedloop.Observer{observer}) + + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) storageNode0 := planet.StorageNodes[0] @@ -127,8 +135,14 @@ func TestSendRetainFiltersDisqualifedNode(t *testing.T) { config.AccessGrant = accessString config.ZipBatchSize = 2 - bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop) - err = bloomFilterService.RunOnce(ctx) + rangedloopConfig := planet.Satellites[0].Config.RangedLoop + + observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB) + segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize) + rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments, + []rangedloop.Observer{observer}) + + _, err = rangedLoop.RunOnce(ctx) require.NoError(t, err) storageNode0 := planet.StorageNodes[0] diff --git a/satellite/metabase/rangedloop/service.go b/satellite/metabase/rangedloop/service.go index 3d144eb1b..0cf121dbf 100644 --- a/satellite/metabase/rangedloop/service.go +++ b/satellite/metabase/rangedloop/service.go @@ -143,6 +143,7 @@ func (service *Service) RunOnce(ctx context.Context) (observerDurations []Observ rangeObservers := []*rangeObserverState{} for i, observerState := range observerStates { if observerState.err != nil { + service.log.Debug("observer returned error", zap.Error(observerState.err)) continue } rangeObserver, err := observerState.observer.Fork(ctx) diff --git a/satellite/rangedloop.go b/satellite/rangedloop.go index 9fe378e4f..bc76b38b0 100644 --- a/satellite/rangedloop.go +++ b/satellite/rangedloop.go @@ -18,7 +18,6 @@ import ( "storj.io/storj/private/lifecycle" "storj.io/storj/satellite/accounting/nodetally" "storj.io/storj/satellite/audit" - "storj.io/storj/satellite/gc/bloomfilter" 
"storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" @@ -62,10 +61,6 @@ type RangedLoop struct { Observer rangedloop.Observer } - GarbageCollectionBF struct { - Observer rangedloop.Observer - } - Accounting struct { NodeTallyObserver *nodetally.Observer } @@ -149,10 +144,6 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf ) } - { // setup garbage collection bloom filter observer - peer.GarbageCollectionBF.Observer = bloomfilter.NewObserver(log.Named("gc-bf"), config.GarbageCollectionBF, db.OverlayCache()) - } - { // setup ranged loop observers := []rangedloop.Observer{ rangedloop.NewLiveCountObserver(metabaseDB, config.RangedLoop.SuspiciousProcessedRatio, config.RangedLoop.AsOfSystemInterval), @@ -171,10 +162,6 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf observers = append(observers, peer.GracefulExit.Observer) } - if config.GarbageCollectionBF.Enabled && config.GarbageCollectionBF.UseRangedLoop { - observers = append(observers, peer.GarbageCollectionBF.Observer) - } - if config.Repairer.UseRangedLoop { observers = append(observers, peer.Repair.Observer) } diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index e87323faf..a7e8b12e2 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -454,9 +454,6 @@ contact.external-address: "" # Bucket which will be used to upload bloom filters # garbage-collection-bf.bucket: "" -# set if garbage collection bloom filters is enabled or not -# garbage-collection-bf.enabled: true - # how long bloom filters will remain in the bucket for gc/sender to consume before being automatically deleted # garbage-collection-bf.expire-in: 336h0m0s @@ -466,15 +463,9 @@ contact.external-address: "" # the initial number of pieces expected for a storage node to have, used for creating a filter # garbage-collection-bf.initial-pieces: 400000 -# the time between each garbage collection executions -# garbage-collection-bf.interval: 120h0m0s - # set if garbage collection bloom filter process should only run once then exit # garbage-collection-bf.run-once: false -# whether to use ranged loop instead of segment loop -# garbage-collection-bf.use-ranged-loop: false - # whether to use test GC SyncObserver with ranged loop # garbage-collection-bf.use-sync-observer: false