diff --git a/satellite/gracefulexit/chore.go b/satellite/gracefulexit/chore.go
index 1cd4d2ae1..1e1fa4a96 100644
--- a/satellite/gracefulexit/chore.go
+++ b/satellite/gracefulexit/chore.go
@@ -100,7 +100,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
 	}
 
 	// Populate transfer queue for nodes that have not completed the exit loop yet
-	pathCollector := NewPathCollector(chore.db, exitingNodesLoopIncomplete, chore.log, chore.config.ChoreBatchSize)
+	pathCollector := NewPathCollector(chore.log, chore.db, exitingNodesLoopIncomplete, chore.config.ChoreBatchSize)
 	err = chore.segmentLoop.Join(ctx, pathCollector)
 	if err != nil {
 		chore.log.Error("error joining segment loop.", zap.Error(err))
diff --git a/satellite/gracefulexit/pathcollector.go b/satellite/gracefulexit/pathcollector.go
index 31933bcc5..fced09340 100644
--- a/satellite/gracefulexit/pathcollector.go
+++ b/satellite/gracefulexit/pathcollector.go
@@ -5,7 +5,6 @@ package gracefulexit
 
 import (
 	"context"
-	"sync"
 
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
@@ -23,27 +22,25 @@ var _ segmentloop.Observer = (*PathCollector)(nil)
 //
 // architecture: Observer
 type PathCollector struct {
-	db            DB
-	nodeIDMutex   sync.Mutex
-	nodeIDStorage map[storj.NodeID]int64
-	buffer        []TransferQueueItem
 	log           *zap.Logger
+	db            DB
+	buffer        []TransferQueueItem
 	batchSize     int
+	nodeIDStorage map[storj.NodeID]int64
 }
 
 // NewPathCollector instantiates a path collector.
-func NewPathCollector(db DB, nodeIDs storj.NodeIDList, log *zap.Logger, batchSize int) *PathCollector {
-	buffer := make([]TransferQueueItem, 0, batchSize)
+func NewPathCollector(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *PathCollector {
 	collector := &PathCollector{
-		db:        db,
-		log:       log,
-		buffer:    buffer,
-		batchSize: batchSize,
+		log:           log,
+		db:            db,
+		buffer:        make([]TransferQueueItem, 0, batchSize),
+		batchSize:     batchSize,
+		nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)),
 	}
 
-	if len(nodeIDs) > 0 {
-		collector.nodeIDStorage = make(map[storj.NodeID]int64, len(nodeIDs))
-		for _, nodeID := range nodeIDs {
+	if len(exitingNodes) > 0 {
+		for _, nodeID := range exitingNodes {
 			collector.nodeIDStorage[nodeID] = 0
 		}
 	}
@@ -70,19 +67,23 @@ func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *segm
 		return nil
 	}
 
-	collector.nodeIDMutex.Lock()
-	defer collector.nodeIDMutex.Unlock()
+	pieceSize := int64(-1)
 
 	numPieces := len(segment.Pieces)
 	for _, piece := range segment.Pieces {
 		if _, ok := collector.nodeIDStorage[piece.StorageNode]; !ok {
 			continue
 		}
-		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
-		if err != nil {
-			return err
+
+		// avoid creating a new redundancy strategy for every segment piece
+		if pieceSize == -1 {
+			redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
+			if err != nil {
+				return err
+			}
+			pieceSize = eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
 		}
-		pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
+
 		collector.nodeIDStorage[piece.StorageNode] += pieceSize
 
 		item := TransferQueueItem{
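
The substantive change in RemoteSegment above is hoisting the redundancy-strategy construction out of the per-piece loop: the piece size depends only on segment-level fields (segment.Redundancy, segment.EncryptedSize), so it can be computed lazily, at most once per segment, and only when at least one piece belongs to an exiting node. A minimal, self-contained sketch of that lazy-computation pattern; all identifiers here are hypothetical stand-ins, not the storj/eestream APIs:

```go
package main

import "fmt"

// piece and segment are hypothetical stand-ins for the storj segment types.
type piece struct{ node string }

type segment struct {
	encryptedSize int64
	pieces        []piece
}

// computePieceSize stands in for building the redundancy strategy and
// deriving the piece size; the result is identical for every piece of a
// given segment, which is what makes the hoisting safe.
func computePieceSize(encryptedSize int64) int64 {
	fmt.Println("computing piece size") // observe: runs at most once per segment
	return encryptedSize / 4
}

func main() {
	// bytes queued for transfer, keyed by exiting node ID
	exiting := map[string]int64{"node-a": 0, "node-c": 0}

	seg := segment{
		encryptedSize: 1 << 20,
		pieces:        []piece{{"node-a"}, {"node-b"}, {"node-c"}},
	}

	pieceSize := int64(-1) // sentinel: not computed yet
	for _, p := range seg.pieces {
		if _, ok := exiting[p.node]; !ok {
			continue // piece is not held by an exiting node
		}
		if pieceSize == -1 {
			pieceSize = computePieceSize(seg.encryptedSize)
		}
		exiting[p.node] += pieceSize
	}

	fmt.Println(exiting) // map[node-a:262144 node-c:262144]
}
```

The -1 sentinel works because a real piece size is always positive; a separate boolean flag would express the same "computed yet?" state. The mutex removal in the same hunk follows the same reordering of concerns: with the strategy construction no longer shared mutable state per call, the remaining map updates stay confined to the observer.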