diff --git a/satellite/gc/piecetracker/observer.go b/satellite/gc/piecetracker/observer.go new file mode 100644 index 000000000..2d720255a --- /dev/null +++ b/satellite/gc/piecetracker/observer.go @@ -0,0 +1,135 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package piecetracker + +import ( + "context" + "time" + + "github.com/spacemonkeygo/monkit/v3" + "github.com/zeebo/errs" + "go.uber.org/zap" + + "storj.io/common/storj" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/rangedloop" + "storj.io/storj/satellite/overlay" +) + +var ( + // Error is a standard error class for this package. + Error = errs.Class("piecetracker") + mon = monkit.Package() + + // check if Observer and Partial interfaces are satisfied. + _ rangedloop.Observer = (*Observer)(nil) + _ rangedloop.Partial = (*observerFork)(nil) +) + +// Observer implements piecetraker ranged loop observer. +// +// The piecetracker counts the number of pieces currently expected to reside on each node, +// then passes the counts to the overlay with UpdatePieceCounts(). +type Observer struct { + log *zap.Logger + config Config + overlay overlay.DB + metabaseDB *metabase.DB + + pieceCounts map[metabase.NodeAlias]int64 +} + +// NewObserver creates new piecetracker ranged loop observer. +func NewObserver(log *zap.Logger, metabaseDB *metabase.DB, overlay overlay.DB, config Config) *Observer { + return &Observer{ + log: log, + overlay: overlay, + metabaseDB: metabaseDB, + config: config, + pieceCounts: map[metabase.NodeAlias]int64{}, + } +} + +// Start implements ranged loop observer start method. +func (observer *Observer) Start(ctx context.Context, time time.Time) (err error) { + defer mon.Task()(&ctx)(&err) + + observer.pieceCounts = map[metabase.NodeAlias]int64{} + return nil +} + +// Fork implements ranged loop observer fork method. +func (observer *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) { + defer mon.Task()(&ctx)(&err) + + return newObserverFork(), nil +} + +// Join joins piecetracker ranged loop partial to main observer updating piece counts map. +func (observer *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) { + defer mon.Task()(&ctx)(&err) + pieceTracker, ok := partial.(*observerFork) + if !ok { + return Error.New("expected %T but got %T", pieceTracker, partial) + } + + // Merge piece counts for each node. + for nodeAlias, pieceCount := range pieceTracker.pieceCounts { + observer.pieceCounts[nodeAlias] += pieceCount + } + + return nil +} + +// Finish updates piece counts in the DB. +func (observer *Observer) Finish(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + observer.log.Info("piecetracker observer finished") + + nodeAliasMap, err := observer.metabaseDB.LatestNodesAliasMap(ctx) + pieceCounts := make(map[storj.NodeID]int64, len(observer.pieceCounts)) + + for nodeAlias, count := range observer.pieceCounts { + nodeID, ok := nodeAliasMap.Node(nodeAlias) + if !ok { + observer.log.Error("unrecognized node alias in piecetracker ranged-loop", zap.Int32("node-alias", int32(nodeAlias))) + continue + } + pieceCounts[nodeID] = count + } + err = observer.overlay.UpdatePieceCounts(ctx, pieceCounts) + if err != nil { + observer.log.Error("error updating piece counts", zap.Error(err)) + return Error.Wrap(err) + } + + return nil +} + +type observerFork struct { + pieceCounts map[metabase.NodeAlias]int64 +} + +// newObserverFork creates new piecetracker ranged loop fork. +func newObserverFork() *observerFork { + return &observerFork{ + pieceCounts: map[metabase.NodeAlias]int64{}, + } +} + +// Process iterates over segment range updating partial piece counts for each node. +func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error { + for _, segment := range segments { + if segment.Inline() { + continue + } + + for _, piece := range segment.AliasPieces { + fork.pieceCounts[piece.Alias]++ + } + } + + return nil +} diff --git a/satellite/gc/piecetracker/observer_test.go b/satellite/gc/piecetracker/observer_test.go new file mode 100644 index 000000000..0979aeabf --- /dev/null +++ b/satellite/gc/piecetracker/observer_test.go @@ -0,0 +1,82 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package piecetracker_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "storj.io/common/memory" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" +) + +func TestObserverPieceTracker(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.PieceTracker.UseRangedLoop = true + config.RangedLoop.Parallelism = 4 + config.RangedLoop.BatchSize = 4 + + // configure RS + config.Metainfo.RS.Min = 2 + config.Metainfo.RS.Repair = 3 + config.Metainfo.RS.Success = 4 + config.Metainfo.RS.Total = 4 + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + // ensure that the piece counts are empty + pieceCounts, err := planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx) + require.NoError(t, err) + require.Equal(t, 0, len(pieceCounts)) + + // Setup: create 50KiB of data for the uplink to upload + testdata := testrand.Bytes(50 * memory.KiB) + + testBucket := "testbucket" + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], testBucket, "test/path", testdata) + require.NoError(t, err) + + // Run the ranged loop + _, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) + + // Check that the piece counts are correct + pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx) + require.NoError(t, err) + require.True(t, len(pieceCounts) > 0) + + for node, count := range pieceCounts { + require.Equal(t, int64(1), count, "node %s should have 1 piece", node) + } + + // upload more objects + numOfObjects := 10 + for i := 0; i < numOfObjects; i++ { + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], testBucket, fmt.Sprintf("test/path%d", i), testdata) + require.NoError(t, err) + } + + // Run the ranged loop again + _, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) + + // Check that the piece counts are correct + pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx) + require.NoError(t, err) + require.True(t, len(pieceCounts) > 0) + + for node, count := range pieceCounts { + require.Equal(t, int64(numOfObjects+1), count, "node %s should have %d pieces", node, numOfObjects+1) + } + }) +} diff --git a/satellite/gc/piecetracker/piecetracker.go b/satellite/gc/piecetracker/piecetracker.go new file mode 100644 index 000000000..1bcb1f77c --- /dev/null +++ b/satellite/gc/piecetracker/piecetracker.go @@ -0,0 +1,9 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package piecetracker + +// Config is the configuration for the piecetracker. +type Config struct { + UseRangedLoop bool `help:"whether to enable piece tracker observer with ranged loop" default:"true"` +} diff --git a/satellite/peer.go b/satellite/peer.go index 6e1f877ae..9ee6582ec 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -43,6 +43,7 @@ import ( "storj.io/storj/satellite/console/userinfo" "storj.io/storj/satellite/contact" "storj.io/storj/satellite/gc/bloomfilter" + "storj.io/storj/satellite/gc/piecetracker" "storj.io/storj/satellite/gc/sender" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/mailservice" @@ -215,6 +216,8 @@ type Config struct { ProjectLimit accounting.ProjectLimitConfig Analytics analytics.Config + + PieceTracker piecetracker.Config } func setupMailService(log *zap.Logger, config Config) (*mailservice.Service, error) { diff --git a/satellite/rangedloop.go b/satellite/rangedloop.go index 824a62391..5faf0fa36 100644 --- a/satellite/rangedloop.go +++ b/satellite/rangedloop.go @@ -18,6 +18,7 @@ import ( "storj.io/storj/private/lifecycle" "storj.io/storj/satellite/accounting/nodetally" "storj.io/storj/satellite/audit" + "storj.io/storj/satellite/gc/piecetracker" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/rangedloop" @@ -65,6 +66,10 @@ type RangedLoop struct { NodeTallyObserver *nodetally.Observer } + PieceTracker struct { + Observer *piecetracker.Observer + } + RangedLoop struct { Service *rangedloop.Service } @@ -124,6 +129,15 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf metabaseDB) } + { // setup piece tracker observer + peer.PieceTracker.Observer = piecetracker.NewObserver( + log.Named("piecetracker"), + metabaseDB, + peer.DB.OverlayCache(), + config.PieceTracker, + ) + } + { // setup overlay peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.DB.OverlayCache(), peer.DB.NodeEvents(), config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay) if err != nil { @@ -167,6 +181,10 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf observers = append(observers, peer.Repair.Observer) } + if config.PieceTracker.UseRangedLoop { + observers = append(observers, peer.PieceTracker.Observer) + } + segments := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize) peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, segments, observers) diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index e2ba9245b..c66915e97 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -889,6 +889,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # price user should pay for storage per month in dollars/TB # payments.usage-price.storage-tb: "4" +# whether to enable piece tracker observer with ranged loop +# piece-tracker.use-ranged-loop: true + # how often to remove unused project bandwidth rollups # project-bw-cleanup.interval: 24h0m0s