cmd/satellite: add command for collecting GC bloom filters

We would like to have separate process/command to collect bloom
filters from source different than production DBs. Such process will
use segment loop to build bloom filters for all storage nodes and
will send it to Storj bucket. This change is extending satellite binary
with appropriete command.

New GC service for collecting bloom filter will be a subsequent
change.

Updates https://github.com/storj/team-metainfo/issues/120

Change-Id: Ibc03e119c340919cf468fc1f5a4f3d187bb3a5a1
This commit is contained in:
Michal Niewrzal 2022-08-29 16:16:48 +02:00
parent ea1408f7a8
commit 2ae1db53b6
2 changed files with 96 additions and 0 deletions

89
cmd/satellite/gc-bf.go Normal file
View File

@ -0,0 +1,89 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/private/process"
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/satellitedb"
)
func cmdGCBloomFilterRun(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L()
runCfg.Debug.Address = *process.DebugAddrFlag
identity, err := runCfg.Identity.Load()
if err != nil {
log.Error("Failed to load identity.", zap.Error(err))
return errs.New("Failed to load identity: %+v", err)
}
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-gc-bloomfilter"})
if err != nil {
return errs.New("Error starting master database on satellite GC: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL, metabase.Config{
ApplicationName: "satellite-gc-bloomfilter",
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
ServerSideCopy: runCfg.Config.Metainfo.ServerSideCopy,
})
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}
defer func() {
err = errs.Combine(err, metabaseDB.Close())
}()
revocationDB, err := revocation.OpenDBFromCfg(ctx, runCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database GC: %+v", err)
}
defer func() {
err = errs.Combine(err, revocationDB.Close())
}()
peer, err := satellite.NewGarbageCollectionBF(log, identity, db, metabaseDB, revocationDB, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
if err != nil {
return err
}
_, err = peer.Version.Service.CheckVersion(ctx)
if err != nil {
return err
}
if err := process.InitMetricsWithHostname(ctx, log, nil); err != nil {
log.Warn("Failed to initialize telemetry batcher on satellite GC", zap.Error(err))
}
err = metabaseDB.CheckVersion(ctx)
if err != nil {
log.Error("Failed metabase database version check.", zap.Error(err))
return errs.New("failed metabase version check: %+v", err)
}
err = db.CheckVersion(ctx)
if err != nil {
log.Error("Failed satellite database version check.", zap.Error(err))
return errs.New("Error checking version for satellitedb: %+v", err)
}
runError := peer.Run(ctx)
closeError := peer.Close()
return errs.Combine(runError, closeError)
}

View File

@ -116,6 +116,11 @@ var (
Short: "Run the satellite garbage collection process",
RunE: cmdGCRun,
}
runGCBloomFilterCmd = &cobra.Command{
Use: "garbage-collection-bloom-filters",
Short: "Run the satellite process which collects nodes bloom filters for garbage collection",
RunE: cmdGCBloomFilterRun,
}
setupCmd = &cobra.Command{
Use: "setup",
Short: "Create config files",
@ -320,6 +325,7 @@ func init() {
runCmd.AddCommand(runAdminCmd)
runCmd.AddCommand(runRepairerCmd)
runCmd.AddCommand(runGCCmd)
runCmd.AddCommand(runGCBloomFilterCmd)
rootCmd.AddCommand(setupCmd)
rootCmd.AddCommand(qdiagCmd)
rootCmd.AddCommand(reportsCmd)
@ -350,6 +356,7 @@ func init() {
process.Bind(runAdminCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runRepairerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runGCCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runGCBloomFilterCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(restoreTrashCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(registerLostSegments, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(fetchPiecesCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))