satellite/gc: move garbage collection to its own process

Change-Id: I7235aa83f7c641e31c62ba9d42192b2232dca4a5
Jessica Grebenschikov 2020-03-12 08:40:22 -07:00 committed by Jess G
parent 19685ad81a
commit 5142874144
8 changed files with 349 additions and 18 deletions

cmd/satellite/gc.go (new file, +77)

@@ -0,0 +1,77 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/process"
"storj.io/storj/pkg/revocation"
"storj.io/storj/private/version"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/satellitedb"
)
func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
log := zap.L()
runCfg.Debug.Address = *process.DebugAddrFlag
identity, err := runCfg.Identity.Load()
if err != nil {
zap.S().Fatal(err)
}
db, err := satellitedb.New(log.Named("db"), runCfg.Database, satellitedb.Options{})
if err != nil {
return errs.New("Error starting master database on satellite GC: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
pointerDB, err := metainfo.NewStore(log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating pointerDB GC: %+v", err)
}
defer func() {
err = errs.Combine(err, pointerDB.Close())
}()
revocationDB, err := revocation.NewDBFromCfg(runCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database GC: %+v", err)
}
defer func() {
err = errs.Combine(err, revocationDB.Close())
}()
peer, err := satellite.NewGarbageCollection(log, identity, db, pointerDB, revocationDB, version.Build, &runCfg.Config)
if err != nil {
return err
}
_, err = peer.Version.Service.CheckVersion(ctx)
if err != nil {
return err
}
if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
zap.S().Warn("Failed to initialize telemetry batcher on satellite GC: ", err)
}
err = db.CheckVersion(ctx)
if err != nil {
zap.S().Fatal("failed satellite database version check for GC: ", err)
return errs.New("Error checking version for satellitedb for GC: %+v", err)
}
runError := peer.Run(ctx)
closeError := peer.Close()
return errs.Combine(runError, closeError)
}

cmd/satellite/main.go

@@ -84,6 +84,11 @@ var (
Short: "Run the satellite Admin",
RunE: cmdAdminRun,
}
runGCCmd = &cobra.Command{
Use: "garbage-collection",
Short: "Run the satellite garbage collection process",
RunE: cmdGCRun,
}
setupCmd = &cobra.Command{
Use: "setup",
Short: "Create config files",
@@ -171,6 +176,7 @@ func init() {
runCmd.AddCommand(runAPICmd)
runCmd.AddCommand(runAdminCmd)
runCmd.AddCommand(runRepairerCmd)
runCmd.AddCommand(runGCCmd)
rootCmd.AddCommand(setupCmd)
rootCmd.AddCommand(qdiagCmd)
rootCmd.AddCommand(reportsCmd)
@@ -184,6 +190,7 @@ func init() {
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runAdminCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runRepairerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runGCCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))

cmd/storj-sim/network.go

@@ -68,6 +68,7 @@ const (
debugAdminHTTP = 6
debugPeerHTTP = 7
debugRepairerHTTP = 8
debugGCHTTP = 10
)
// port creates a port with a consistent format for storj-sim services.
@@ -372,6 +373,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugPeerHTTP)),
},
})
apiProcess.WaitForExited(migrationProcess)
coreProcess := processes.New(Info{
Name: fmt.Sprintf("satellite-core/%d", i),
@@ -413,7 +415,18 @@
})
repairProcess.WaitForExited(migrationProcess)
apiProcess.WaitForExited(migrationProcess)
garbageCollectionProcess := processes.New(Info{
Name: fmt.Sprintf("satellite-garbage-collection/%d", i),
Executable: "satellite",
Directory: filepath.Join(processes.Directory, "satellite", fmt.Sprint(i)),
})
garbageCollectionProcess.Arguments = withCommon(apiProcess.Directory, Arguments{
"run": {
"garbage-collection",
"--debug.addr", net.JoinHostPort(host, port(satellitePeer, i, debugGCHTTP)),
},
})
garbageCollectionProcess.WaitForExited(migrationProcess)
}
// Create gateways for each satellite

private/testplanet/satellite.go

@@ -64,6 +64,7 @@ type SatelliteSystem struct {
API *satellite.API
Repairer *satellite.Repairer
Admin *satellite.Admin
GC *satellite.GarbageCollection
Log *zap.Logger
Identity *identity.FullIdentity
@@ -196,6 +197,7 @@ func (system *SatelliteSystem) Close() error {
system.Core.Close(),
system.Repairer.Close(),
system.Admin.Close(),
system.GC.Close(),
)
}
@@ -215,6 +217,9 @@ func (system *SatelliteSystem) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(system.Admin.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.GC.Run(ctx))
})
return group.Wait()
}
@@ -505,9 +510,14 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
return xs, err
}
gcPeer, err := planet.newGarbageCollection(i, identity, db, pointerDB, config, versionInfo)
if err != nil {
return xs, err
}
log.Debug("id=" + peer.ID().String() + " addr=" + api.Addr())
- system := createNewSystem(log, peer, api, repairerPeer, adminPeer)
+ system := createNewSystem(log, peer, api, repairerPeer, adminPeer, gcPeer)
xs = append(xs, system)
}
return xs, nil
@@ -517,12 +527,13 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However, long term we probably want to rework this
// so it represents how the satellite will run when it is made up of many processes.
- func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin) *SatelliteSystem {
+ func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection) *SatelliteSystem {
system := &SatelliteSystem{
Core: peer,
API: api,
Repairer: repairerPeer,
Admin: adminPeer,
GC: gcPeer,
}
system.Log = log
system.Identity = peer.Identity
@@ -559,7 +570,7 @@ func createNewSystem(log *zap.Logger, peer *satellite.Core, api *satellite.API,
system.Audit.Verifier = peer.Audit.Verifier
system.Audit.Reporter = peer.Audit.Reporter
- system.GarbageCollection.Service = peer.GarbageCollection.Service
+ system.GarbageCollection.Service = gcPeer.GarbageCollection.Service
system.DBCleanup.Chore = peer.DBCleanup.Chore
@@ -645,3 +656,15 @@ type rollupsWriteCacheCloser struct {
func (cache rollupsWriteCacheCloser) Close() error {
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}
func (planet *Planet) newGarbageCollection(count int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
prefix := "satellite-gc" + strconv.Itoa(count)
log := planet.log.Named(prefix)
revocationDB, err := revocation.NewDBFromCfg(config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewGarbageCollection(log, identity, db, pointerDB, revocationDB, versionInfo, &config)
}
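With the GC peer wired into testplanet, tests reach garbage collection through the satellite system the same way as before. A minimal sketch of a hypothetical test (name and assertions illustrative; assumes the usual testplanet/testcontext imports):

package testplanet_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"storj.io/common/testcontext"
	"storj.io/storj/private/testplanet"
)

// TestGCPeerWiring is a hypothetical sketch: it checks that each satellite
// system exposes the standalone GC peer and that the shared
// GarbageCollection.Service handle is now backed by that peer.
func TestGCPeerWiring(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		sat := planet.Satellites[0]
		require.NotNil(t, sat.GC)                        // the new standalone peer
		require.NotNil(t, sat.GarbageCollection.Service) // rewired to gcPeer above
	})
}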

satellite/core.go

@@ -359,20 +359,22 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
}
- { // setup garbage collection
- peer.GarbageCollection.Service = gc.NewService(
- peer.Log.Named("garbage-collection"),
- config.GarbageCollection,
- peer.Dialer,
- peer.Overlay.DB,
- peer.Metainfo.Loop,
- )
- peer.Services.Add(lifecycle.Item{
- Name: "garbage-collection",
- Run: peer.GarbageCollection.Service.Run,
- })
- peer.Debug.Server.Panel.Add(
- debug.Cycle("Garbage Collection", peer.GarbageCollection.Service.Loop))
+ { // setup garbage collection if configured to run with the core
+ if config.GarbageCollection.RunInCore {
+ peer.GarbageCollection.Service = gc.NewService(
+ peer.Log.Named("core-garbage-collection"),
+ config.GarbageCollection,
+ peer.Dialer,
+ peer.Overlay.DB,
+ peer.Metainfo.Loop,
+ )
+ peer.Services.Add(lifecycle.Item{
+ Name: "core-garbage-collection",
+ Run: peer.GarbageCollection.Service.Run,
+ })
+ peer.Debug.Server.Panel.Add(
+ debug.Cycle("Core Garbage Collection", peer.GarbageCollection.Service.Loop))
+ }
}
{ // setup db cleanup
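Since the gate is plain configuration, a test or deployment can opt the core out of GC entirely and rely on the standalone peer. A hypothetical testplanet reconfiguration sketch (the Reconfigure hook signature is assumed from testplanet of this era):

package satellite_test

import (
	"go.uber.org/zap"

	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
)

// planetConfig is a hypothetical testplanet configuration that keeps GC out
// of the core, so only the standalone garbage-collection peer runs the service.
var planetConfig = testplanet.Config{
	SatelliteCount: 1,
	Reconfigure: testplanet.Reconfigure{
		Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
			config.GarbageCollection.RunInCore = false
		},
	},
}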

satellite/gc.go (new file, +205)

@@ -0,0 +1,205 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellite
import (
"context"
"errors"
"net"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/identity"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/storj/pkg/debug"
"storj.io/storj/private/lifecycle"
"storj.io/storj/private/version"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/overlay"
)
// GarbageCollection is the satellite garbage collection process.
//
// architecture: Peer
type GarbageCollection struct {
Log *zap.Logger
Identity *identity.FullIdentity
DB DB
Servers *lifecycle.Group
Services *lifecycle.Group
Dialer rpc.Dialer
Version struct {
Chore *version_checker.Chore
Service *version_checker.Service
}
Debug struct {
Listener net.Listener
Server *debug.Server
}
Overlay struct {
DB overlay.DB
}
Metainfo struct {
Database metainfo.PointerDB
Loop *metainfo.Loop
}
GarbageCollection struct {
Service *gc.Service
}
Metrics struct {
Chore *metrics.Chore
}
}
// NewGarbageCollection creates a new satellite garbage collection process.
func NewGarbageCollection(log *zap.Logger, full *identity.FullIdentity, db DB,
pointerDB metainfo.PointerDB, revocationDB extensions.RevocationDB,
versionInfo version.Info, config *Config) (*GarbageCollection, error) {
peer := &GarbageCollection{
Log: log,
Identity: full,
DB: db,
Servers: lifecycle.NewGroup(log.Named("servers")),
Services: lifecycle.NewGroup(log.Named("services")),
}
{ // setup debug
var err error
if config.Debug.Address != "" {
peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
if err != nil {
withoutStack := errors.New(err.Error())
peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
err = nil
}
}
debugConfig := config.Debug
debugConfig.ControlTitle = "GC"
peer.Debug.Server = debug.NewServer(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig)
peer.Servers.Add(lifecycle.Item{
Name: "debug",
Run: peer.Debug.Server.Run,
Close: peer.Debug.Server.Close,
})
}
{ // setup version control
if !versionInfo.IsZero() {
peer.Log.Sugar().Debugf("Binary Version: %s with CommitHash %s, built at %s as Release %v",
versionInfo.Version.String(), versionInfo.CommitHash, versionInfo.Timestamp.String(), versionInfo.Release)
}
peer.Version.Service = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
peer.Version.Chore = version_checker.NewChore(peer.Version.Service, config.Version.CheckInterval)
peer.Services.Add(lifecycle.Item{
Name: "version",
Run: peer.Version.Chore.Run,
})
}
{ // setup listener and server
log.Debug("Starting listener and server for GC")
sc := config.Server
tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
}
{ // setup overlay
peer.Overlay.DB = peer.DB.OverlayCache()
}
{ // setup metainfo
peer.Metainfo.Database = pointerDB
// Garbage collection creates its own instance of the metainfo loop here. Since
// GC runs infrequently, this shouldn't add much extra load on the metainfo db.
// As long as garbage collection is the only observer joining the metainfo loop,
// the loop will only run when garbage collection joins (which happens every GarbageCollection.Interval).
peer.Metainfo.Loop = metainfo.NewLoop(config.Metainfo.Loop, peer.Metainfo.Database)
peer.Services.Add(lifecycle.Item{
Name: "metainfo:loop",
Run: peer.Metainfo.Loop.Run,
Close: peer.Metainfo.Loop.Close,
})
}
{ // setup garbage collection
peer.GarbageCollection.Service = gc.NewService(
peer.Log.Named("garbage-collection"),
config.GarbageCollection,
peer.Dialer,
peer.Overlay.DB,
peer.Metainfo.Loop,
)
peer.Services.Add(lifecycle.Item{
Name: "garbage-collection",
Run: peer.GarbageCollection.Service.Run,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Garbage Collection", peer.GarbageCollection.Service.Loop))
}
{ // setup metrics service
peer.Metrics.Chore = metrics.NewChore(
peer.Log.Named("metrics"),
config.Metrics,
peer.Metainfo.Loop,
)
peer.Services.Add(lifecycle.Item{
Name: "metrics",
Run: peer.Metrics.Chore.Run,
Close: peer.Metrics.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Metrics", peer.Metrics.Chore.Loop))
}
return peer, nil
}
// Run runs satellite garbage collection until it's either closed or it errors.
func (peer *GarbageCollection) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
group, ctx := errgroup.WithContext(ctx)
peer.Servers.Run(ctx, group)
peer.Services.Run(ctx, group)
return group.Wait()
}
// Close closes all the resources.
func (peer *GarbageCollection) Close() error {
return errs.Combine(
peer.Servers.Close(),
peer.Services.Close(),
)
}
// ID returns the peer ID.
func (peer *GarbageCollection) ID() storj.NodeID { return peer.Identity.ID }
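For reference, callers drive this peer with the same run/close contract as the other satellite peers; a minimal hypothetical helper (mirroring the tail of cmdGCRun above):

package main

import (
	"context"

	"github.com/zeebo/errs"

	"storj.io/storj/satellite"
)

// runGC is a hypothetical helper: Run blocks until the context is canceled
// or a subsystem fails, Close releases whatever Run left open, and both
// errors are reported together.
func runGC(ctx context.Context, peer *satellite.GarbageCollection) error {
	runErr := peer.Run(ctx)
	closeErr := peer.Close()
	return errs.Combine(runErr, closeErr)
}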

satellite/gc (Config)

@@ -32,6 +32,7 @@ type Config struct {
Interval time.Duration `help:"the time between each send of garbage collection filters to storage nodes" releaseDefault:"120h" devDefault:"10m"`
Enabled bool `help:"set if garbage collection is enabled or not" releaseDefault:"true" devDefault:"true"`
SkipFirst bool `help:"if true, skip the first run of GC" releaseDefault:"true" devDefault:"false"`
RunInCore bool `help:"if true, run garbage collection as part of the core" releaseDefault:"true" devDefault:"false"`
// value for InitialPieces currently based on average pieces per node
InitialPieces int `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
FalsePositiveRate float64 `help:"the false positive rate used for creating a garbage collection bloom filter" releaseDefault:"0.1" devDefault:"0.1"`
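InitialPieces and FalsePositiveRate size the per-node bloom filter that GC sends to storage nodes in retain requests: pieces the satellite still tracks are added to the filter, and nodes delete whatever the filter does not match. A rough sizing sketch (assuming the bloomfilter helpers from storj.io/common; the actual call site inside gc.Service may differ):

package main

import (
	"fmt"

	"storj.io/common/bloomfilter"
	"storj.io/common/storj"
	"storj.io/common/testrand"
)

func main() {
	// Values mirroring the release defaults above.
	const initialPieces = 400000
	const falsePositiveRate = 0.1

	// NewOptimal sizes the filter for an expected element count and a target
	// false positive rate (assumed helper from storj.io/common/bloomfilter).
	filter := bloomfilter.NewOptimal(initialPieces, falsePositiveRate)

	// A piece the satellite still knows about: the node must keep it.
	kept := testrand.PieceID()
	filter.Add(kept)

	// A piece never added is usually reported as absent, with up to a 10%
	// chance of a false positive (never a false negative).
	var unknown storj.PieceID
	fmt.Println(filter.Contains(kept), filter.Contains(unknown))
}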

scripts/testdata/satellite-config.yaml.lock

@@ -142,6 +142,9 @@ contact.external-address: ""
# the amount of time to allow a node to handle a retain request
# garbage-collection.retain-send-timeout: 1m0s
# if true, run garbage collection as part of the core
# garbage-collection.run-in-core: true
# if true, skip the first run of GC
# garbage-collection.skip-first: true