satellite/gracefulexit: observer cleanup

Some changes to make the code cleaner and easier to adapt to the new ranged
loop.

* remove unneeded mutex
* reorganize constructor args
* avoid creating the same redundancy scheme for each segment piece

Change-Id: I81f3f6597147fc515516949db3ce796a60b1c8a0
Michal Niewrzal, 2022-11-04 11:01:52 +01:00 (committed by Storj Robot)
parent cc858f4e91
commit b2dc8211d6
2 changed files with 22 additions and 21 deletions

satellite/gracefulexit/chore.go

@@ -100,7 +100,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
 	}
 	// Populate transfer queue for nodes that have not completed the exit loop yet
-	pathCollector := NewPathCollector(chore.db, exitingNodesLoopIncomplete, chore.log, chore.config.ChoreBatchSize)
+	pathCollector := NewPathCollector(chore.log, chore.db, exitingNodesLoopIncomplete, chore.config.ChoreBatchSize)
 	err = chore.segmentLoop.Join(ctx, pathCollector)
 	if err != nil {
 		chore.log.Error("error joining segment loop.", zap.Error(err))
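For reference, this hunk only swaps the constructor's argument order (logger first, matching the reorganized struct in the next file); the control flow is unchanged: the chore builds a collector and joins it to the segment loop, which then drives the observer callbacks. A toy, self-contained sketch of that join pattern, using stand-in types rather than the real segmentloop API:

```go
package main

import (
	"context"
	"fmt"
)

// observer plays the role PathCollector plays for the segment loop:
// the loop calls back into it for every segment while the join is active.
type observer interface {
	RemoteSegment(ctx context.Context, segmentID string) error
}

// segmentLoop is a stand-in for chore.segmentLoop; Join blocks while the
// loop feeds every segment to the observer, mirroring
// chore.segmentLoop.Join(ctx, pathCollector) above.
type segmentLoop struct{ segments []string }

func (l *segmentLoop) Join(ctx context.Context, obs observer) error {
	for _, s := range l.segments {
		if err := obs.RemoteSegment(ctx, s); err != nil {
			return err
		}
	}
	return nil
}

type printCollector struct{}

func (printCollector) RemoteSegment(_ context.Context, id string) error {
	fmt.Println("segment:", id)
	return nil
}

func main() {
	loop := &segmentLoop{segments: []string{"seg-1", "seg-2"}}
	if err := loop.Join(context.Background(), printCollector{}); err != nil {
		fmt.Println("join failed:", err)
	}
}
```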

satellite/gracefulexit/pathcollector.go

@@ -5,7 +5,6 @@ package gracefulexit
 
 import (
 	"context"
-	"sync"
 
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
@@ -23,27 +22,25 @@ var _ segmentloop.Observer = (*PathCollector)(nil)
 //
 // architecture: Observer
 type PathCollector struct {
-	db            DB
-	nodeIDMutex   sync.Mutex
-	nodeIDStorage map[storj.NodeID]int64
-	buffer        []TransferQueueItem
 	log           *zap.Logger
+	db            DB
+	buffer        []TransferQueueItem
 	batchSize     int
+	nodeIDStorage map[storj.NodeID]int64
 }
 
 // NewPathCollector instantiates a path collector.
-func NewPathCollector(db DB, nodeIDs storj.NodeIDList, log *zap.Logger, batchSize int) *PathCollector {
-	buffer := make([]TransferQueueItem, 0, batchSize)
+func NewPathCollector(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *PathCollector {
 	collector := &PathCollector{
-		db:        db,
-		log:       log,
-		buffer:    buffer,
-		batchSize: batchSize,
+		log:           log,
+		db:            db,
+		buffer:        make([]TransferQueueItem, 0, batchSize),
+		batchSize:     batchSize,
+		nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)),
 	}
 
-	if len(nodeIDs) > 0 {
-		collector.nodeIDStorage = make(map[storj.NodeID]int64, len(nodeIDs))
-		for _, nodeID := range nodeIDs {
+	if len(exitingNodes) > 0 {
+		for _, nodeID := range exitingNodes {
 			collector.nodeIDStorage[nodeID] = 0
 		}
 	}
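After this hunk, every field of PathCollector is initialized inside the constructor; previously nodeIDStorage was allocated only when the node list was non-empty. A minimal, self-contained sketch of the resulting shape, with simplified stand-in types (not the satellite's own):

```go
package main

import "fmt"

// collector mirrors the reorganized PathCollector: every field gets its
// value in the constructor, none are filled in lazily afterwards.
type collector struct {
	buffer        []string
	batchSize     int
	nodeIDStorage map[string]int64
}

// newCollector pre-sizes the buffer and the map up front, the same shape
// as the rewritten NewPathCollector. (The real code keeps a len() guard
// around the loop; ranging over an empty slice is a no-op either way.)
func newCollector(exitingNodes []string, batchSize int) *collector {
	c := &collector{
		buffer:        make([]string, 0, batchSize),
		batchSize:     batchSize,
		nodeIDStorage: make(map[string]int64, len(exitingNodes)),
	}
	for _, id := range exitingNodes {
		c.nodeIDStorage[id] = 0
	}
	return c
}

func main() {
	c := newCollector([]string{"node-a", "node-b"}, 100)
	fmt.Println(len(c.nodeIDStorage), cap(c.buffer)) // 2 100
}
```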
@@ -70,19 +67,23 @@ func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *segm
 		return nil
 	}
 
-	collector.nodeIDMutex.Lock()
-	defer collector.nodeIDMutex.Unlock()
-
+	pieceSize := int64(-1)
 	numPieces := len(segment.Pieces)
 	for _, piece := range segment.Pieces {
 		if _, ok := collector.nodeIDStorage[piece.StorageNode]; !ok {
 			continue
 		}
-		redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
-		if err != nil {
-			return err
+
+		// avoid creating new redundancy strategy for every segment piece
+		if pieceSize == -1 {
+			redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
+			if err != nil {
+				return err
+			}
+			pieceSize = eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
 		}
-		pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
+
 		collector.nodeIDStorage[piece.StorageNode] += pieceSize
 		item := TransferQueueItem{
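This hunk covers the other two commit-message bullets: the mutex is dropped (safe on the assumption that the segment loop delivers segments to a given observer from a single goroutine), and the redundancy strategy, which depends only on the segment, is now built lazily at most once per segment rather than once per matching piece, using -1 as a "not yet computed" sentinel. A self-contained sketch of that compute-once pattern, with a stand-in for the eestream calls:

```go
package main

import "fmt"

type piece struct{ storageNode string }

type segment struct {
	pieces        []piece
	encryptedSize int64
}

// expensivePieceSize stands in for building the redundancy strategy and
// calling eestream.CalcPieceSize; it depends only on the segment.
func expensivePieceSize(s *segment) int64 {
	return s.encryptedSize / int64(len(s.pieces))
}

func collect(s *segment, exiting map[string]int64) {
	// Sentinel -1 means "not computed yet"; the value is derived at most
	// once per segment, no matter how many pieces match an exiting node.
	pieceSize := int64(-1)
	for _, p := range s.pieces {
		if _, ok := exiting[p.storageNode]; !ok {
			continue
		}
		if pieceSize == -1 {
			pieceSize = expensivePieceSize(s)
		}
		exiting[p.storageNode] += pieceSize
	}
}

func main() {
	exiting := map[string]int64{"node-a": 0}
	s := &segment{pieces: []piece{{"node-a"}, {"node-b"}}, encryptedSize: 1024}
	collect(s, exiting)
	fmt.Println(exiting["node-a"]) // 512
}
```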