private/testplanet: integrate GC bloom filter service

We would like to have separate process/command to collect bloom
filters from source different than production DBs. Such process will
use segment loop to build bloom filters for all storage nodes and
will send it to Storj bucket.
This change adds integration with testplanet which makes writing
unit tests possible.

Updates https://github.com/storj/team-metainfo/issues/120

Change-Id: I7b335c5dafa8cffe265c56b75d8c8f8567580893
This commit is contained in:
Michal Niewrzal 2022-08-30 13:02:13 +02:00 committed by Storj Robot
parent ddc850dc21
commit d905931ed9
3 changed files with 63 additions and 4 deletions

View File

@ -41,6 +41,7 @@ import (
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
"storj.io/storj/satellite/mailservice"
@ -70,6 +71,7 @@ type Satellite struct {
Repairer *satellite.Repairer
Admin *satellite.Admin
GC *satellite.GarbageCollection
GCBF *satellite.GarbageCollectionBF
Log *zap.Logger
Identity *identity.FullIdentity
@ -134,7 +136,8 @@ type Satellite struct {
}
GarbageCollection struct {
Service *gc.Service
Service *gc.Service
BloomFilters *bloomfilter.Service
}
ExpiredDeletion struct {
@ -283,6 +286,7 @@ func (system *Satellite) Close() error {
system.Repairer.Close(),
system.Admin.Close(),
system.GC.Close(),
system.GCBF.Close(),
)
}
@ -305,6 +309,9 @@ func (system *Satellite) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(system.GC.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.GCBF.Run(ctx))
})
return group.Wait()
}
@ -527,18 +534,23 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}
gcBFPeer, err := planet.newGarbageCollectionBF(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}
if config.EmailReminders.Enable {
peer.Mail.EmailReminders.TestSetLinkAddress("http://" + api.Console.Listener.Addr().String() + "/")
}
return createNewSystem(prefix, log, config, peer, api, repairerPeer, adminPeer, gcPeer), nil
return createNewSystem(prefix, log, config, peer, api, repairerPeer, adminPeer, gcPeer, gcBFPeer), nil
}
// createNewSystem makes a new Satellite System and exposes the same interface from
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many processes.
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection) *Satellite {
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF) *Satellite {
system := &Satellite{
Name: name,
Config: config,
@ -547,6 +559,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
Repairer: repairerPeer,
Admin: adminPeer,
GC: gcPeer,
GCBF: gcBFPeer,
}
system.Log = log
system.Identity = peer.Identity
@ -587,6 +600,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Audit.Reporter = peer.Audit.Reporter
system.GarbageCollection.Service = gcPeer.GarbageCollection.Service
system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service
system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore
@ -683,6 +697,20 @@ func (planet *Planet) newGarbageCollection(ctx context.Context, index int, ident
return satellite.NewGarbageCollection(log, identity, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}
func (planet *Planet) newGarbageCollectionBF(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.GarbageCollectionBF, err error) {
defer mon.Task()(&ctx)(&err)
prefix := "satellite-gc-bf" + strconv.Itoa(index)
log := planet.log.Named(prefix)
revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewGarbageCollectionBF(log, identity, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}
// atLeastOne returns 1 if value < 1, or value otherwise.
func atLeastOne(value int) int {
if value < 1 {

View File

@ -21,7 +21,8 @@ var mon = monkit.Package()
// Config contains configurable values for garbage collection.
type Config struct {
Interval time.Duration `help:"the time between each garbage collection executions" releaseDefault:"120h" devDefault:"10m" testDefault:"$TESTINTERVAL"`
Enabled bool `help:"set if garbage collection bloom filters is enabled or not" releaseDefault:"true" devDefault:"true"`
// TODO service is not enabled by default for testing until will be finished
Enabled bool `help:"set if garbage collection bloom filters is enabled or not" default:"true" testDefault:"false"`
// value for InitialPieces currently based on average pieces per node
InitialPieces int `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`

View File

@ -0,0 +1,30 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package bloomfilter_test
import (
"testing"
"go.uber.org/zap"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
)
func TestGarbageCollectionBloomFilters(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.GarbageCollectionBF.Enabled = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// TODO test will be replaced with something more meaningful when service
// will be fully implemented
planet.Satellites[0].GarbageCollection.BloomFilters.Loop.Pause()
planet.Satellites[0].GarbageCollection.BloomFilters.Loop.TriggerWait()
})
}