satellite/gracefulexit: use node alias instead of node ID in observer
Using node aliases instead of full node IDs reduces CPU and memory usage in the graceful exit ranged-loop observer.

Fixes https://github.com/storj/storj/issues/5654

Change-Id: If3a5c7810732cbb1bff4dcb78706c81aca56b71b
commit 9b3488276d
parent de737bdee9
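For context: a node alias is a small integer the metabase assigns to each storage node, while a node ID is a 32-byte value, so keying the observer's per-node maps by alias and matching pieces through the segment's alias list keeps the hot loop working with compact keys. Below is a rough, standalone Go sketch of that idea; NodeAlias, NodeID and the piece list are simplified stand-ins for illustration, not the real storj definitions.

package main

import "fmt"

// Simplified stand-ins for the real types: metabase.NodeAlias is a compact
// integer handle and storj.NodeID is a 32-byte identifier.
type NodeAlias int32

type NodeID [32]byte

func main() {
	// Alias -> full ID for every exiting node, built once per loop cycle.
	exiting := map[NodeAlias]NodeID{7: {0x01}, 42: {0x02}}

	// Per-node byte counters keyed by the small alias instead of the 32-byte ID.
	bytesToTransfer := make(map[NodeAlias]int64, len(exiting))

	// Hypothetical per-segment piece data, as an alias-based piece list would
	// expose it: which node (by alias) holds the piece and how large it is.
	pieces := []struct {
		Alias NodeAlias
		Size  int64
	}{{7, 2048}, {13, 512}, {42, 2048}}

	for _, p := range pieces {
		if _, ok := exiting[p.Alias]; !ok {
			continue // node is not exiting; skip without touching full IDs
		}
		bytesToTransfer[p.Alias] += p.Size
	}

	fmt.Println(bytesToTransfer) // map[7:2048 42:2048]
}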
@@ -12,6 +12,7 @@ import (
 	"go.uber.org/zap"

 	"storj.io/common/storj"
+	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/metabase/rangedloop"
 	"storj.io/storj/satellite/overlay"
 )
@@ -20,26 +21,28 @@ import (
 // timed out status and removes transefer queue items for inactive exiting
 // nodes.
 type Observer struct {
-	log     *zap.Logger
-	db      DB
-	overlay overlay.DB
-	config  Config
+	log      *zap.Logger
+	db       DB
+	overlay  overlay.DB
+	metabase *metabase.DB
+	config   Config

 	// The following variables are reset on each loop cycle
-	exitingNodes    storj.NodeIDList
-	bytesToTransfer map[storj.NodeID]int64
+	exitingNodes    map[metabase.NodeAlias]storj.NodeID
+	bytesToTransfer map[metabase.NodeAlias]int64
 }

 var _ rangedloop.Observer = (*Observer)(nil)
 var _ rangedloop.Partial = (*observerFork)(nil)

 // NewObserver returns a new ranged loop observer.
-func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, config Config) *Observer {
+func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, metabase *metabase.DB, config Config) *Observer {
 	return &Observer{
-		log:     log,
-		db:      db,
-		overlay: overlay,
-		config:  config,
+		log:      log,
+		db:       db,
+		overlay:  overlay,
+		metabase: metabase,
+		config:   config,
 	}
 }
@@ -65,11 +68,20 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)

 	obs.checkForInactiveNodes(ctx, exitingNodes)

-	obs.exitingNodes = nil
-	obs.bytesToTransfer = make(map[storj.NodeID]int64)
+	aliases, err := obs.metabase.LatestNodesAliasMap(ctx)
+	if err != nil {
+		return err
+	}
+
+	obs.exitingNodes = make(map[metabase.NodeAlias]storj.NodeID, len(exitingNodes))
+	obs.bytesToTransfer = make(map[metabase.NodeAlias]int64)
 	for _, node := range exitingNodes {
 		if node.ExitLoopCompletedAt == nil {
-			obs.exitingNodes = append(obs.exitingNodes, node.NodeID)
+			alias, ok := aliases.Alias(node.NodeID)
+			if !ok {
+				return errs.New("unable to find alias for node: %s", node.NodeID)
+			}
+			obs.exitingNodes[alias] = node.NodeID
 		}
 	}
 	return nil
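Start now resolves every still-exiting node to its alias up front and fails fast when a node has no alias yet. Below is a minimal sketch of that translation step, where aliasMap is an illustrative stand-in for the lookup returned by the metabase (only the Alias(id) (alias, ok) shape mirrors the real call):

package main

import "fmt"

type NodeAlias int32

type NodeID [32]byte

// aliasMap is an illustrative stand-in for the alias lookup structure; it is
// not the storj API, it only imitates the Alias method used in Start.
type aliasMap map[NodeID]NodeAlias

func (m aliasMap) Alias(id NodeID) (NodeAlias, bool) {
	alias, ok := m[id]
	return alias, ok
}

// buildExitingNodes mirrors the new Start logic: map every exiting node's
// alias back to its full ID, returning an error if a node has no alias.
func buildExitingNodes(aliases aliasMap, exiting []NodeID) (map[NodeAlias]NodeID, error) {
	out := make(map[NodeAlias]NodeID, len(exiting))
	for _, id := range exiting {
		alias, ok := aliases.Alias(id)
		if !ok {
			return nil, fmt.Errorf("unable to find alias for node: %x", id)
		}
		out[alias] = id
	}
	return out, nil
}

func main() {
	aliases := aliasMap{{0x01}: 7, {0x02}: 42}
	exitingNodes, err := buildExitingNodes(aliases, []NodeID{{0x01}, {0x02}})
	fmt.Println(exitingNodes, err)
}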
@@ -96,8 +108,8 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err
 		return err
 	}

-	for nodeID, bytesToTransfer := range pathCollector.nodeIDStorage {
-		obs.bytesToTransfer[nodeID] += bytesToTransfer
+	for alias, bytesToTransfer := range pathCollector.nodeIDStorage {
+		obs.bytesToTransfer[alias] += bytesToTransfer
 	}
 	return nil
 }
@@ -109,7 +121,8 @@ func (obs *Observer) Finish(ctx context.Context) (err error) {

 	// Record that the exit loop was completed for each node
 	now := time.Now().UTC()
-	for nodeID, bytesToTransfer := range obs.bytesToTransfer {
+	for alias, bytesToTransfer := range obs.bytesToTransfer {
+		nodeID := obs.exitingNodes[alias]
 		exitStatus := overlay.ExitStatusRequest{
 			NodeID:              nodeID,
 			ExitLoopCompletedAt: now,
@@ -164,27 +177,29 @@ func (obs *Observer) checkForInactiveNodes(ctx context.Context, exitingNodes []*
 }

+var flushMon = mon.Task()
+
 type observerFork struct {
 	log           *zap.Logger
 	db            DB
 	buffer        []TransferQueueItem
 	batchSize     int
-	nodeIDStorage map[storj.NodeID]int64
+	nodeIDStorage map[metabase.NodeAlias]int64
+	exitingNodes  map[metabase.NodeAlias]storj.NodeID
 }

-func newObserverFork(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *observerFork {
+func newObserverFork(log *zap.Logger, db DB, exitingNodes map[metabase.NodeAlias]storj.NodeID, batchSize int) *observerFork {
 	fork := &observerFork{
 		log:           log,
 		db:            db,
 		buffer:        make([]TransferQueueItem, 0, batchSize),
 		batchSize:     batchSize,
-		nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)),
+		nodeIDStorage: make(map[metabase.NodeAlias]int64, len(exitingNodes)),
+		exitingNodes:  exitingNodes,
 	}

 	if len(exitingNodes) > 0 {
-		for _, nodeID := range exitingNodes {
-			fork.nodeIDStorage[nodeID] = 0
-		}
+		for alias := range exitingNodes {
+			fork.nodeIDStorage[alias] = 0
+		}
 	}

 	return fork
@@ -212,41 +227,38 @@ func (observer *observerFork) Process(ctx context.Context, segments []rangedloop

 func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment rangedloop.Segment) (err error) {
 	numPieces := len(segment.Pieces)
-	for _, piece := range segment.Pieces {
-		if _, ok := observer.nodeIDStorage[piece.StorageNode]; !ok {
+	for _, alias := range segment.AliasPieces {
+		nodeID, ok := observer.exitingNodes[alias.Alias]
+		if !ok {
 			continue
 		}

 		pieceSize := segment.PieceSize()

-		observer.nodeIDStorage[piece.StorageNode] += pieceSize
+		observer.nodeIDStorage[alias.Alias] += pieceSize

 		item := TransferQueueItem{
-			NodeID:          piece.StorageNode,
+			NodeID:          nodeID,
 			StreamID:        segment.StreamID,
 			Position:        segment.Position,
-			PieceNum:        int32(piece.Number),
+			PieceNum:        int32(alias.Number),
 			RootPieceID:     segment.RootPieceID,
 			DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares),
 		}

-		observer.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", piece.StorageNode),
+		observer.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", nodeID),
 			zap.String("stream_id", segment.StreamID.String()), zap.Int32("part", int32(segment.Position.Part)),
-			zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", piece.Number),
+			zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", alias.Number),
 			zap.Int("num pieces", numPieces), zap.Int16("total possible pieces", segment.Redundancy.TotalShares))

 		observer.buffer = append(observer.buffer, item)
-		err = observer.flush(ctx, observer.batchSize)
-		if err != nil {
-			return err
-		}
 	}

-	return nil
+	return observer.flush(ctx, observer.batchSize)
 }

 func (observer *observerFork) flush(ctx context.Context, limit int) (err error) {
-	defer mon.Task()(&ctx)(&err)
+	defer flushMon(&ctx)(&err)

 	if len(observer.buffer) >= limit {
 		err = observer.db.Enqueue(ctx, observer.buffer, observer.batchSize)
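Two smaller changes ride along in this hunk: flush is now called once per segment instead of once per buffered piece, and the monitoring task appears to be created once at package scope (flushMon) rather than via mon.Task() inside every flush call. A simplified sketch of the resulting batching shape, using placeholder types rather than the real TransferQueueItem and DB:

package main

import "fmt"

// transferItem is a placeholder for the real TransferQueueItem.
type transferItem struct{ pieceNum int32 }

type fork struct {
	buffer    []transferItem
	batchSize int
}

// flush enqueues the buffered items only once the batch is full, so calling it
// a single time per segment keeps the per-piece loop free of extra work.
func (f *fork) flush(limit int) error {
	if len(f.buffer) >= limit {
		fmt.Printf("enqueue %d items\n", len(f.buffer)) // stand-in for db.Enqueue
		f.buffer = f.buffer[:0]
	}
	return nil
}

// handleSegment mirrors the new control flow: buffer every matching piece in
// the loop, then flush once after the loop instead of after each append.
func (f *fork) handleSegment(pieceNums []int32) error {
	for _, n := range pieceNums {
		f.buffer = append(f.buffer, transferItem{pieceNum: n})
	}
	return f.flush(f.batchSize)
}

func main() {
	f := &fork{batchSize: 2}
	_ = f.handleSegment([]int32{1, 2, 3})
	_ = f.handleSegment([]int32{4})
}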
@@ -414,6 +414,7 @@ func TestAllInOne(t *testing.T) {
 			gracefulexit.NewObserver(log.Named("gracefulexit:observer"),
 				satellite.DB.GracefulExit(),
 				satellite.DB.OverlayCache(),
+				satellite.Metabase.DB,
 				satellite.Config.GracefulExit,
 			),
 			bloomfilter.NewObserver(log.Named("gc-bf"),
@@ -112,6 +112,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
 		peer.Log.Named("gracefulexit:observer"),
 		peer.DB.GracefulExit(),
 		peer.DB.OverlayCache(),
+		metabaseDB,
 		config.GracefulExit,
 	)
 }