satellite: move GC sender to Core peer

Having separate process/pod only for sending bloom filters once a week
is a bit waste. After reconfiguring production settings to use sender in
core we can remove also GC sender peer code.

https://github.com/storj/storj/issues/5979

Change-Id: I6efe3ec073f96545e1f70ad13843f8ccdf923ee8
This commit is contained in:
Michal Niewrzal 2023-06-30 11:18:35 +02:00 committed by Michał Niewrzał
parent a9d979e4d7
commit 1f92e7acda
2 changed files with 24 additions and 28 deletions

View File

@ -69,7 +69,6 @@ type Satellite struct {
Repairer *satellite.Repairer
Auditor *satellite.Auditor
Admin *satellite.Admin
GC *satellite.GarbageCollection
GCBF *satellite.GarbageCollectionBF
RangedLoop *satellite.RangedLoop
@ -285,7 +284,6 @@ func (system *Satellite) Close() error {
system.Repairer.Close(),
system.Auditor.Close(),
system.Admin.Close(),
system.GC.Close(),
system.GCBF.Close(),
)
}
@ -309,9 +307,6 @@ func (system *Satellite) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(system.Admin.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.GC.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.GCBF.Run(ctx))
})
@ -539,11 +534,6 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, errs.Wrap(err)
}
gcPeer, err := planet.newGarbageCollection(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
}
gcBFPeer, err := planet.newGarbageCollectionBF(ctx, index, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, errs.Wrap(err)
@ -558,14 +548,14 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
peer.Mail.EmailReminders.TestSetLinkAddress("http://" + api.Console.Listener.Addr().String() + "/")
}
return createNewSystem(prefix, log, config, peer, api, repairerPeer, auditorPeer, adminPeer, gcPeer, gcBFPeer, rangedLoopPeer), nil
return createNewSystem(prefix, log, config, peer, api, repairerPeer, auditorPeer, adminPeer, gcBFPeer, rangedLoopPeer), nil
}
// createNewSystem makes a new Satellite System and exposes the same interface from
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many processes.
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, auditorPeer *satellite.Auditor, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF, rangedLoopPeer *satellite.RangedLoop) *Satellite {
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, auditorPeer *satellite.Auditor, adminPeer *satellite.Admin, gcBFPeer *satellite.GarbageCollectionBF, rangedLoopPeer *satellite.RangedLoop) *Satellite {
system := &Satellite{
Name: name,
Config: config,
@ -574,7 +564,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
Repairer: repairerPeer,
Auditor: auditorPeer,
Admin: adminPeer,
GC: gcPeer,
GCBF: gcBFPeer,
RangedLoop: rangedLoopPeer,
}
@ -622,7 +611,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Audit.Reporter = auditorPeer.Audit.Reporter
system.Audit.ContainmentSyncChore = peer.Audit.ContainmentSyncChore
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender
system.GarbageCollection.Sender = peer.GarbageCollection.Sender
system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore
@ -713,20 +702,6 @@ func (cache rollupsWriteCacheCloser) Close() error {
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.GarbageCollection, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-gc" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewGarbageCollection(log, identity, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}
func (planet *Planet) newGarbageCollectionBF(ctx context.Context, index int, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.GarbageCollectionBF, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -34,6 +34,7 @@ import (
"storj.io/storj/satellite/console/consoleauth"
"storj.io/storj/satellite/console/dbcleanup"
"storj.io/storj/satellite/console/emailreminders"
"storj.io/storj/satellite/gc/sender"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/zombiedeletion"
@ -142,6 +143,10 @@ type Core struct {
ConsoleDBCleanup struct {
Chore *dbcleanup.Chore
}
GarbageCollection struct {
Sender *sender.Service
}
}
// New creates a new satellite.
@ -564,6 +569,22 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
})
}
{ // setup garbage collection
peer.GarbageCollection.Sender = sender.NewService(
peer.Log.Named("gc-sender"),
config.GarbageCollection,
peer.Dialer,
peer.Overlay.DB,
)
peer.Services.Add(lifecycle.Item{
Name: "gc-sender",
Run: peer.GarbageCollection.Sender.Run,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Garbage Collection", peer.GarbageCollection.Sender.Loop))
}
return peer, nil
}