136 lines
3.6 KiB
Go
136 lines
3.6 KiB
Go
|
// Copyright (C) 2023 Storj Labs, Inc.
|
||
|
// See LICENSE for copying information.
|
||
|
|
||
|
package piecetracker
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"time"
|
||
|
|
||
|
"github.com/spacemonkeygo/monkit/v3"
|
||
|
"github.com/zeebo/errs"
|
||
|
"go.uber.org/zap"
|
||
|
|
||
|
"storj.io/common/storj"
|
||
|
"storj.io/storj/satellite/metabase"
|
||
|
"storj.io/storj/satellite/metabase/rangedloop"
|
||
|
"storj.io/storj/satellite/overlay"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// Error is a standard error class for this package.
|
||
|
Error = errs.Class("piecetracker")
|
||
|
mon = monkit.Package()
|
||
|
|
||
|
// check if Observer and Partial interfaces are satisfied.
|
||
|
_ rangedloop.Observer = (*Observer)(nil)
|
||
|
_ rangedloop.Partial = (*observerFork)(nil)
|
||
|
)
|
||
|
|
||
|
// Observer implements piecetraker ranged loop observer.
|
||
|
//
|
||
|
// The piecetracker counts the number of pieces currently expected to reside on each node,
|
||
|
// then passes the counts to the overlay with UpdatePieceCounts().
|
||
|
type Observer struct {
|
||
|
log *zap.Logger
|
||
|
config Config
|
||
|
overlay overlay.DB
|
||
|
metabaseDB *metabase.DB
|
||
|
|
||
|
pieceCounts map[metabase.NodeAlias]int64
|
||
|
}
|
||
|
|
||
|
// NewObserver creates new piecetracker ranged loop observer.
|
||
|
func NewObserver(log *zap.Logger, metabaseDB *metabase.DB, overlay overlay.DB, config Config) *Observer {
|
||
|
return &Observer{
|
||
|
log: log,
|
||
|
overlay: overlay,
|
||
|
metabaseDB: metabaseDB,
|
||
|
config: config,
|
||
|
pieceCounts: map[metabase.NodeAlias]int64{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Start implements ranged loop observer start method.
|
||
|
func (observer *Observer) Start(ctx context.Context, time time.Time) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
observer.pieceCounts = map[metabase.NodeAlias]int64{}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Fork implements ranged loop observer fork method.
|
||
|
func (observer *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
return newObserverFork(), nil
|
||
|
}
|
||
|
|
||
|
// Join joins piecetracker ranged loop partial to main observer updating piece counts map.
|
||
|
func (observer *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
pieceTracker, ok := partial.(*observerFork)
|
||
|
if !ok {
|
||
|
return Error.New("expected %T but got %T", pieceTracker, partial)
|
||
|
}
|
||
|
|
||
|
// Merge piece counts for each node.
|
||
|
for nodeAlias, pieceCount := range pieceTracker.pieceCounts {
|
||
|
observer.pieceCounts[nodeAlias] += pieceCount
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Finish updates piece counts in the DB.
|
||
|
func (observer *Observer) Finish(ctx context.Context) (err error) {
|
||
|
defer mon.Task()(&ctx)(&err)
|
||
|
|
||
|
observer.log.Info("piecetracker observer finished")
|
||
|
|
||
|
nodeAliasMap, err := observer.metabaseDB.LatestNodesAliasMap(ctx)
|
||
|
pieceCounts := make(map[storj.NodeID]int64, len(observer.pieceCounts))
|
||
|
|
||
|
for nodeAlias, count := range observer.pieceCounts {
|
||
|
nodeID, ok := nodeAliasMap.Node(nodeAlias)
|
||
|
if !ok {
|
||
|
observer.log.Error("unrecognized node alias in piecetracker ranged-loop", zap.Int32("node-alias", int32(nodeAlias)))
|
||
|
continue
|
||
|
}
|
||
|
pieceCounts[nodeID] = count
|
||
|
}
|
||
|
err = observer.overlay.UpdatePieceCounts(ctx, pieceCounts)
|
||
|
if err != nil {
|
||
|
observer.log.Error("error updating piece counts", zap.Error(err))
|
||
|
return Error.Wrap(err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type observerFork struct {
|
||
|
pieceCounts map[metabase.NodeAlias]int64
|
||
|
}
|
||
|
|
||
|
// newObserverFork creates new piecetracker ranged loop fork.
|
||
|
func newObserverFork() *observerFork {
|
||
|
return &observerFork{
|
||
|
pieceCounts: map[metabase.NodeAlias]int64{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Process iterates over segment range updating partial piece counts for each node.
|
||
|
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) error {
|
||
|
for _, segment := range segments {
|
||
|
if segment.Inline() {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
for _, piece := range segment.AliasPieces {
|
||
|
fork.pieceCounts[piece.Alias]++
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|