diff --git a/satellite/gracefulexit/observer.go b/satellite/gracefulexit/observer.go
index 5b72320e2..501cfc03a 100644
--- a/satellite/gracefulexit/observer.go
+++ b/satellite/gracefulexit/observer.go
@@ -12,6 +12,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/common/storj"
+	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/metabase/rangedloop"
 	"storj.io/storj/satellite/overlay"
 )
@@ -20,26 +21,28 @@ import (
 // timed out status and removes transefer queue items for inactive exiting
 // nodes.
 type Observer struct {
-	log     *zap.Logger
-	db      DB
-	overlay overlay.DB
-	config  Config
+	log      *zap.Logger
+	db       DB
+	overlay  overlay.DB
+	metabase *metabase.DB
+	config   Config
 
 	// The following variables are reset on each loop cycle
-	exitingNodes    storj.NodeIDList
-	bytesToTransfer map[storj.NodeID]int64
+	exitingNodes    map[metabase.NodeAlias]storj.NodeID
+	bytesToTransfer map[metabase.NodeAlias]int64
 }
 
 var _ rangedloop.Observer = (*Observer)(nil)
 var _ rangedloop.Partial = (*observerFork)(nil)
 
 // NewObserver returns a new ranged loop observer.
-func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, config Config) *Observer {
+func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, metabase *metabase.DB, config Config) *Observer {
 	return &Observer{
-		log:     log,
-		db:      db,
-		overlay: overlay,
-		config:  config,
+		log:      log,
+		db:       db,
+		overlay:  overlay,
+		metabase: metabase,
+		config:   config,
 	}
 }
 
@@ -65,11 +68,20 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)
 
 	obs.checkForInactiveNodes(ctx, exitingNodes)
 
-	obs.exitingNodes = nil
-	obs.bytesToTransfer = make(map[storj.NodeID]int64)
+	aliases, err := obs.metabase.LatestNodesAliasMap(ctx)
+	if err != nil {
+		return err
+	}
+
+	obs.exitingNodes = make(map[metabase.NodeAlias]storj.NodeID, len(exitingNodes))
+	obs.bytesToTransfer = make(map[metabase.NodeAlias]int64)
 	for _, node := range exitingNodes {
 		if node.ExitLoopCompletedAt == nil {
-			obs.exitingNodes = append(obs.exitingNodes, node.NodeID)
+			alias, ok := aliases.Alias(node.NodeID)
+			if !ok {
+				return errs.New("unable to find alias for node: %s", node.NodeID)
+			}
+			obs.exitingNodes[alias] = node.NodeID
 		}
 	}
 	return nil
@@ -96,8 +108,8 @@ func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err
 		return err
 	}
 
-	for nodeID, bytesToTransfer := range pathCollector.nodeIDStorage {
-		obs.bytesToTransfer[nodeID] += bytesToTransfer
+	for alias, bytesToTransfer := range pathCollector.nodeIDStorage {
+		obs.bytesToTransfer[alias] += bytesToTransfer
 	}
 	return nil
 }
@@ -109,7 +121,8 @@ func (obs *Observer) Finish(ctx context.Context) (err error) {
 	// Record that the exit loop was completed for each node
 	now := time.Now().UTC()
-	for nodeID, bytesToTransfer := range obs.bytesToTransfer {
+	for alias, bytesToTransfer := range obs.bytesToTransfer {
+		nodeID := obs.exitingNodes[alias]
 		exitStatus := overlay.ExitStatusRequest{
 			NodeID:              nodeID,
 			ExitLoopCompletedAt: now,
 		}
@@ -164,27 +177,29 @@ func (obs *Observer) checkForInactiveNodes(ctx context.Context, exitingNodes []*
 }
 
+var flushMon = mon.Task()
+
 type observerFork struct {
 	log           *zap.Logger
 	db            DB
 	buffer        []TransferQueueItem
 	batchSize     int
-	nodeIDStorage map[storj.NodeID]int64
+	nodeIDStorage map[metabase.NodeAlias]int64
+	exitingNodes  map[metabase.NodeAlias]storj.NodeID
 }
 
-func newObserverFork(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *observerFork {
+func newObserverFork(log *zap.Logger, db DB, exitingNodes map[metabase.NodeAlias]storj.NodeID, batchSize int) *observerFork {
 	fork := &observerFork{
 		log:           log,
 		db:            db,
 		buffer:        make([]TransferQueueItem, 0, batchSize),
 		batchSize:     batchSize,
-		nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)),
+		nodeIDStorage: make(map[metabase.NodeAlias]int64, len(exitingNodes)),
+		exitingNodes:  exitingNodes,
 	}
 
-	if len(exitingNodes) > 0 {
-		for _, nodeID := range exitingNodes {
-			fork.nodeIDStorage[nodeID] = 0
-		}
+	for alias := range exitingNodes {
+		fork.nodeIDStorage[alias] = 0
 	}
 
 	return fork
@@ -212,41 +227,38 @@ func (observer *observerFork) Process(ctx context.Context, segments []rangedloop
 
 func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment rangedloop.Segment) (err error) {
 	numPieces := len(segment.Pieces)
-	for _, piece := range segment.Pieces {
-		if _, ok := observer.nodeIDStorage[piece.StorageNode]; !ok {
+	for _, alias := range segment.AliasPieces {
+		nodeID, ok := observer.exitingNodes[alias.Alias]
+		if !ok {
 			continue
 		}
 
 		pieceSize := segment.PieceSize()
 
-		observer.nodeIDStorage[piece.StorageNode] += pieceSize
+		observer.nodeIDStorage[alias.Alias] += pieceSize
 
 		item := TransferQueueItem{
-			NodeID:          piece.StorageNode,
+			NodeID:          nodeID,
 			StreamID:        segment.StreamID,
 			Position:        segment.Position,
-			PieceNum:        int32(piece.Number),
+			PieceNum:        int32(alias.Number),
 			RootPieceID:     segment.RootPieceID,
 			DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares),
 		}
 
-		observer.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", piece.StorageNode),
+		observer.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", nodeID),
 			zap.String("stream_id", segment.StreamID.String()), zap.Int32("part", int32(segment.Position.Part)),
-			zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", piece.Number),
+			zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", alias.Number),
 			zap.Int("num pieces", numPieces), zap.Int16("total possible pieces", segment.Redundancy.TotalShares))
 
 		observer.buffer = append(observer.buffer, item)
-		err = observer.flush(ctx, observer.batchSize)
-		if err != nil {
-			return err
-		}
 	}
 
-	return nil
+	return observer.flush(ctx, observer.batchSize)
 }
 
 func (observer *observerFork) flush(ctx context.Context, limit int) (err error) {
-	defer mon.Task()(&ctx)(&err)
+	defer flushMon(&ctx)(&err)
 
 	if len(observer.buffer) >= limit {
 		err = observer.db.Enqueue(ctx, observer.buffer, observer.batchSize)
diff --git a/satellite/metabase/rangedloop/service_test.go b/satellite/metabase/rangedloop/service_test.go
index b3ece78a2..c3bc35ca2 100644
--- a/satellite/metabase/rangedloop/service_test.go
+++ b/satellite/metabase/rangedloop/service_test.go
@@ -414,6 +414,7 @@ func TestAllInOne(t *testing.T) {
 			gracefulexit.NewObserver(log.Named("gracefulexit:observer"),
 				satellite.DB.GracefulExit(),
 				satellite.DB.OverlayCache(),
+				satellite.Metabase.DB,
 				satellite.Config.GracefulExit,
 			),
 			bloomfilter.NewObserver(log.Named("gc-bf"),
diff --git a/satellite/rangedloop.go b/satellite/rangedloop.go
index bc76b38b0..824a62391 100644
--- a/satellite/rangedloop.go
+++ b/satellite/rangedloop.go
@@ -112,6 +112,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
 			peer.Log.Named("gracefulexit:observer"),
 			peer.DB.GracefulExit(),
 			peer.DB.OverlayCache(),
+			metabaseDB,
 			config.GracefulExit,
 		)
 	}
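
For readers less familiar with the node-alias indirection this diff leans on, here is a minimal, self-contained Go sketch of the pattern, not the real implementation: the `NodeAlias` and `NodeID` types and the sample values below are simplified stand-ins for `metabase.NodeAlias` and `storj.NodeID`. The point it illustrates is the same as in the observer above: per-segment bookkeeping is keyed by the compact alias, and the alias is resolved back to a full node ID only once, when results are recorded.

```go
package main

import "fmt"

// NodeAlias is a stand-in for metabase.NodeAlias: a compact integer handle for a node.
type NodeAlias int32

// NodeID is a stand-in for storj.NodeID (a 32-byte identifier) to keep the sketch short.
type NodeID string

func main() {
	// Mirrors Observer.exitingNodes: alias -> node ID, resolved once before the loop runs.
	exitingNodes := map[NodeAlias]NodeID{1: "node-a", 7: "node-b"}

	// Mirrors Observer.bytesToTransfer: bytes accumulated per alias while processing segments.
	bytesToTransfer := map[NodeAlias]int64{}

	// Per-piece bookkeeping touches only the small alias keys, never the full node IDs.
	pieces := []struct {
		Alias NodeAlias
		Size  int64
	}{{1, 256}, {7, 512}, {1, 128}}
	for _, piece := range pieces {
		if _, ok := exitingNodes[piece.Alias]; ok {
			bytesToTransfer[piece.Alias] += piece.Size
		}
	}

	// The alias is translated back to a node ID only once, when results are recorded.
	for alias, bytes := range bytesToTransfer {
		fmt.Printf("%s: %d bytes to transfer\n", exitingNodes[alias], bytes)
	}
}
```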