satellite/gc/bloomfilter: remove segments loop parts
We are switching completely to the ranged loop.

https://github.com/storj/storj/issues/5368

Change-Id: I1a22ac4b242998e287b2b7d8167b64e850b61a0f
parent 5fede0ce95
commit 2592aaef9c
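Summary of the change: the GarbageCollectionBF peer no longer embeds a segment loop or a bloomfilter.Service; bloom filters are produced exclusively by a ranged loop observer. The sketch below condenses the wiring that appears in the peer and test code further down in this diff; the function name and parameters are illustrative placeholders, not code from the commit.

// Illustrative sketch (not part of the commit): how the gc-bf observer is wired
// into the ranged loop after this change, condensed from the peer and test code below.
package main

import (
	"context"

	"go.uber.org/zap"

	"storj.io/storj/satellite"
	"storj.io/storj/satellite/gc/bloomfilter"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/rangedloop"
	"storj.io/storj/satellite/overlay"
)

func runGCBloomFiltersOnce(ctx context.Context, log *zap.Logger, config satellite.Config,
	metabaseDB *metabase.DB, overlayDB overlay.DB) error {
	// The observer collects per-node bloom filters and uploads them when the pass finishes.
	observer := bloomfilter.NewObserver(log.Named("gc-bf"), config.GarbageCollectionBF, overlayDB)

	// The ranged loop splits the segment table into ranges and feeds them to the observer.
	provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB,
		config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
	service := rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider,
		[]rangedloop.Observer{observer})

	// RunOnce drives a single pass; the long-running mode adds the service as a
	// lifecycle item instead, as NewGarbageCollectionBF does below.
	_, err := service.RunOnce(ctx)
	return err
}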
@ -40,7 +40,6 @@ import (
|
||||
"storj.io/storj/satellite/console/consoleweb"
|
||||
"storj.io/storj/satellite/console/userinfo"
|
||||
"storj.io/storj/satellite/contact"
|
||||
"storj.io/storj/satellite/gc/bloomfilter"
|
||||
"storj.io/storj/satellite/gc/sender"
|
||||
"storj.io/storj/satellite/gracefulexit"
|
||||
"storj.io/storj/satellite/inspector"
|
||||
@ -152,8 +151,7 @@ type Satellite struct {
|
||||
}
|
||||
|
||||
GarbageCollection struct {
|
||||
Sender *sender.Service
|
||||
BloomFilters *bloomfilter.Service
|
||||
Sender *sender.Service
|
||||
}
|
||||
|
||||
ExpiredDeletion struct {
|
||||
@ -639,7 +637,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
|
||||
system.Audit.ContainmentSyncChore = peer.Audit.ContainmentSyncChore
|
||||
|
||||
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender
|
||||
system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service
|
||||
|
||||
system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
|
||||
system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore
|
||||
|
@ -49,8 +49,7 @@ type GarbageCollectionBF struct {
|
||||
}
|
||||
|
||||
GarbageCollection struct {
|
||||
Config bloomfilter.Config
|
||||
Service *bloomfilter.Service
|
||||
Config bloomfilter.Config
|
||||
}
|
||||
|
||||
RangedLoop struct {
|
||||
@ -95,65 +94,31 @@ func NewGarbageCollectionBF(log *zap.Logger, db DB, metabaseDB *metabase.DB, rev
|
||||
{ // setup garbage collection bloom filters
|
||||
log := peer.Log.Named("garbage-collection-bf")
|
||||
peer.GarbageCollection.Config = config.GarbageCollectionBF
|
||||
if config.GarbageCollectionBF.UseRangedLoop {
|
||||
log.Info("using ranged loop")
|
||||
|
||||
var observer rangedloop.Observer
|
||||
if config.GarbageCollectionBF.UseSyncObserver {
|
||||
observer = bloomfilter.NewSyncObserver(log.Named("gc-bf"),
|
||||
config.GarbageCollectionBF,
|
||||
peer.Overlay.DB,
|
||||
)
|
||||
} else {
|
||||
observer = bloomfilter.NewObserver(log.Named("gc-bf"),
|
||||
config.GarbageCollectionBF,
|
||||
peer.Overlay.DB,
|
||||
)
|
||||
}
|
||||
|
||||
provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
|
||||
peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{observer})
|
||||
|
||||
if !config.GarbageCollectionBF.RunOnce {
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "garbage-collection-bf",
|
||||
Run: peer.RangedLoop.Service.Run,
|
||||
Close: peer.RangedLoop.Service.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Garbage Collection Bloom Filters", peer.RangedLoop.Service.Loop))
|
||||
}
|
||||
} else {
|
||||
log.Info("using segments loop")
|
||||
|
||||
{ // setup metainfo
|
||||
peer.Metainfo.SegmentLoop = segmentloop.New(
|
||||
log.Named("segmentloop"),
|
||||
config.Metainfo.SegmentLoop,
|
||||
metabaseDB,
|
||||
)
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "metainfo:segmentloop",
|
||||
Run: peer.Metainfo.SegmentLoop.Run,
|
||||
Close: peer.Metainfo.SegmentLoop.Close,
|
||||
})
|
||||
}
|
||||
|
||||
peer.GarbageCollection.Service = bloomfilter.NewService(
|
||||
log,
|
||||
var observer rangedloop.Observer
|
||||
if config.GarbageCollectionBF.UseSyncObserver {
|
||||
observer = bloomfilter.NewSyncObserver(log.Named("gc-bf"),
|
||||
config.GarbageCollectionBF,
|
||||
peer.Overlay.DB,
|
||||
peer.Metainfo.SegmentLoop,
|
||||
)
|
||||
} else {
|
||||
observer = bloomfilter.NewObserver(log.Named("gc-bf"),
|
||||
config.GarbageCollectionBF,
|
||||
peer.Overlay.DB,
|
||||
)
|
||||
}
|
||||
|
||||
if !config.GarbageCollectionBF.RunOnce {
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "garbage-collection-bf",
|
||||
Run: peer.GarbageCollection.Service.Run,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Garbage Collection Bloom Filters", peer.GarbageCollection.Service.Loop))
|
||||
}
|
||||
provider := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)
|
||||
peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, provider, []rangedloop.Observer{observer})
|
||||
|
||||
if !config.GarbageCollectionBF.RunOnce {
|
||||
peer.Services.Add(lifecycle.Item{
|
||||
Name: "garbage-collection-bf",
|
||||
Run: peer.RangedLoop.Service.Run,
|
||||
Close: peer.RangedLoop.Service.Close,
|
||||
})
|
||||
peer.Debug.Server.Panel.Add(
|
||||
debug.Cycle("Garbage Collection Bloom Filters", peer.RangedLoop.Service.Loop))
|
||||
}
|
||||
}
|
||||
|
||||
@ -175,11 +140,7 @@ func (peer *GarbageCollectionBF) Run(ctx context.Context) (err error) {
|
||||
|
||||
if peer.GarbageCollection.Config.RunOnce {
|
||||
group.Go(func() error {
|
||||
if peer.GarbageCollection.Config.UseRangedLoop {
|
||||
_, err = peer.RangedLoop.Service.RunOnce(ctx)
|
||||
} else {
|
||||
err = peer.GarbageCollection.Service.RunOnce(ctx)
|
||||
}
|
||||
_, err = peer.RangedLoop.Service.RunOnce(ctx)
|
||||
cancel()
|
||||
return err
|
||||
})
|
||||
|
@ -20,22 +20,6 @@ func TestGCBFUseRangedLoop(t *testing.T) {
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.GarbageCollectionBF.RunOnce = true
|
||||
config.GarbageCollectionBF.UseRangedLoop = true
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
err := planet.Satellites[0].GCBF.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGCBFUseSegmentsLoop(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.GarbageCollectionBF.RunOnce = true
|
||||
config.GarbageCollectionBF.UseRangedLoop = false
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
|
satellite/gc/bloomfilter/config.go (22 lines, Normal file)
@ -0,0 +1,22 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package bloomfilter

import "time"

// Config contains configurable values for garbage collection.
type Config struct {
	RunOnce bool `help:"set if garbage collection bloom filter process should only run once then exit" default:"false"`

	UseSyncObserver bool `help:"whether to use test GC SyncObserver with ranged loop" default:"false"`

	// value for InitialPieces currently based on average pieces per node
	InitialPieces     int64   `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
	FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`

	AccessGrant  string        `help:"Access Grant which will be used to upload bloom filters to the bucket" default:""`
	Bucket       string        `help:"Bucket which will be used to upload bloom filters" default:"" testDefault:"gc-queue"` // TODO do we need full location?
	ZipBatchSize int           `help:"how many bloom filters will be packed in a single zip" default:"500" testDefault:"2"`
	ExpireIn     time.Duration `help:"how long bloom filters will remain in the bucket for gc/sender to consume before being automatically deleted" default:"336h"`
}
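The sizing fields above (InitialPieces, FalsePositiveRate) are consumed when the observer fork creates a per-node filter, as shown later in this diff. A rough sketch of that calculation, assuming no piece count is known for the node yet; seed and pieceID are placeholders, and bloomfilter/memory here are storj.io/common/bloomfilter and storj.io/common/memory:

// Rough sketch mirroring observerFork.add later in this diff (not new code in the commit).
numPieces := config.InitialPieces // fallback when the overlay has no piece count for the node

// Pick hash count and table size for the requested false positive rate,
// capping the filter at 2 MiB so the retain request stays under the RPC limit.
hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, config.FalsePositiveRate, 2*memory.MiB)

// All forks share the same seed so their filters can be merged in Observer.Join.
filter := bloomfilter.NewExplicit(seed, hashCount, tableSize)
filter.Add(pieceID) // one Add per piece the node is expected to keep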
@ -5,18 +5,18 @@
Package bloomfilter contains the functions needed to run part of garbage collection
process.

The bloomfilter.PieceTracker implements the segments loop Observer interface
The bloomfilter.Observer implements the ranged loop Observer interface
allowing us to subscribe to the loop to get information for every segment
in the metabase db.

The bloomfilter.PieceTracker handling functions are used by the bloomfilter.Service
to periodically account for all existing pieces on storage nodes and create
"retain requests" which contain a bloom filter of all pieces that possibly exist
on a storage node.
The bloomfilter.Observer is subscribed to the ranged loop instance to account for all
existing segment pieces on storage nodes and create "retain requests" which contain
a bloom filter of all pieces that possibly exist on a storage node. With the ranged loop,
segments can be processed in parallel to speed up the process.

The bloomfilter.Service will send that requests to the Storj bucket after a full
segments loop iteration. After that bloom filters will be downloaded and sent
to the storage nodes with separate service from storj/satellite/gc package.
The bloomfilter.Observer will send those requests to the Storj bucket after a full
ranged loop iteration. After that, bloom filters will be downloaded and sent
to the storage nodes by a separate service from the storj/satellite/gc/sender package.

This bloom filter service should be run only against immutable database snapshot.
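In code, one ranged loop pass drives the observer through the hooks defined in this package. The outline below is a simplified, sequential illustration of that call order (the real rangedloop service runs forks in parallel and calls Process once per batch of segments); it is not the rangedloop implementation itself, and it assumes imports of context, time, storj.io/storj/satellite/gc/bloomfilter and storj.io/storj/satellite/metabase/segmentloop.

// Simplified, sequential illustration of one pass over the gc-bf observer.
func runPass(ctx context.Context, observer *bloomfilter.Observer, ranges [][]segmentloop.Segment) error {
	if err := observer.Start(ctx, time.Now()); err != nil { // called once at the beginning of the pass
		return err
	}
	for _, batch := range ranges {
		fork, err := observer.Fork(ctx) // one observerFork (rangedloop.Partial) per segment range
		if err != nil {
			return err
		}
		if err := fork.Process(ctx, batch); err != nil { // adds pieces of remote segments to per-node bloom filters
			return err
		}
		if err := observer.Join(ctx, fork); err != nil { // merges the fork's retain infos into the observer
			return err
		}
	}
	return observer.Finish(ctx) // uploads the collected bloom filters to the configured bucket
}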
@ -7,16 +7,26 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/bloomfilter"
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// RetainInfo contains info needed for a storage node to retain important data and delete garbage data.
|
||||
type RetainInfo struct {
|
||||
Filter *bloomfilter.Filter
|
||||
Count int
|
||||
}
|
||||
|
||||
// Observer implements a rangedloop observer to collect bloom filters for the garbage collection.
|
||||
//
|
||||
// architecture: Observer
|
||||
@ -35,6 +45,7 @@ type Observer struct {
|
||||
}
|
||||
|
||||
var _ (rangedloop.Observer) = (*Observer)(nil)
|
||||
var _ (rangedloop.Partial) = (*observerFork)(nil)
|
||||
|
||||
// NewObserver creates a new instance of the gc rangedloop observer.
|
||||
func NewObserver(log *zap.Logger, config Config, overlay overlay.DB) *Observer {
|
||||
@ -77,27 +88,20 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)
|
||||
// Fork creates a Partial to build bloom filters over a chunk of all the segments.
|
||||
func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
// TODO: refactor PieceTracker after the segmentloop has been removed to
|
||||
// more closely match the rangedloop observer needs.
|
||||
pieceTracker := NewPieceTrackerWithSeed(obs.log.Named("gc observer"), obs.config, obs.lastPieceCounts, obs.seed)
|
||||
if err := pieceTracker.LoopStarted(ctx, segmentloop.LoopInfo{
|
||||
Started: obs.startTime,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pieceTracker, nil
|
||||
|
||||
return newObserverFork(obs.log.Named("gc observer"), obs.config, obs.lastPieceCounts, obs.seed, obs.startTime), nil
|
||||
}
|
||||
|
||||
// Join merges the bloom filters gathered by each Partial.
|
||||
func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
pieceTracker, ok := partial.(*PieceTracker)
|
||||
pieceTracker, ok := partial.(*observerFork)
|
||||
if !ok {
|
||||
return errs.New("expected %T but got %T", pieceTracker, partial)
|
||||
}
|
||||
|
||||
// Update the count and merge the bloom filters for each node.
|
||||
for nodeID, retainInfo := range pieceTracker.RetainInfos {
|
||||
for nodeID, retainInfo := range pieceTracker.retainInfos {
|
||||
if existing, ok := obs.retainInfos[nodeID]; ok {
|
||||
existing.Count += retainInfo.Count
|
||||
if err := existing.Filter.AddFilter(retainInfo.Filter); err != nil {
|
||||
@ -109,8 +113,8 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err
|
||||
}
|
||||
|
||||
// Replace the latestCreationTime if the partial observed a later time.
|
||||
if obs.latestCreationTime.IsZero() || obs.latestCreationTime.Before(pieceTracker.LatestCreationTime) {
|
||||
obs.latestCreationTime = pieceTracker.LatestCreationTime
|
||||
if obs.latestCreationTime.IsZero() || obs.latestCreationTime.Before(pieceTracker.latestCreationTime) {
|
||||
obs.latestCreationTime = pieceTracker.latestCreationTime
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -125,3 +129,88 @@ func (obs *Observer) Finish(ctx context.Context) (err error) {
|
||||
obs.log.Debug("collecting bloom filters finished")
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestingRetainInfos returns retain infos collected by observer.
|
||||
func (obs *Observer) TestingRetainInfos() map[storj.NodeID]*RetainInfo {
|
||||
return obs.retainInfos
|
||||
}
|
||||
|
||||
type observerFork struct {
|
||||
log *zap.Logger
|
||||
config Config
|
||||
// TODO: should we use int or int64 consistently for piece count (db type is int64)?
|
||||
pieceCounts map[storj.NodeID]int64
|
||||
seed byte
|
||||
startTime time.Time
|
||||
|
||||
retainInfos map[storj.NodeID]*RetainInfo
|
||||
// latestCreationTime will be used to set bloom filter CreationDate.
|
||||
// Because bloom filter service needs to be run against immutable database snapshot
|
||||
// we can set CreationDate for bloom filters as a latest segment CreatedAt value.
|
||||
latestCreationTime time.Time
|
||||
}
|
||||
|
||||
// newObserverFork instantiates a new observer fork to process a different segment range.
// The seed is passed so that it can be shared among all parallel forks.
|
||||
func newObserverFork(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int64, seed byte, startTime time.Time) *observerFork {
|
||||
return &observerFork{
|
||||
log: log,
|
||||
config: config,
|
||||
pieceCounts: pieceCounts,
|
||||
seed: seed,
|
||||
startTime: startTime,
|
||||
|
||||
retainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
|
||||
}
|
||||
}
|
||||
|
||||
// Process adds pieces to the bloom filter from remote segments.
|
||||
func (fork *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
for _, segment := range segments {
|
||||
if segment.Inline() {
|
||||
continue
|
||||
}
|
||||
|
||||
// sanity check to detect if loop is not running against live database
|
||||
if segment.CreatedAt.After(fork.startTime) {
|
||||
fork.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID),
|
||||
zap.Time("loop started", fork.startTime),
|
||||
zap.Time("segment created", segment.CreatedAt))
|
||||
return errs.New("segment created after loop started")
|
||||
}
|
||||
|
||||
if fork.latestCreationTime.Before(segment.CreatedAt) {
|
||||
fork.latestCreationTime = segment.CreatedAt
|
||||
}
|
||||
|
||||
deriver := segment.RootPieceID.Deriver()
|
||||
for _, piece := range segment.Pieces {
|
||||
pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
|
||||
fork.add(piece.StorageNode, pieceID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// add adds a pieceID to the relevant node's RetainInfo.
|
||||
func (fork *observerFork) add(nodeID storj.NodeID, pieceID storj.PieceID) {
|
||||
info, ok := fork.retainInfos[nodeID]
|
||||
if !ok {
|
||||
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
|
||||
numPieces := fork.config.InitialPieces
|
||||
if pieceCounts := fork.pieceCounts[nodeID]; pieceCounts > 0 {
|
||||
numPieces = pieceCounts
|
||||
}
|
||||
|
||||
hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, fork.config.FalsePositiveRate, 2*memory.MiB)
|
||||
// limit size of bloom filter to ensure we are under the limit for RPC
|
||||
filter := bloomfilter.NewExplicit(fork.seed, hashCount, tableSize)
|
||||
info = &RetainInfo{
|
||||
Filter: filter,
|
||||
}
|
||||
fork.retainInfos[nodeID] = info
|
||||
}
|
||||
|
||||
info.Filter.Add(pieceID)
|
||||
info.Count++
|
||||
}
|
||||
|
@ -74,8 +74,6 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
|
||||
// directly, as is already done for the service. Maybe we can
|
||||
// improve this later.
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.Enabled = true
|
||||
config.UseRangedLoop = true
|
||||
config.AccessGrant = accessString
|
||||
config.Bucket = tc.Bucket
|
||||
config.ZipBatchSize = tc.ZipBatchSize
|
||||
@ -201,10 +199,8 @@ func TestObserverGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T)
|
||||
// directly, as is already done for the service. Maybe we can
|
||||
// improve this later.
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.Enabled = true
|
||||
config.AccessGrant = accessString
|
||||
config.Bucket = "bloomfilters"
|
||||
config.UseRangedLoop = true
|
||||
observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB)
|
||||
|
||||
// TODO: see comment above. ideally this should use the rangedloop
|
||||
@ -273,10 +269,8 @@ func TestObserverGarbageCollection_MultipleRanges(t *testing.T) {
|
||||
// directly, as is already done for the service. Maybe we can
|
||||
// improve this later.
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.Enabled = true
|
||||
config.AccessGrant = accessString
|
||||
config.Bucket = "bloomfilters"
|
||||
config.UseRangedLoop = true
|
||||
observers := []rangedloop.Observer{
|
||||
bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
|
||||
bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
|
||||
|
@ -1,134 +0,0 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package bloomfilter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/bloomfilter"
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
)
|
||||
|
||||
var _ segmentloop.Observer = (*PieceTracker)(nil)
|
||||
|
||||
// RetainInfo contains info needed for a storage node to retain important data and delete garbage data.
|
||||
type RetainInfo struct {
|
||||
Filter *bloomfilter.Filter
|
||||
Count int
|
||||
}
|
||||
|
||||
// PieceTracker implements the segments loop observer interface for garbage collection.
|
||||
//
|
||||
// architecture: Observer
|
||||
type PieceTracker struct {
|
||||
log *zap.Logger
|
||||
config Config
|
||||
// TODO: should we use int or int64 consistently for piece count (db type is int64)?
|
||||
pieceCounts map[storj.NodeID]int64
|
||||
seed byte
|
||||
startTime time.Time
|
||||
|
||||
RetainInfos map[storj.NodeID]*RetainInfo
|
||||
// LatestCreationTime will be used to set bloom filter CreationDate.
|
||||
// Because bloom filter service needs to be run against immutable database snapshot
|
||||
// we can set CreationDate for bloom filters as a latest segment CreatedAt value.
|
||||
LatestCreationTime time.Time
|
||||
}
|
||||
|
||||
// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the segments loop.
|
||||
func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int64) *PieceTracker {
|
||||
return NewPieceTrackerWithSeed(log, config, pieceCounts, bloomfilter.GenerateSeed())
|
||||
}
|
||||
|
||||
// NewPieceTrackerWithSeed instantiates a new gc piece tracker to be subscribed
|
||||
// to the rangedloop. The seed is passed so that it can be shared among all
|
||||
// parallel PieceTrackers handling each segment range.
|
||||
func NewPieceTrackerWithSeed(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int64, seed byte) *PieceTracker {
|
||||
return &PieceTracker{
|
||||
log: log,
|
||||
config: config,
|
||||
pieceCounts: pieceCounts,
|
||||
seed: seed,
|
||||
|
||||
RetainInfos: make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
|
||||
}
|
||||
}
|
||||
|
||||
// LoopStarted is called at each start of a loop.
|
||||
func (pieceTracker *PieceTracker) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
|
||||
pieceTracker.startTime = info.Started
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoteSegment takes a remote segment found in metabase and adds pieces to bloom filters.
|
||||
func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
|
||||
// we are expliticy not adding monitoring here as we are tracking loop observers separately
|
||||
|
||||
// sanity check to detect if loop is not running against live database
|
||||
if segment.CreatedAt.After(pieceTracker.startTime) {
|
||||
pieceTracker.log.Error("segment created after loop started", zap.Stringer("StreamID", segment.StreamID),
|
||||
zap.Time("loop started", pieceTracker.startTime),
|
||||
zap.Time("segment created", segment.CreatedAt))
|
||||
return errs.New("segment created after loop started")
|
||||
}
|
||||
|
||||
if pieceTracker.LatestCreationTime.Before(segment.CreatedAt) {
|
||||
pieceTracker.LatestCreationTime = segment.CreatedAt
|
||||
}
|
||||
|
||||
deriver := segment.RootPieceID.Deriver()
|
||||
for _, piece := range segment.Pieces {
|
||||
pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
|
||||
pieceTracker.add(piece.StorageNode, pieceID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// add adds a pieceID to the relevant node's RetainInfo.
|
||||
func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID) {
|
||||
info, ok := pieceTracker.RetainInfos[nodeID]
|
||||
if !ok {
|
||||
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
|
||||
numPieces := pieceTracker.config.InitialPieces
|
||||
if pieceCounts := pieceTracker.pieceCounts[nodeID]; pieceCounts > 0 {
|
||||
numPieces = pieceCounts
|
||||
}
|
||||
|
||||
hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, pieceTracker.config.FalsePositiveRate, 2*memory.MiB)
|
||||
// limit size of bloom filter to ensure we are under the limit for RPC
|
||||
filter := bloomfilter.NewExplicit(pieceTracker.seed, hashCount, tableSize)
|
||||
info = &RetainInfo{
|
||||
Filter: filter,
|
||||
}
|
||||
pieceTracker.RetainInfos[nodeID] = info
|
||||
}
|
||||
|
||||
info.Filter.Add(pieceID)
|
||||
info.Count++
|
||||
}
|
||||
|
||||
// InlineSegment returns nil because we're only doing gc for storage nodes for now.
|
||||
func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Process adds pieces to the bloom filter from remote segments.
|
||||
func (pieceTracker *PieceTracker) Process(ctx context.Context, segments []segmentloop.Segment) error {
|
||||
for _, segment := range segments {
|
||||
if segment.Inline() {
|
||||
continue
|
||||
}
|
||||
if err := pieceTracker.RemoteSegment(ctx, &segment); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,286 +0,0 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package bloomfilter
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/spacemonkeygo/monkit/v3"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/uplink"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// Config contains configurable values for garbage collection.
|
||||
type Config struct {
|
||||
Interval time.Duration `help:"the time between each garbage collection executions" releaseDefault:"120h" devDefault:"10m" testDefault:"$TESTINTERVAL"`
|
||||
// TODO service is not enabled by default for testing until will be finished
|
||||
Enabled bool `help:"set if garbage collection bloom filters is enabled or not" default:"true" testDefault:"false"`
|
||||
|
||||
RunOnce bool `help:"set if garbage collection bloom filter process should only run once then exit" default:"false"`
|
||||
|
||||
UseRangedLoop bool `help:"whether to use ranged loop instead of segment loop" default:"false"`
|
||||
UseSyncObserver bool `help:"whether to use test GC SyncObserver with ranged loop" default:"false"`
|
||||
|
||||
// value for InitialPieces currently based on average pieces per node
|
||||
InitialPieces int64 `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
|
||||
FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
|
||||
|
||||
AccessGrant string `help:"Access Grant which will be used to upload bloom filters to the bucket" default:""`
|
||||
Bucket string `help:"Bucket which will be used to upload bloom filters" default:"" testDefault:"gc-queue"` // TODO do we need full location?
|
||||
ZipBatchSize int `help:"how many bloom filters will be packed in a single zip" default:"500" testDefault:"2"`
|
||||
ExpireIn time.Duration `help:"how long bloom filters will remain in the bucket for gc/sender to consume before being automatically deleted" default:"336h"`
|
||||
}
|
||||
|
||||
// Service implements service to collect bloom filters for the garbage collection.
|
||||
//
|
||||
// architecture: Chore
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
config Config
|
||||
Loop *sync2.Cycle
|
||||
|
||||
overlay overlay.DB
|
||||
segmentLoop *segmentloop.Service
|
||||
}
|
||||
|
||||
// NewService creates a new instance of the gc service.
|
||||
func NewService(log *zap.Logger, config Config, overlay overlay.DB, loop *segmentloop.Service) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
config: config,
|
||||
Loop: sync2.NewCycle(config.Interval),
|
||||
overlay: overlay,
|
||||
segmentLoop: loop,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the gc loop service.
|
||||
func (service *Service) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if !service.config.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case service.config.AccessGrant == "":
|
||||
return errs.New("Access Grant is not set")
|
||||
case service.config.Bucket == "":
|
||||
return errs.New("Bucket is not set")
|
||||
}
|
||||
|
||||
return service.Loop.Run(ctx, service.RunOnce)
|
||||
}
|
||||
|
||||
// RunOnce runs service only once.
|
||||
func (service *Service) RunOnce(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
service.log.Debug("collecting bloom filters started")
|
||||
|
||||
// load last piece counts from overlay db
|
||||
lastPieceCounts, err := service.overlay.AllPieceCounts(ctx)
|
||||
if err != nil {
|
||||
service.log.Error("error getting last piece counts", zap.Error(err))
|
||||
err = nil
|
||||
}
|
||||
if lastPieceCounts == nil {
|
||||
lastPieceCounts = make(map[storj.NodeID]int64)
|
||||
}
|
||||
|
||||
pieceTracker := NewPieceTracker(service.log.Named("gc observer"), service.config, lastPieceCounts)
|
||||
|
||||
// collect things to retain
|
||||
err = service.segmentLoop.Join(ctx, pieceTracker)
|
||||
if err != nil {
|
||||
service.log.Error("error joining metainfoloop", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
err = service.uploadBloomFilters(ctx, pieceTracker.LatestCreationTime, pieceTracker.RetainInfos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.log.Debug("collecting bloom filters finished")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadBloomFilters stores a zipfile with multiple bloom filters in a bucket.
|
||||
func (service *Service) uploadBloomFilters(ctx context.Context, latestCreationDate time.Time, retainInfos map[storj.NodeID]*RetainInfo) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(retainInfos) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
prefix := time.Now().Format(time.RFC3339)
|
||||
|
||||
expirationTime := time.Now().Add(service.config.ExpireIn)
|
||||
|
||||
accessGrant, err := uplink.ParseAccess(service.config.AccessGrant)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
project, err := uplink.OpenProject(ctx, accessGrant)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
// do cleanup in case of any error while uploading bloom filters
|
||||
if err != nil {
|
||||
// TODO should we drop whole bucket if cleanup will fail
|
||||
err = errs.Combine(err, service.cleanup(ctx, project, prefix))
|
||||
}
|
||||
err = errs.Combine(err, project.Close())
|
||||
}()
|
||||
|
||||
_, err = project.EnsureBucket(ctx, service.config.Bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO move it before segment loop is started
|
||||
o := uplink.ListObjectsOptions{
|
||||
Prefix: prefix + "/",
|
||||
}
|
||||
iterator := project.ListObjects(ctx, service.config.Bucket, &o)
|
||||
for iterator.Next() {
|
||||
if iterator.Item().IsPrefix {
|
||||
continue
|
||||
}
|
||||
|
||||
service.log.Warn("target bucket was not empty, stop operation and wait for next execution", zap.String("bucket", service.config.Bucket))
|
||||
return nil
|
||||
}
|
||||
|
||||
infos := make([]internalpb.RetainInfo, 0, service.config.ZipBatchSize)
|
||||
batchNumber := 0
|
||||
for nodeID, info := range retainInfos {
|
||||
infos = append(infos, internalpb.RetainInfo{
|
||||
Filter: info.Filter.Bytes(),
|
||||
// because bloom filters should be created from immutable database
|
||||
// snapshot we are using latest segment creation date
|
||||
CreationDate: latestCreationDate,
|
||||
PieceCount: int64(info.Count),
|
||||
StorageNodeId: nodeID,
|
||||
})
|
||||
|
||||
if len(infos) == service.config.ZipBatchSize {
|
||||
err = service.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
infos = infos[:0]
|
||||
batchNumber++
|
||||
}
|
||||
}
|
||||
|
||||
// upload rest of infos if any
|
||||
if err := service.uploadPack(ctx, project, prefix, batchNumber, expirationTime, infos); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update LATEST file
|
||||
upload, err := project.UploadObject(ctx, service.config.Bucket, LATEST, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = upload.Write([]byte(prefix))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return upload.Commit()
|
||||
}
|
||||
|
||||
// uploadPack uploads single zip pack with multiple bloom filters.
|
||||
func (service *Service) uploadPack(ctx context.Context, project *uplink.Project, prefix string, batchNumber int, expirationTime time.Time, infos []internalpb.RetainInfo) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(infos) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
upload, err := project.UploadObject(ctx, service.config.Bucket, prefix+"/bloomfilters-"+strconv.Itoa(batchNumber)+".zip", &uplink.UploadOptions{
|
||||
Expires: expirationTime,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
zipWriter := zip.NewWriter(upload)
|
||||
defer func() {
|
||||
err = errs.Combine(err, zipWriter.Close())
|
||||
if err != nil {
|
||||
err = errs.Combine(err, upload.Abort())
|
||||
} else {
|
||||
err = upload.Commit()
|
||||
}
|
||||
}()
|
||||
|
||||
for _, info := range infos {
|
||||
retainInfoBytes, err := pb.Marshal(&info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
writer, err := zipWriter.Create(info.StorageNodeId.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
write, err := writer.Write(retainInfoBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(retainInfoBytes) != write {
|
||||
return errs.New("not whole bloom filter was written")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanup moves all objects from root location to unique prefix. Objects will be deleted
|
||||
// automatically when expires.
|
||||
func (service *Service) cleanup(ctx context.Context, project *uplink.Project, prefix string) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
errPrefix := "upload-error-" + time.Now().Format(time.RFC3339)
|
||||
o := uplink.ListObjectsOptions{
|
||||
Prefix: prefix + "/",
|
||||
}
|
||||
iterator := project.ListObjects(ctx, service.config.Bucket, &o)
|
||||
|
||||
for iterator.Next() {
|
||||
item := iterator.Item()
|
||||
if item.IsPrefix {
|
||||
continue
|
||||
}
|
||||
|
||||
err := project.MoveObject(ctx, service.config.Bucket, item.Key, service.config.Bucket, prefix+"/"+errPrefix+"/"+item.Key, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return iterator.Err()
|
||||
}
|
@ -1,193 +0,0 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package bloomfilter_test
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"io"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/gc/bloomfilter"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/uplink"
|
||||
)
|
||||
|
||||
func TestServiceGarbageCollectionBloomFilters(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1,
|
||||
StorageNodeCount: 7,
|
||||
UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Metainfo.SegmentLoop.AsOfSystemInterval = 1
|
||||
|
||||
testplanet.ReconfigureRS(2, 2, 7, 7)(log, index, config)
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB))
|
||||
require.NoError(t, err)
|
||||
|
||||
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
||||
accessString, err := access.Serialize()
|
||||
require.NoError(t, err)
|
||||
|
||||
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(project.Close)
|
||||
|
||||
type testCase struct {
|
||||
Bucket string
|
||||
ZipBatchSize int
|
||||
ExpectedPacks int
|
||||
}
|
||||
|
||||
testCases := []testCase{
|
||||
{"bloomfilters-bucket-1", 1, 7},
|
||||
{"bloomfilters-bucket-2", 2, 4},
|
||||
{"bloomfilters-bucket-7", 7, 1},
|
||||
{"bloomfilters-bucket-100", 100, 1},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.Enabled = true
|
||||
config.AccessGrant = accessString
|
||||
config.Bucket = tc.Bucket
|
||||
config.ZipBatchSize = tc.ZipBatchSize
|
||||
service := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
|
||||
|
||||
err = service.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
download, err := project.DownloadObject(ctx, tc.Bucket, bloomfilter.LATEST, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
value, err := io.ReadAll(download)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = download.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
prefix := string(value)
|
||||
iterator := project.ListObjects(ctx, tc.Bucket, &uplink.ListObjectsOptions{
|
||||
Prefix: prefix + "/",
|
||||
})
|
||||
|
||||
count := 0
|
||||
nodeIds := []string{}
|
||||
packNames := []string{}
|
||||
for iterator.Next() {
|
||||
packNames = append(packNames, iterator.Item().Key)
|
||||
|
||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], tc.Bucket, iterator.Item().Key)
|
||||
require.NoError(t, err)
|
||||
|
||||
zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, file := range zipReader.File {
|
||||
bfReader, err := file.Open()
|
||||
require.NoError(t, err)
|
||||
|
||||
bloomfilter, err := io.ReadAll(bfReader)
|
||||
require.NoError(t, err)
|
||||
|
||||
var pbRetainInfo internalpb.RetainInfo
|
||||
err = pb.Unmarshal(bloomfilter, &pbRetainInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotEmpty(t, pbRetainInfo.Filter)
|
||||
require.NotZero(t, pbRetainInfo.PieceCount)
|
||||
require.NotZero(t, pbRetainInfo.CreationDate)
|
||||
require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String())
|
||||
|
||||
nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String())
|
||||
}
|
||||
|
||||
count++
|
||||
}
|
||||
require.NoError(t, iterator.Err())
|
||||
require.Equal(t, tc.ExpectedPacks, count)
|
||||
|
||||
expectedPackNames := []string{}
|
||||
for i := 0; i < tc.ExpectedPacks; i++ {
|
||||
expectedPackNames = append(expectedPackNames, prefix+"/bloomfilters-"+strconv.Itoa(i)+".zip")
|
||||
}
|
||||
sort.Strings(expectedPackNames)
|
||||
sort.Strings(packNames)
|
||||
require.Equal(t, expectedPackNames, packNames)
|
||||
|
||||
expectedNodeIds := []string{}
|
||||
for _, node := range planet.StorageNodes {
|
||||
expectedNodeIds = append(expectedNodeIds, node.ID().String())
|
||||
}
|
||||
sort.Strings(expectedNodeIds)
|
||||
sort.Strings(nodeIds)
|
||||
require.Equal(t, expectedNodeIds, nodeIds)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestServiceGarbageCollectionBloomFilters_AllowNotEmptyBucket(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1,
|
||||
StorageNodeCount: 4,
|
||||
UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Metainfo.SegmentLoop.AsOfSystemInterval = 1
|
||||
|
||||
testplanet.ReconfigureRS(2, 2, 4, 4)(log, index, config)
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "object", testrand.Bytes(10*memory.KiB))
|
||||
require.NoError(t, err)
|
||||
|
||||
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
||||
accessString, err := access.Serialize()
|
||||
require.NoError(t, err)
|
||||
|
||||
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(project.Close)
|
||||
|
||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bloomfilters", "some object", testrand.Bytes(1*memory.KiB))
|
||||
require.NoError(t, err)
|
||||
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.AccessGrant = accessString
|
||||
config.Bucket = "bloomfilters"
|
||||
service := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
|
||||
|
||||
err = service.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check that there are 2 objects and the names match
|
||||
iterator := project.ListObjects(ctx, "bloomfilters", nil)
|
||||
keys := []string{}
|
||||
for iterator.Next() {
|
||||
if !iterator.Item().IsPrefix {
|
||||
keys = append(keys, iterator.Item().Key)
|
||||
}
|
||||
}
|
||||
require.Len(t, keys, 2)
|
||||
require.Contains(t, keys, "some object")
|
||||
require.Contains(t, keys, bloomfilter.LATEST)
|
||||
})
|
||||
}
|
@ -54,7 +54,7 @@ func (bfu *Upload) UploadBloomFilters(ctx context.Context, latestCreationDate ti
|
||||
return nil
|
||||
}
|
||||
|
||||
prefix := time.Now().Format(time.RFC3339)
|
||||
prefix := time.Now().Format(time.RFC3339Nano)
|
||||
|
||||
expirationTime := time.Now().Add(bfu.config.ExpireIn)
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/base58"
|
||||
@ -23,6 +24,7 @@ import (
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/gc/bloomfilter"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/storj/storagenode/blobstore"
|
||||
"storj.io/uplink/private/eestream"
|
||||
@ -58,7 +60,6 @@ func TestGarbageCollection(t *testing.T) {
|
||||
// configure filter uploader
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.AccessGrant = accessString
|
||||
bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
|
||||
|
||||
satellite := planet.Satellites[0]
|
||||
upl := planet.Uplinks[0]
|
||||
@ -116,8 +117,15 @@ func TestGarbageCollection(t *testing.T) {
|
||||
// for a second.
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Wait for next iteration of garbage collection to finish
|
||||
err = bloomFilterService.RunOnce(ctx)
|
||||
// Wait for bloom filter observer to finish
|
||||
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
|
||||
|
||||
observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB)
|
||||
segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
|
||||
rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments,
|
||||
[]rangedloop.Observer{observer})
|
||||
|
||||
_, err = rangedLoop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// send to storagenode
|
||||
@ -166,7 +174,6 @@ func TestGarbageCollectionWithCopies(t *testing.T) {
|
||||
// configure filter uploader
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.AccessGrant = accessString
|
||||
bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
|
||||
|
||||
project, err := planet.Uplinks[0].OpenProject(ctx, satellite)
|
||||
require.NoError(t, err)
|
||||
@ -211,8 +218,15 @@ func TestGarbageCollectionWithCopies(t *testing.T) {
|
||||
afterTotalUsedByNodes := allSpaceUsedForPieces()
|
||||
require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes)
|
||||
|
||||
// Wait for next iteration of garbage collection to finish
|
||||
err = bloomFilterService.RunOnce(ctx)
|
||||
// Wait for bloom filter observer to finish
|
||||
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
|
||||
|
||||
observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB)
|
||||
segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
|
||||
rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments,
|
||||
[]rangedloop.Observer{observer})
|
||||
|
||||
_, err = rangedLoop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// send to storagenode
|
||||
@ -241,7 +255,7 @@ func TestGarbageCollectionWithCopies(t *testing.T) {
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// run GC
|
||||
err = bloomFilterService.RunOnce(ctx)
|
||||
_, err = rangedLoop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// send to storagenode
|
||||
@ -268,7 +282,7 @@ func TestGarbageCollectionWithCopies(t *testing.T) {
|
||||
planet.WaitForStorageNodeDeleters(ctx)
|
||||
|
||||
// run GC
|
||||
err = bloomFilterService.RunOnce(ctx)
|
||||
_, err = rangedLoop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// send to storagenode
|
||||
@ -341,6 +355,10 @@ func TestGarbageCollection_PendingObject(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
||||
accessString, err := access.Serialize()
|
||||
require.NoError(t, err)
|
||||
|
||||
satellite := planet.Satellites[0]
|
||||
upl := planet.Uplinks[0]
|
||||
|
||||
@ -350,19 +368,25 @@ func TestGarbageCollection_PendingObject(t *testing.T) {
|
||||
segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, segments, 1)
|
||||
require.Len(t, segments[0].Pieces, 1)
|
||||
|
||||
lastPieceCounts := map[storj.NodeID]int64{}
|
||||
pieceTracker := bloomfilter.NewPieceTracker(satellite.Log.Named("gc observer"), bloomfilter.Config{
|
||||
FalsePositiveRate: 0.000000001,
|
||||
InitialPieces: 10,
|
||||
}, lastPieceCounts)
|
||||
config := planet.Satellites[0].Config.GarbageCollectionBF
|
||||
config.AccessGrant = accessString
|
||||
config.Bucket = "bucket"
|
||||
config.FalsePositiveRate = 0.000000001
|
||||
config.InitialPieces = 10
|
||||
|
||||
err = satellite.Metabase.SegmentLoop.Join(ctx, pieceTracker)
|
||||
observer := bloomfilter.NewObserver(satellite.Log.Named("gc observer"), config, satellite.Overlay.DB)
|
||||
|
||||
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
|
||||
provider := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
|
||||
rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, provider,
|
||||
[]rangedloop.Observer{observer})
|
||||
|
||||
_, err = rangedLoop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotEmpty(t, pieceTracker.RetainInfos)
|
||||
info := pieceTracker.RetainInfos[planet.StorageNodes[0].ID()]
|
||||
require.NotEmpty(t, observer.TestingRetainInfos())
|
||||
info := observer.TestingRetainInfos()[planet.StorageNodes[0].ID()]
|
||||
require.NotNil(t, info)
|
||||
require.Equal(t, 1, info.Count)
|
||||
|
||||
|
@ -89,8 +89,6 @@ func (service *Service) Run(ctx context.Context) (err error) {
|
||||
func (service *Service) RunOnce(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
loopStartTime := time.Now()
|
||||
|
||||
switch {
|
||||
case service.Config.AccessGrant == "":
|
||||
return errs.New("Access Grant is not set")
|
||||
@ -150,10 +148,10 @@ func (service *Service) RunOnce(ctx context.Context) (err error) {
|
||||
|
||||
if err != nil {
|
||||
// We store the error in the bucket and then continue with the next zip file.
|
||||
return service.moveToErrorPrefix(ctx, project, objectKey, err, loopStartTime)
|
||||
return service.moveToErrorPrefix(ctx, project, objectKey, err)
|
||||
}
|
||||
|
||||
return service.moveToSentPrefix(ctx, project, objectKey, loopStartTime)
|
||||
return service.moveToSentPrefix(ctx, project, objectKey)
|
||||
})
|
||||
}
|
||||
|
||||
@ -198,7 +196,7 @@ func (service *Service) sendRetainRequest(ctx context.Context, retainInfo *inter
|
||||
|
||||
// moveToErrorPrefix moves an object to prefix "error" and attaches the error to the metadata.
|
||||
func (service *Service) moveToErrorPrefix(
|
||||
ctx context.Context, project *uplink.Project, objectKey string, previousErr error, timeStamp time.Time,
|
||||
ctx context.Context, project *uplink.Project, objectKey string, previousErr error,
|
||||
) error {
|
||||
newObjectKey := "error-" + objectKey
|
||||
|
||||
@ -235,9 +233,9 @@ func (service *Service) uploadError(
|
||||
return upload.Commit()
|
||||
}
|
||||
|
||||
// moveToErrorPrefix moves an object to prefix "sent-[timestamp]".
|
||||
// moveToSentPrefix moves an object to prefix "sent".
|
||||
func (service *Service) moveToSentPrefix(
|
||||
ctx context.Context, project *uplink.Project, objectKey string, timeStamp time.Time,
|
||||
ctx context.Context, project *uplink.Project, objectKey string,
|
||||
) error {
|
||||
newObjectKey := "sent-" + objectKey
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/memory"
|
||||
@ -17,6 +18,7 @@ import (
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/gc/bloomfilter"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/uplink"
|
||||
@ -54,8 +56,14 @@ func TestSendRetainFilters(t *testing.T) {
|
||||
config.AccessGrant = accessString
|
||||
config.ZipBatchSize = 2
|
||||
|
||||
bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
|
||||
err = bloomFilterService.RunOnce(ctx)
|
||||
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
|
||||
|
||||
observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB)
|
||||
segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
|
||||
rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments,
|
||||
[]rangedloop.Observer{observer})
|
||||
|
||||
_, err = rangedLoop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
storageNode0 := planet.StorageNodes[0]
|
||||
@ -127,8 +135,14 @@ func TestSendRetainFiltersDisqualifedNode(t *testing.T) {
|
||||
config.AccessGrant = accessString
|
||||
config.ZipBatchSize = 2
|
||||
|
||||
bloomFilterService := bloomfilter.NewService(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB, planet.Satellites[0].Metabase.SegmentLoop)
|
||||
err = bloomFilterService.RunOnce(ctx)
|
||||
rangedloopConfig := planet.Satellites[0].Config.RangedLoop
|
||||
|
||||
observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB)
|
||||
segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
|
||||
rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments,
|
||||
[]rangedloop.Observer{observer})
|
||||
|
||||
_, err = rangedLoop.RunOnce(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
storageNode0 := planet.StorageNodes[0]
|
||||
|
@ -143,6 +143,7 @@ func (service *Service) RunOnce(ctx context.Context) (observerDurations []Observ
|
||||
rangeObservers := []*rangeObserverState{}
|
||||
for i, observerState := range observerStates {
|
||||
if observerState.err != nil {
|
||||
service.log.Debug("observer returned error", zap.Error(observerState.err))
|
||||
continue
|
||||
}
|
||||
rangeObserver, err := observerState.observer.Fork(ctx)
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"storj.io/storj/private/lifecycle"
|
||||
"storj.io/storj/satellite/accounting/nodetally"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/gc/bloomfilter"
|
||||
"storj.io/storj/satellite/gracefulexit"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/rangedloop"
|
||||
@ -62,10 +61,6 @@ type RangedLoop struct {
|
||||
Observer rangedloop.Observer
|
||||
}
|
||||
|
||||
GarbageCollectionBF struct {
|
||||
Observer rangedloop.Observer
|
||||
}
|
||||
|
||||
Accounting struct {
|
||||
NodeTallyObserver *nodetally.Observer
|
||||
}
|
||||
@ -149,10 +144,6 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
|
||||
)
|
||||
}
|
||||
|
||||
{ // setup garbage collection bloom filter observer
|
||||
peer.GarbageCollectionBF.Observer = bloomfilter.NewObserver(log.Named("gc-bf"), config.GarbageCollectionBF, db.OverlayCache())
|
||||
}
|
||||
|
||||
{ // setup ranged loop
|
||||
observers := []rangedloop.Observer{
|
||||
rangedloop.NewLiveCountObserver(metabaseDB, config.RangedLoop.SuspiciousProcessedRatio, config.RangedLoop.AsOfSystemInterval),
|
||||
@ -171,10 +162,6 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
|
||||
observers = append(observers, peer.GracefulExit.Observer)
|
||||
}
|
||||
|
||||
if config.GarbageCollectionBF.Enabled && config.GarbageCollectionBF.UseRangedLoop {
|
||||
observers = append(observers, peer.GarbageCollectionBF.Observer)
|
||||
}
|
||||
|
||||
if config.Repairer.UseRangedLoop {
|
||||
observers = append(observers, peer.Repair.Observer)
|
||||
}
|
||||
|
scripts/testdata/satellite-config.yaml.lock (9 lines, vendored)
@ -454,9 +454,6 @@ contact.external-address: ""
|
||||
# Bucket which will be used to upload bloom filters
|
||||
# garbage-collection-bf.bucket: ""
|
||||
|
||||
# set if garbage collection bloom filters is enabled or not
|
||||
# garbage-collection-bf.enabled: true
|
||||
|
||||
# how long bloom filters will remain in the bucket for gc/sender to consume before being automatically deleted
|
||||
# garbage-collection-bf.expire-in: 336h0m0s
|
||||
|
||||
@ -466,15 +463,9 @@ contact.external-address: ""
|
||||
# the initial number of pieces expected for a storage node to have, used for creating a filter
|
||||
# garbage-collection-bf.initial-pieces: 400000
|
||||
|
||||
# the time between each garbage collection executions
|
||||
# garbage-collection-bf.interval: 120h0m0s
|
||||
|
||||
# set if garbage collection bloom filter process should only run once then exit
|
||||
# garbage-collection-bf.run-once: false
|
||||
|
||||
# whether to use ranged loop instead of segment loop
|
||||
# garbage-collection-bf.use-ranged-loop: false
|
||||
|
||||
# whether to use test GC SyncObserver with ranged loop
|
||||
# garbage-collection-bf.use-sync-observer: false
|
||||
|
||||
|