storj/satellite/gracefulexit/pathcollector.go
Michal Niewrzal 16b7901fde satellite/metabase: add piece size calculation to segment
This code is essentially a replacement for eestream.CalcPieceSize. To call
eestream.CalcPieceSize we need an eestream.RedundancyStrategy, which is not
trivial to get as it requires an infectious.FEC. For example, infectious.FEC
creation is visible in the GE loop observer CPU profile because we were
doing this for each segment in the DB.

A new method was added to storj.Redundancy, and here we are just wiring it
up to the metabase Segment.

BenchmarkSegmentPieceSize
BenchmarkSegmentPieceSize/eestream.CalcPieceSize
BenchmarkSegmentPieceSize/eestream.CalcPieceSize-8         	    5822	    189189 ns/op	    9776 B/op	       8 allocs/op
BenchmarkSegmentPieceSize/segment.PieceSize
BenchmarkSegmentPieceSize/segment.PieceSize-8              	94721329	        11.49 ns/op	       0 B/op	       0 allocs/op

Change-Id: I5a8b4237aedd1424c54ed0af448061a236b00295
2023-02-22 11:04:02 +00:00

144 lines
4.2 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
var (
	// remoteSegmentFunc is the monitoring task wrapped around RemoteSegment.
	// It is created once at package level and reused on every call.
	remoteSegmentFunc = mon.Task()

	// Compile-time checks that *PathCollector satisfies both loop interfaces.
	_ segmentloop.Observer = (*PathCollector)(nil)
	_ rangedloop.Partial   = (*PathCollector)(nil)
)
// PathCollector is a segment loop observer that builds graceful exit
// transfer queue items for pieces held by exiting nodes and enqueues them
// into the database in batches.
//
// architecture: Observer
type PathCollector struct {
// log receives a debug entry for every piece added to the transfer queue.
log *zap.Logger
// db is where buffered transfer queue items are enqueued.
db DB
// buffer holds transfer queue items that have not been enqueued yet.
buffer []TransferQueueItem
// batchSize is the initial buffer capacity, the buffer length at which
// handleRemoteSegment flushes, and the batch size passed to db.Enqueue.
batchSize int
// nodeIDStorage is keyed by exiting node ID. A piece is only collected when
// its storage node is present here; the value accumulates the piece sizes
// seen for that node.
nodeIDStorage map[storj.NodeID]int64
}
// NewPathCollector builds a collector that tracks the given exiting nodes.
//
// Every node in exitingNodes starts with an accumulated piece size of zero.
// batchSize sets both the initial buffer capacity and the flush threshold.
func NewPathCollector(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *PathCollector {
	// Ranging over an empty list is a no-op, so no length guard is needed.
	tracked := make(map[storj.NodeID]int64, len(exitingNodes))
	for _, id := range exitingNodes {
		tracked[id] = 0
	}

	return &PathCollector{
		log:           log,
		db:            db,
		buffer:        make([]TransferQueueItem, 0, batchSize),
		batchSize:     batchSize,
		nodeIDStorage: tracked,
	}
}
// LoopStarted is invoked when a loop begins. The collector needs no per-loop
// setup, so this always returns nil.
func (collector *PathCollector) LoopStarted(_ context.Context, _ segmentloop.LoopInfo) (err error) {
	return nil
}
// Flush writes any buffered transfer queue items to the database.
// It does nothing when the buffer is empty.
func (collector *PathCollector) Flush(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// A threshold of one item forces out any non-empty buffer.
	const minItems = 1
	return collector.flush(ctx, minItems)
}
// RemoteSegment queues transfer items for the pieces of a remote segment that
// are held by exiting nodes. It returns immediately when no exiting nodes are
// being tracked.
func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
	defer remoteSegmentFunc(&ctx)(&err)

	if len(collector.nodeIDStorage) > 0 {
		return collector.handleRemoteSegment(ctx, segment)
	}
	return nil
}
// handleRemoteSegment queues a transfer item for every piece of segment that
// is stored on one of the exiting nodes tracked in nodeIDStorage.
//
// For each such piece it adds the segment's piece size to the node's running
// total, appends a TransferQueueItem to the buffer, and flushes the buffer to
// the database once it holds at least batchSize items. Pieces on untracked
// nodes are skipped. The first flush error stops the scan and is returned.
func (collector *PathCollector) handleRemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
	numPieces := len(segment.Pieces)

	for _, piece := range segment.Pieces {
		if _, exiting := collector.nodeIDStorage[piece.StorageNode]; !exiting {
			continue
		}

		collector.nodeIDStorage[piece.StorageNode] += segment.PieceSize()

		item := TransferQueueItem{
			NodeID:      piece.StorageNode,
			StreamID:    segment.StreamID,
			Position:    segment.Position,
			PieceNum:    int32(piece.Number),
			RootPieceID: segment.RootPieceID,
			// NOTE(review): assumes Redundancy.TotalShares is non-zero; a zero
			// value would make this ratio +Inf or NaN — confirm upstream.
			DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares),
		}

		// Check first so the log fields (and the stream ID string) are only
		// built when debug logging is actually enabled.
		if entry := collector.log.Check(zap.DebugLevel, "adding piece to transfer queue."); entry != nil {
			entry.Write(
				zap.Stringer("Node ID", piece.StorageNode),
				zap.String("stream_id", segment.StreamID.String()),
				zap.Int32("part", int32(segment.Position.Part)),
				zap.Int32("index", int32(segment.Position.Index)),
				zap.Uint16("piece num", piece.Number),
				zap.Int("num pieces", numPieces),
				zap.Int16("total possible pieces", segment.Redundancy.TotalShares),
			)
		}

		collector.buffer = append(collector.buffer, item)

		// flush only writes once the buffer has reached batchSize items.
		if err := collector.flush(ctx, collector.batchSize); err != nil {
			return err
		}
	}
	return nil
}
// InlineSegment ignores inline segments: the collector only acts on remote
// segments, so this always returns nil.
func (collector *PathCollector) InlineSegment(_ context.Context, _ *segmentloop.Segment) (err error) {
	return nil
}
// Process adds transfer queue items for remote segments belonging to newly
// exiting nodes.
//
// Inline segments are skipped. Processing stops at, and returns, the first
// error reported while handling a segment. When no exiting nodes are tracked
// the call returns nil without inspecting the segments.
func (collector *PathCollector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
	// Intentionally omitting mon.Task here. The duration for all process
	// calls is aggregated and emitted by the ranged loop service.

	if len(collector.nodeIDStorage) == 0 {
		return nil
	}

	// Index into the slice instead of ranging by value: this avoids copying
	// each Segment and avoids taking the address of the loop variable.
	for i := range segments {
		segment := &segments[i]
		if segment.Inline() {
			continue
		}
		if err := collector.handleRemoteSegment(ctx, segment); err != nil {
			return err
		}
	}
	return nil
}
// flush enqueues the buffered items into the database when the buffer holds
// at least limit items; otherwise it does nothing and returns nil.
//
// The buffer is emptied (its capacity is kept) after every enqueue attempt,
// whether or not the attempt succeeded. Any enqueue error is returned wrapped.
func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(collector.buffer) < limit {
		return nil
	}

	enqueueErr := collector.db.Enqueue(ctx, collector.buffer, collector.batchSize)
	collector.buffer = collector.buffer[:0]
	return errs.Wrap(enqueueErr)
}