satellite/gc: add piece tracker ranged loop observer

Resolves https://github.com/storj/storj/issues/5798

Change-Id: I6fe2c57b3a247b085026feb8bee60c2d002db71b
Clement Sam 2023-06-13 14:03:54 +00:00 committed by Storj Robot
parent 2b2bca8e81
commit 1166fdfbab
6 changed files with 250 additions and 0 deletions
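The new observer plugs into the satellite's ranged segment loop. The rangedloop contract it has to satisfy can be sketched as below; this is inferred from the interface assertions and method sets in the diff that follows, not copied from the rangedloop package, and Segment here is only a stand-in for rangedloop.Segment.

package rangedloopsketch

import (
	"context"
	"time"
)

// Segment stands in for rangedloop.Segment; only the shape of the interfaces matters here.
type Segment struct{}

// Observer is called once per loop iteration: Start at the beginning,
// Fork/Join once per segment range, and Finish at the end.
type Observer interface {
	Start(ctx context.Context, startTime time.Time) error
	Fork(ctx context.Context) (Partial, error)
	Join(ctx context.Context, partial Partial) error
	Finish(ctx context.Context) error
}

// Partial processes the segments of a single range, typically in parallel with other forks.
type Partial interface {
	Process(ctx context.Context, segments []Segment) error
}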


@ -0,0 +1,135 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package piecetracker

import (
	"context"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/rangedloop"
	"storj.io/storj/satellite/overlay"
)

var (
	// Error is a standard error class for this package.
	Error = errs.Class("piecetracker")

	mon = monkit.Package()

	// check if Observer and Partial interfaces are satisfied.
	_ rangedloop.Observer = (*Observer)(nil)
	_ rangedloop.Partial  = (*observerFork)(nil)
)
// Observer implements the piecetracker ranged loop observer.
//
// The piecetracker counts the number of pieces currently expected to reside on each node,
// then passes the counts to the overlay with UpdatePieceCounts().
type Observer struct {
	log        *zap.Logger
	config     Config
	overlay    overlay.DB
	metabaseDB *metabase.DB

	pieceCounts map[metabase.NodeAlias]int64
}

// NewObserver creates a new piecetracker ranged loop observer.
func NewObserver(log *zap.Logger, metabaseDB *metabase.DB, overlay overlay.DB, config Config) *Observer {
	return &Observer{
		log:        log,
		overlay:    overlay,
		metabaseDB: metabaseDB,
		config:     config,

		pieceCounts: map[metabase.NodeAlias]int64{},
	}
}
// Start implements ranged loop observer start method.
func (observer *Observer) Start(ctx context.Context, time time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	observer.pieceCounts = map[metabase.NodeAlias]int64{}
	return nil
}

// Fork implements ranged loop observer fork method.
func (observer *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
	defer mon.Task()(&ctx)(&err)

	return newObserverFork(), nil
}
// Join merges a piecetracker ranged loop partial into the main observer's piece counts map.
func (observer *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
	defer mon.Task()(&ctx)(&err)

	pieceTracker, ok := partial.(*observerFork)
	if !ok {
		return Error.New("expected %T but got %T", pieceTracker, partial)
	}

	// Merge piece counts for each node.
	for nodeAlias, pieceCount := range pieceTracker.pieceCounts {
		observer.pieceCounts[nodeAlias] += pieceCount
	}

	return nil
}
// Finish resolves node aliases to node IDs and updates the piece counts in the overlay DB.
func (observer *Observer) Finish(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	observer.log.Info("piecetracker observer finished")

	nodeAliasMap, err := observer.metabaseDB.LatestNodesAliasMap(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	pieceCounts := make(map[storj.NodeID]int64, len(observer.pieceCounts))
	for nodeAlias, count := range observer.pieceCounts {
		nodeID, ok := nodeAliasMap.Node(nodeAlias)
		if !ok {
			observer.log.Error("unrecognized node alias in piecetracker ranged-loop", zap.Int32("node-alias", int32(nodeAlias)))
			continue
		}
		pieceCounts[nodeID] = count
	}

	err = observer.overlay.UpdatePieceCounts(ctx, pieceCounts)
	if err != nil {
		observer.log.Error("error updating piece counts", zap.Error(err))
		return Error.Wrap(err)
	}

	return nil
}
// observerFork tracks piece counts for a single segment range.
type observerFork struct {
	pieceCounts map[metabase.NodeAlias]int64
}

// newObserverFork creates a new piecetracker ranged loop fork.
func newObserverFork() *observerFork {
	return &observerFork{
		pieceCounts: map[metabase.NodeAlias]int64{},
	}
}

// Process iterates over a range of segments, updating the partial piece counts for each node.
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error {
	for _, segment := range segments {
		if segment.Inline() {
			// Inline segments are stored in the metabase, not on storage nodes.
			continue
		}

		for _, piece := range segment.AliasPieces {
			fork.pieceCounts[piece.Alias]++
		}
	}

	return nil
}


@ -0,0 +1,82 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package piecetracker_test

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"

	"storj.io/common/memory"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
)
func TestObserverPieceTracker(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.PieceTracker.UseRangedLoop = true
				config.RangedLoop.Parallelism = 4
				config.RangedLoop.BatchSize = 4

				// Configure RS so that Success = Total = 4: with 4 storage nodes,
				// every node ends up holding exactly one piece per object.
				config.Metainfo.RS.Min = 2
				config.Metainfo.RS.Repair = 3
				config.Metainfo.RS.Success = 4
				config.Metainfo.RS.Total = 4
			},
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		// Ensure that the piece counts are empty before anything is uploaded.
		pieceCounts, err := planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
		require.NoError(t, err)
		require.Equal(t, 0, len(pieceCounts))

		// Setup: create 50KiB of data for the uplink to upload.
		testdata := testrand.Bytes(50 * memory.KiB)
		testBucket := "testbucket"

		err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], testBucket, "test/path", testdata)
		require.NoError(t, err)

		// Run the ranged loop.
		_, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx)
		require.NoError(t, err)

		// Check that the piece counts are correct: one piece per node for the single object.
		pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
		require.NoError(t, err)
		require.True(t, len(pieceCounts) > 0)

		for node, count := range pieceCounts {
			require.Equal(t, int64(1), count, "node %s should have 1 piece", node)
		}

		// Upload more objects.
		numOfObjects := 10
		for i := 0; i < numOfObjects; i++ {
			err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], testBucket, fmt.Sprintf("test/path%d", i), testdata)
			require.NoError(t, err)
		}

		// Run the ranged loop again.
		_, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx)
		require.NoError(t, err)

		// Check that the piece counts now reflect all numOfObjects+1 objects.
		pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
		require.NoError(t, err)
		require.True(t, len(pieceCounts) > 0)

		for node, count := range pieceCounts {
			require.Equal(t, int64(numOfObjects+1), count, "node %s should have %d pieces", node, numOfObjects+1)
		}
	})
}


@ -0,0 +1,9 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.

package piecetracker

// Config is the configuration for the piecetracker.
type Config struct {
	UseRangedLoop bool `help:"whether to enable piece tracker observer with ranged loop" default:"true"`
}


@ -43,6 +43,7 @@ import (
"storj.io/storj/satellite/console/userinfo" "storj.io/storj/satellite/console/userinfo"
"storj.io/storj/satellite/contact" "storj.io/storj/satellite/contact"
"storj.io/storj/satellite/gc/bloomfilter" "storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/gc/piecetracker"
"storj.io/storj/satellite/gc/sender" "storj.io/storj/satellite/gc/sender"
"storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/mailservice"
@ -215,6 +216,8 @@ type Config struct {
	ProjectLimit accounting.ProjectLimitConfig
	Analytics    analytics.Config

	PieceTracker piecetracker.Config
}

func setupMailService(log *zap.Logger, config Config) (*mailservice.Service, error) {


@ -18,6 +18,7 @@ import (
"storj.io/storj/private/lifecycle" "storj.io/storj/private/lifecycle"
"storj.io/storj/satellite/accounting/nodetally" "storj.io/storj/satellite/accounting/nodetally"
"storj.io/storj/satellite/audit" "storj.io/storj/satellite/audit"
"storj.io/storj/satellite/gc/piecetracker"
"storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/rangedloop"
@ -65,6 +66,10 @@ type RangedLoop struct {
		NodeTallyObserver *nodetally.Observer
	}

	PieceTracker struct {
		Observer *piecetracker.Observer
	}

	RangedLoop struct {
		Service *rangedloop.Service
	}
@ -124,6 +129,15 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
			metabaseDB)
	}

	{ // setup piece tracker observer
		peer.PieceTracker.Observer = piecetracker.NewObserver(
			log.Named("piecetracker"),
			metabaseDB,
			peer.DB.OverlayCache(),
			config.PieceTracker,
		)
	}

	{ // setup overlay
		peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.DB.OverlayCache(), peer.DB.NodeEvents(), config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
		if err != nil {
@ -167,6 +181,10 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
		observers = append(observers, peer.Repair.Observer)
	}

	if config.PieceTracker.UseRangedLoop {
		observers = append(observers, peer.PieceTracker.Observer)
	}

	segments := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.AsOfSystemInterval, config.RangedLoop.BatchSize)

	peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, segments, observers)


@ -889,6 +889,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# price user should pay for storage per month in dollars/TB
# payments.usage-price.storage-tb: "4"
# whether to enable piece tracker observer with ranged loop
# piece-tracker.use-ranged-loop: true
# how often to remove unused project bandwidth rollups
# project-bw-cleanup.interval: 24h0m0s