satellite/gc/bloomfilter: add service to collect bloom filters

We would like to have a separate process/command that collects bloom
filters from a source other than the production DBs. Such a process will
use the segment loop to build bloom filters for all storage nodes and
send them to a Storj bucket. This is the initial change to add such a
service. The added service joins the segment loop and collects all
bloom filters.
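
For orientation, the core idea is one bloom filter per storage node: every
piece ID the node is supposed to keep is added to that node's filter, and
anything not in the filter can later be garbage collected. Below is a minimal
sketch using storj.io/common/bloomfilter; the placeholder IDs and the exact
NewOptimalMaxSize signature are assumptions for illustration, and the real
per-node bookkeeping lives in the PieceTracker added in this change.

package main

import (
	"fmt"

	"storj.io/common/bloomfilter"
	"storj.io/common/memory"
	"storj.io/common/storj"
)

func main() {
	// one filter per storage node, keyed by node ID
	filters := make(map[storj.NodeID]*bloomfilter.Filter)

	nodeID := storj.NodeID{1}   // placeholder values, illustration only
	pieceID := storj.PieceID{2} // placeholder values, illustration only

	filter, ok := filters[nodeID]
	if !ok {
		// sized for an expected piece count and false positive rate,
		// capped at 2 MiB so it stays small enough to send over RPC
		filter = bloomfilter.NewOptimalMaxSize(400000, 0.1, 2*memory.MiB)
		filters[nodeID] = filter
	}
	filter.Add(pieceID)

	fmt.Println("tracking pieces for", len(filters), "node(s)")
}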

Sending bloom filters to the bucket will be added as a subsequent
change.
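
Purely as a sketch of what that follow-up could look like (it is not part of
this commit): with the public storj.io/uplink client, uploading a serialized
filter to a bucket would be roughly the code below. The helper name, the
access-grant plumbing, and the object layout are all assumptions, and the
actual subsequent change may use satellite-internal APIs instead.

package gcupload // hypothetical helper, illustration only

import (
	"context"

	"storj.io/uplink"
)

// uploadFilter writes one serialized bloom filter to a bucket.
// accessGrant, bucket, and key would come from the new service's config.
func uploadFilter(ctx context.Context, accessGrant, bucket, key string, data []byte) error {
	access, err := uplink.ParseAccess(accessGrant)
	if err != nil {
		return err
	}
	project, err := uplink.OpenProject(ctx, access)
	if err != nil {
		return err
	}
	defer project.Close()

	upload, err := project.UploadObject(ctx, bucket, key, nil)
	if err != nil {
		return err
	}
	if _, err := upload.Write(data); err != nil {
		_ = upload.Abort()
		return err
	}
	return upload.Commit()
}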

Updates https://github.com/storj/team-metainfo/issues/120

Change-Id: I2551723605afa41bec84826b0c647cd1f61f3b14
Michal Niewrzal 2022-08-30 10:51:11 +02:00
parent 1613d37466
commit 68f6d93f29
5 changed files with 222 additions and 3 deletions


@@ -21,6 +21,7 @@ import (
 	"storj.io/private/version"
 	"storj.io/storj/private/lifecycle"
 	version_checker "storj.io/storj/private/version/checker"
+	"storj.io/storj/satellite/gc/bloomfilter"
 	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/metabase/segmentloop"
 	"storj.io/storj/satellite/overlay"

@@ -56,7 +57,7 @@ type GarbageCollectionBF struct {
 	}
 
 	GarbageCollection struct {
-		// TODO add service when will be ready
+		Service *bloomfilter.Service
 	}
 }

@@ -125,7 +126,20 @@ func NewGarbageCollectionBF(log *zap.Logger, full *identity.FullIdentity, db DB,
 		})
 	}
 
-	// TODO setup garbage collection bloom filters
+	{ // setup garbage collection bloom filters
+		peer.GarbageCollection.Service = bloomfilter.NewService(
+			peer.Log.Named("garbage-collection-bf"),
+			config.GarbageCollectionBF,
+			peer.Overlay.DB,
+			peer.Metainfo.SegmentLoop,
+		)
+		peer.Services.Add(lifecycle.Item{
+			Name: "garbage-collection-bf",
+			Run:  peer.GarbageCollection.Service.Run,
+		})
+		peer.Debug.Server.Panel.Add(
+			debug.Cycle("Garbage Collection Bloom Filters", peer.GarbageCollection.Service.Loop))
+	}
 
 	return peer, nil
 }


@@ -0,0 +1,99 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package bloomfilter

import (
	"context"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/bloomfilter"
	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/storj/satellite/metabase/segmentloop"
)

var _ segmentloop.Observer = (*PieceTracker)(nil)

// RetainInfo contains info needed for a storage node to retain important data and delete garbage data.
type RetainInfo struct {
	Filter       *bloomfilter.Filter
	CreationDate time.Time
	Count        int
}

// PieceTracker implements the metainfo loop observer interface for garbage collection.
//
// architecture: Observer
type PieceTracker struct {
	log          *zap.Logger
	config       Config
	creationDate time.Time
	// TODO: should we use int or int64 consistently for piece count (db type is int64)?
	pieceCounts map[storj.NodeID]int

	RetainInfos map[storj.NodeID]*RetainInfo
}

// NewPieceTracker instantiates a new gc piece tracker to be subscribed to the metainfo loop.
func NewPieceTracker(log *zap.Logger, config Config, pieceCounts map[storj.NodeID]int) *PieceTracker {
	return &PieceTracker{
		log:          log,
		config:       config,
		creationDate: time.Now().UTC(),
		pieceCounts:  pieceCounts,
		RetainInfos:  make(map[storj.NodeID]*RetainInfo, len(pieceCounts)),
	}
}

// LoopStarted is called at each start of a loop.
func (pieceTracker *PieceTracker) LoopStarted(ctx context.Context, info segmentloop.LoopInfo) (err error) {
	if pieceTracker.creationDate.After(info.Started) {
		return errs.New("Creation date after loop starting time.")
	}
	return nil
}
// RemoteSegment takes a remote segment found in metabase and adds pieces to bloom filters.
func (pieceTracker *PieceTracker) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error {
	// we are explicitly not adding monitoring here as we are tracking loop observers separately
	deriver := segment.RootPieceID.Deriver()
	for _, piece := range segment.Pieces {
		pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
		pieceTracker.add(piece.StorageNode, pieceID)
	}
	return nil
}
// add adds a pieceID to the relevant node's RetainInfo.
func (pieceTracker *PieceTracker) add(nodeID storj.NodeID, pieceID storj.PieceID) {
	info, ok := pieceTracker.RetainInfos[nodeID]
	if !ok {
		// If we know how many pieces a node should be storing, use that number. Otherwise use default.
		numPieces := pieceTracker.config.InitialPieces
		if pieceTracker.pieceCounts[nodeID] > 0 {
			numPieces = pieceTracker.pieceCounts[nodeID]
		}

		// limit size of bloom filter to ensure we are under the limit for RPC
		filter := bloomfilter.NewOptimalMaxSize(numPieces, pieceTracker.config.FalsePositiveRate, 2*memory.MiB)
		info = &RetainInfo{
			Filter:       filter,
			CreationDate: pieceTracker.creationDate,
		}
		pieceTracker.RetainInfos[nodeID] = info
	}

	info.Filter.Add(pieceID)
	info.Count++
}

// InlineSegment returns nil because we're only doing gc for storage nodes for now.
func (pieceTracker *PieceTracker) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
	return nil
}
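
As a rough sanity check of the 2 MiB cap in add() (editor's arithmetic, and it
assumes NewOptimalMaxSize uses the standard optimal sizing m = -n*ln(p)/(ln 2)^2):
a 0.1 false-positive rate costs about 4.8 bits per piece, so the default of
400,000 expected pieces needs roughly 234 KiB, and the cap only kicks in at
around 3.5 million pieces on a single node.

package main

import (
	"fmt"
	"math"
)

func main() {
	p := 0.1                                              // false positive rate (config default)
	bitsPerPiece := -math.Log(p) / (math.Ln2 * math.Ln2)  // about 4.79 bits per piece

	n := 400000.0 // default expected pieces per node
	fmt.Printf("filter size for %.0f pieces: ~%.0f KiB\n", n, n*bitsPerPiece/8/1024)

	capBits := 2.0 * 1024 * 1024 * 8 // the 2 MiB RPC cap used in add()
	fmt.Printf("pieces before hitting the cap: ~%.0f\n", capBits/bitsPerPiece)
}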


@@ -0,0 +1,92 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package bloomfilter

import (
	"context"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/common/sync2"
	"storj.io/storj/satellite/metabase/segmentloop"
	"storj.io/storj/satellite/overlay"
)

var mon = monkit.Package()

// Config contains configurable values for garbage collection.
type Config struct {
	Interval time.Duration `help:"the time between each garbage collection executions" releaseDefault:"120h" devDefault:"10m" testDefault:"$TESTINTERVAL"`
	Enabled  bool          `help:"set if garbage collection bloom filters is enabled or not" releaseDefault:"true" devDefault:"true"`
	// value for InitialPieces currently based on average pieces per node
	InitialPieces     int     `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
	FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
}

// Service implements the garbage collection service.
//
// architecture: Chore
type Service struct {
	log         *zap.Logger
	config      Config
	Loop        *sync2.Cycle
	overlay     overlay.DB
	segmentLoop *segmentloop.Service
}

// NewService creates a new instance of the gc service.
func NewService(log *zap.Logger, config Config, overlay overlay.DB, loop *segmentloop.Service) *Service {
	return &Service{
		log:         log,
		config:      config,
		Loop:        sync2.NewCycle(config.Interval),
		overlay:     overlay,
		segmentLoop: loop,
	}
}

// Run starts the gc loop service.
func (service *Service) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	if !service.config.Enabled {
		return nil
	}

	// load last piece counts from overlay db
	lastPieceCounts, err := service.overlay.AllPieceCounts(ctx)
	if err != nil {
		service.log.Error("error getting last piece counts", zap.Error(err))
		err = nil
	}
	if lastPieceCounts == nil {
		lastPieceCounts = make(map[storj.NodeID]int)
	}

	return service.Loop.Run(ctx, func(ctx context.Context) (err error) {
		defer mon.Task()(&ctx)(&err)

		service.log.Debug("collecting bloom filters started")

		pieceTracker := NewPieceTracker(service.log.Named("gc observer"), service.config, lastPieceCounts)

		// collect things to retain
		err = service.segmentLoop.Join(ctx, pieceTracker)
		if err != nil {
			service.log.Error("error joining metainfoloop", zap.Error(err))
			return nil
		}

		// TODO send bloom filters to the bucket

		service.log.Debug("collecting bloom filters finished")
		return nil
	})
}
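
For context, this is roughly how the service would be driven outside the peer
wiring shown above, for example from a test. The snippet is a sketch and is not
part of this change: overlayDB and segmentLoop stand in for peer.Overlay.DB and
peer.Metainfo.SegmentLoop, and TriggerWait is the sync2.Cycle helper commonly
used in tests to force a cycle instead of waiting for the configured Interval.

package example // hypothetical, illustration only

import (
	"context"
	"time"

	"go.uber.org/zap"

	"storj.io/storj/satellite/gc/bloomfilter"
	"storj.io/storj/satellite/metabase/segmentloop"
	"storj.io/storj/satellite/overlay"
)

// runOnce wires the service the same way the satellite peer does and forces a
// single collection cycle. overlayDB and segmentLoop come from the peer (or a
// test planet satellite) in real usage; a real test would also wait for Run to
// return after canceling the context.
func runOnce(ctx context.Context, log *zap.Logger, overlayDB overlay.DB, segmentLoop *segmentloop.Service) {
	svc := bloomfilter.NewService(log.Named("garbage-collection-bf"), bloomfilter.Config{
		Interval:          10 * time.Minute,
		Enabled:           true,
		InitialPieces:     400000,
		FalsePositiveRate: 0.1,
	}, overlayDB, segmentLoop)

	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	go func() { _ = svc.Run(runCtx) }() // normally registered as a lifecycle.Item
	svc.Loop.TriggerWait()              // force one collection cycle right away
}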


@@ -38,6 +38,7 @@ import (
 	"storj.io/storj/satellite/console/restkeys"
 	"storj.io/storj/satellite/contact"
 	"storj.io/storj/satellite/gc"
+	"storj.io/storj/satellite/gc/bloomfilter"
 	"storj.io/storj/satellite/gracefulexit"
 	"storj.io/storj/satellite/mailservice"
 	"storj.io/storj/satellite/mailservice/simulate"

@@ -148,7 +149,8 @@ type Config struct {
 	Repairer repairer.Config
 	Audit audit.Config
 
-	GarbageCollection gc.Config
+	GarbageCollection   gc.Config
+	GarbageCollectionBF bloomfilter.Config
 
 	ExpiredDeletion expireddeletion.Config
 	ZombieDeletion zombiedeletion.Config


@@ -379,6 +379,18 @@ contact.external-address: ""
 # how many expired objects to query in a batch
 # expired-deletion.list-limit: 100
 
+# set if garbage collection bloom filters is enabled or not
+# garbage-collection-bf.enabled: true
+
+# the false positive rate used for creating a garbage collection bloom filter
+# garbage-collection-bf.false-positive-rate: 0.1
+
+# the initial number of pieces expected for a storage node to have, used for creating a filter
+# garbage-collection-bf.initial-pieces: 400000
+
+# the time between each garbage collection executions
+# garbage-collection-bf.interval: 120h0m0s
+
 # the number of nodes to concurrently send garbage collection bloom filters to
 # garbage-collection.concurrent-sends: 1