satellite/gracefulexit: remove segments loop parts

We are switching completely to ranged loop.

https://github.com/storj/storj/issues/5368

Change-Id: Ia3e2d7879d91f7f5ffa99b8e8f108380e3b39f31
This commit is contained in:
Michal Niewrzal 2023-04-24 11:10:00 +02:00 committed by Storj Robot
parent 6a55682bc6
commit 6ac5bf0d7c
12 changed files with 140 additions and 368 deletions

View File

@ -198,7 +198,6 @@ type Satellite struct {
}
GracefulExit struct {
Chore *gracefulexit.Chore
Endpoint *gracefulexit.Endpoint
}
}
@ -660,7 +659,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.ProjectLimits.Cache = api.ProjectLimits.Cache
system.GracefulExit.Chore = peer.GracefulExit.Chore
system.GracefulExit.Endpoint = api.GracefulExit.Endpoint
return system

View File

@ -33,7 +33,6 @@ import (
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleauth"
"storj.io/storj/satellite/console/emailreminders"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
@ -142,10 +141,6 @@ type Core struct {
StorjscanService *storjscan.Service
StorjscanChore *storjscan.Chore
}
GracefulExit struct {
Chore *gracefulexit.Chore
}
}
// New creates a new satellite.
@ -591,25 +586,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}
}
{ // setup graceful exit
log := peer.Log.Named("gracefulexit")
switch {
case !config.GracefulExit.Enabled:
log.Info("disabled")
case config.GracefulExit.UseRangedLoop:
log.Info("using ranged loop")
default:
peer.GracefulExit.Chore = gracefulexit.NewChore(log, peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.SegmentLoop, config.GracefulExit)
peer.Services.Add(lifecycle.Item{
Name: "gracefulexit",
Run: peer.GracefulExit.Chore.Run,
Close: peer.GracefulExit.Chore.Close,
})
peer.Debug.Server.Panel.Add(
debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
}
}
return peer, nil
}

View File

@ -1,138 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"context"
"database/sql"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
)
// Chore populates the graceful exit transfer queue.
//
// architecture: Chore
type Chore struct {
log *zap.Logger
Loop *sync2.Cycle
db DB
config Config
overlay overlay.DB
segmentLoop *segmentloop.Service
}
// NewChore instantiates Chore.
func NewChore(log *zap.Logger, db DB, overlay overlay.DB, segmentLoop *segmentloop.Service, config Config) *Chore {
return &Chore{
log: log,
Loop: sync2.NewCycle(config.ChoreInterval),
db: db,
config: config,
overlay: overlay,
segmentLoop: segmentLoop,
}
}
// Run starts the chore.
func (chore *Chore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return chore.Loop.Run(ctx, func(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
exitingNodes, err := chore.overlay.GetExitingNodes(ctx)
if err != nil {
chore.log.Error("error retrieving nodes that have not finished exiting", zap.Error(err))
return nil
}
nodeCount := len(exitingNodes)
if nodeCount == 0 {
return nil
}
chore.log.Debug("found exiting nodes", zap.Int("exitingNodes", nodeCount))
exitingNodesLoopIncomplete := make(storj.NodeIDList, 0, nodeCount)
for _, node := range exitingNodes {
if node.ExitLoopCompletedAt == nil {
exitingNodesLoopIncomplete = append(exitingNodesLoopIncomplete, node.NodeID)
continue
}
progress, err := chore.db.GetProgress(ctx, node.NodeID)
if err != nil && !errs.Is(err, sql.ErrNoRows) {
chore.log.Error("error retrieving progress for node", zap.Stringer("Node ID", node.NodeID), zap.Error(err))
continue
}
lastActivityTime := *node.ExitLoopCompletedAt
if progress != nil {
lastActivityTime = progress.UpdatedAt
}
// check inactive timeframe
if lastActivityTime.Add(chore.config.MaxInactiveTimeFrame).Before(time.Now().UTC()) {
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: node.NodeID,
ExitSuccess: false,
ExitFinishedAt: time.Now().UTC(),
}
mon.Meter("graceful_exit_fail_inactive").Mark(1)
_, err = chore.overlay.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
chore.log.Error("error updating exit status", zap.Error(err))
continue
}
// remove all items from the transfer queue
err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID)
if err != nil {
chore.log.Error("error deleting node from transfer queue", zap.Error(err))
}
}
}
// Populate transfer queue for nodes that have not completed the exit loop yet
pathCollector := NewPathCollector(chore.log, chore.db, exitingNodesLoopIncomplete, chore.config.ChoreBatchSize)
err = chore.segmentLoop.Join(ctx, pathCollector)
if err != nil {
chore.log.Error("error joining segment loop.", zap.Error(err))
return nil
}
err = pathCollector.Flush(ctx)
if err != nil {
chore.log.Error("error flushing collector buffer.", zap.Error(err))
return nil
}
now := time.Now().UTC()
for _, nodeID := range exitingNodesLoopIncomplete {
exitStatus := overlay.ExitStatusRequest{
NodeID: nodeID,
ExitLoopCompletedAt: now,
}
_, err = chore.overlay.UpdateExitStatus(ctx, &exitStatus)
if err != nil {
chore.log.Error("error updating exit status.", zap.Error(err))
}
bytesToTransfer := pathCollector.nodeIDStorage[nodeID]
mon.IntVal("graceful_exit_init_bytes_stored").Observe(bytesToTransfer)
}
return nil
})
}
// Close closes chore.
func (chore *Chore) Close() error {
chore.Loop.Close()
return nil
}

View File

@ -43,8 +43,6 @@ func TestChore(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
satellite.GracefulExit.Chore.Loop.Pause()
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
@ -79,7 +77,9 @@ func TestChore(t *testing.T) {
}
require.Len(t, nodeIDs, 1)
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
@ -109,7 +109,6 @@ func TestChore(t *testing.T) {
}
require.Len(t, nodeIDs, 0)
satellite.GracefulExit.Chore.Loop.Pause()
err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0)
require.NoError(t, err)
@ -119,7 +118,9 @@ func TestChore(t *testing.T) {
// node should fail graceful exit if it has been inactive for maximum inactive time frame since last activity
time.Sleep(maximumInactiveTimeFrame + time.Second*1)
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
@ -159,7 +160,6 @@ func TestChoreDurabilityRatio(t *testing.T) {
project, err := uplinkPeer.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
satellite.GracefulExit.Chore.Loop.Pause()
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
@ -218,7 +218,9 @@ func TestChoreDurabilityRatio(t *testing.T) {
require.NoError(t, err)
}
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)

View File

@ -29,7 +29,7 @@ type Config struct {
ChoreBatchSize int `help:"size of the buffer used to batch inserts into the transfer queue." default:"500" testDefault:"10"`
ChoreInterval time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s" testDefault:"$TESTINTERVAL"`
UseRangedLoop bool `help:"whether or not to use the ranged loop observer instead of the chore." default:"false" testDefault:"false"`
UseRangedLoop bool `help:"whether use GE observer with ranged loop." default:"true"`
EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"300" testDefault:"100"`

View File

@ -161,8 +161,6 @@ func TestConcurrentConnections(t *testing.T) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
satellite.GracefulExit.Chore.Loop.Pause()
err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
@ -228,8 +226,9 @@ func TestConcurrentConnections(t *testing.T) {
require.NoError(t, c.Close())
}
// wait for initial loop to start so we have pieces to transfer
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
{ // this connection should not close immediately, since there are pieces to transfer
c, err := client.Process(ctx)
@ -281,8 +280,6 @@ func TestRecvTimeout(t *testing.T) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
@ -297,9 +294,9 @@ func TestRecvTimeout(t *testing.T) {
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
require.NoError(t, err)
// run the satellite chore to build the transfer queue.
satellite.GracefulExit.Chore.Loop.TriggerWait()
satellite.GracefulExit.Chore.Loop.Pause()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
@ -971,7 +968,6 @@ func TestExitDisabled(t *testing.T) {
satellite := planet.Satellites[0]
exitingNode := planet.StorageNodes[0]
require.Nil(t, satellite.GracefulExit.Chore)
require.Nil(t, satellite.GracefulExit.Endpoint)
conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL())
@ -1004,8 +1000,6 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
satellite.GracefulExit.Chore.Loop.Pause()
err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path0", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
@ -1034,8 +1028,9 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
// trigger the metainfo loop chore so we can get some pieces to transfer
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// make sure all the pieces are in the transfer queue
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
@ -1101,8 +1096,6 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
satellite.GracefulExit.Chore.Loop.Pause()
_, err = project.EnsureBucket(ctx, "testbucket")
require.NoError(t, err)
@ -1145,8 +1138,9 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
// trigger the metainfo loop chore so we can get some pieces to transfer
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// make sure all the pieces are in the transfer queue
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
@ -1278,8 +1272,6 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
satellite.GracefulExit.Chore.Loop.Pause()
nodeFullIDs := make(map[storj.NodeID]*identity.FullIdentity)
for _, node := range planet.StorageNodes {
nodeFullIDs[node.ID()] = node.Identity
@ -1324,8 +1316,9 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
// close the old client
require.NoError(t, c.CloseSend())
// trigger the metainfo loop chore so we can get some pieces to transfer
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// make sure all the pieces are in the transfer queue
_, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0)
@ -1404,8 +1397,6 @@ func TestIneligibleNodeAge(t *testing.T) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
satellite.GracefulExit.Chore.Loop.Pause()
nodeFullIDs := make(map[storj.NodeID]*identity.FullIdentity)
for _, node := range planet.StorageNodes {
nodeFullIDs[node.ID()] = node.Identity
@ -1467,8 +1458,6 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun
_, err = project.EnsureBucket(ctx, "testbucket")
require.NoError(t, err)
satellite.GracefulExit.Chore.Loop.Pause()
nodeFullIDs := make(map[storj.NodeID]*identity.FullIdentity)
for _, node := range planet.StorageNodes {
nodeFullIDs[node.ID()] = node.Identity
@ -1529,8 +1518,9 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun
// close the old client
require.NoError(t, c.CloseSend())
// trigger the metainfo loop chore so we can get some pieces to transfer
satellite.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// make sure all the pieces are in the transfer queue
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0)

View File

@ -245,9 +245,10 @@ func TestGracefulExit_CopiedObjects(t *testing.T) {
NodeID: node,
ExitInitiatedAt: time.Now().UTC(),
})
require.NoError(t, err)
// trigger segments loop with GE
planet.Satellites[0].GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// we should get only one item from GE queue as we have only one remote segments

View File

@ -13,6 +13,7 @@ import (
"storj.io/common/storj"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
"storj.io/storj/satellite/overlay"
)
@ -31,6 +32,7 @@ type Observer struct {
}
var _ rangedloop.Observer = (*Observer)(nil)
var _ rangedloop.Partial = (*observerFork)(nil)
// NewObserver returns a new ranged loop observer.
func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, config Config) *Observer {
@ -79,21 +81,19 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)
func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
defer mon.Task()(&ctx)(&err)
// TODO: trim out/refactor segmentloop.Observer bits from path collector
// once segmentloop.Observer is removed.
return NewPathCollector(obs.log, obs.db, obs.exitingNodes, obs.config.ChoreBatchSize), nil
return newObserverFork(obs.log, obs.db, obs.exitingNodes, obs.config.ChoreBatchSize), nil
}
// Join flushes the forked path collector and aggregates collected metrics.
func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
defer mon.Task()(&ctx)(&err)
pathCollector, ok := partial.(*PathCollector)
pathCollector, ok := partial.(*observerFork)
if !ok {
return Error.New("expected partial type %T but got %T", pathCollector, partial)
}
if err := pathCollector.Flush(ctx); err != nil {
if err := pathCollector.flush(ctx, 1); err != nil {
return err
}
@ -164,3 +164,96 @@ func (obs *Observer) checkForInactiveNodes(ctx context.Context, exitingNodes []*
}
}
type observerFork struct {
log *zap.Logger
db DB
buffer []TransferQueueItem
batchSize int
nodeIDStorage map[storj.NodeID]int64
}
func newObserverFork(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *observerFork {
fork := &observerFork{
log: log,
db: db,
buffer: make([]TransferQueueItem, 0, batchSize),
batchSize: batchSize,
nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)),
}
if len(exitingNodes) > 0 {
for _, nodeID := range exitingNodes {
fork.nodeIDStorage[nodeID] = 0
}
}
return fork
}
// Process adds transfer queue items for remote segments belonging to newly exiting nodes.
func (observer *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
// Intentionally omitting mon.Task here. The duration for all process
// calls are aggregated and and emitted by the ranged loop service.
if len(observer.nodeIDStorage) == 0 {
return nil
}
for _, segment := range segments {
if segment.Inline() {
continue
}
if err := observer.handleRemoteSegment(ctx, segment); err != nil {
return err
}
}
return nil
}
func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment segmentloop.Segment) (err error) {
numPieces := len(segment.Pieces)
for _, piece := range segment.Pieces {
if _, ok := observer.nodeIDStorage[piece.StorageNode]; !ok {
continue
}
pieceSize := segment.PieceSize()
observer.nodeIDStorage[piece.StorageNode] += pieceSize
item := TransferQueueItem{
NodeID: piece.StorageNode,
StreamID: segment.StreamID,
Position: segment.Position,
PieceNum: int32(piece.Number),
RootPieceID: segment.RootPieceID,
DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares),
}
observer.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", piece.StorageNode),
zap.String("stream_id", segment.StreamID.String()), zap.Int32("part", int32(segment.Position.Part)),
zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", piece.Number),
zap.Int("num pieces", numPieces), zap.Int16("total possible pieces", segment.Redundancy.TotalShares))
observer.buffer = append(observer.buffer, item)
err = observer.flush(ctx, observer.batchSize)
if err != nil {
return err
}
}
return nil
}
func (observer *observerFork) flush(ctx context.Context, limit int) (err error) {
defer mon.Task()(&ctx)(&err)
if len(observer.buffer) >= limit {
err = observer.db.Enqueue(ctx, observer.buffer, observer.batchSize)
observer.buffer = observer.buffer[:0]
return errs.Wrap(err)
}
return nil
}

View File

@ -1,143 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
var remoteSegmentFunc = mon.Task()
var _ segmentloop.Observer = (*PathCollector)(nil)
var _ rangedloop.Partial = (*PathCollector)(nil)
// PathCollector uses the metainfo loop to add paths to node reservoirs.
//
// architecture: Observer
type PathCollector struct {
log *zap.Logger
db DB
buffer []TransferQueueItem
batchSize int
nodeIDStorage map[storj.NodeID]int64
}
// NewPathCollector instantiates a path collector.
func NewPathCollector(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *PathCollector {
collector := &PathCollector{
log: log,
db: db,
buffer: make([]TransferQueueItem, 0, batchSize),
batchSize: batchSize,
nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)),
}
if len(exitingNodes) > 0 {
for _, nodeID := range exitingNodes {
collector.nodeIDStorage[nodeID] = 0
}
}
return collector
}
// LoopStarted is called at each start of a loop.
func (collector *PathCollector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) {
return nil
}
// Flush persists the current buffer items to the database.
func (collector *PathCollector) Flush(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return collector.flush(ctx, 1)
}
// RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already.
func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
defer remoteSegmentFunc(&ctx)(&err)
if len(collector.nodeIDStorage) == 0 {
return nil
}
return collector.handleRemoteSegment(ctx, segment)
}
func (collector *PathCollector) handleRemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
numPieces := len(segment.Pieces)
for _, piece := range segment.Pieces {
if _, ok := collector.nodeIDStorage[piece.StorageNode]; !ok {
continue
}
pieceSize := segment.PieceSize()
collector.nodeIDStorage[piece.StorageNode] += pieceSize
item := TransferQueueItem{
NodeID: piece.StorageNode,
StreamID: segment.StreamID,
Position: segment.Position,
PieceNum: int32(piece.Number),
RootPieceID: segment.RootPieceID,
DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares),
}
collector.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", piece.StorageNode),
zap.String("stream_id", segment.StreamID.String()), zap.Int32("part", int32(segment.Position.Part)),
zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", piece.Number),
zap.Int("num pieces", numPieces), zap.Int16("total possible pieces", segment.Redundancy.TotalShares))
collector.buffer = append(collector.buffer, item)
err = collector.flush(ctx, collector.batchSize)
if err != nil {
return err
}
}
return nil
}
// InlineSegment returns nil because we're only auditing for storage nodes for now.
func (collector *PathCollector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
return nil
}
// Process adds transfer queue items for remote segments belonging to newly
// exiting nodes.
func (collector *PathCollector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
// Intentionally omitting mon.Task here. The duration for all process
// calls are aggregated and and emitted by the ranged loop service.
if len(collector.nodeIDStorage) == 0 {
return nil
}
for _, segment := range segments {
if segment.Inline() {
continue
}
if err := collector.handleRemoteSegment(ctx, &segment); err != nil {
return err
}
}
return nil
}
func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) {
defer mon.Task()(&ctx)(&err)
if len(collector.buffer) >= limit {
err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize)
collector.buffer = collector.buffer[:0]
return errs.Wrap(err)
}
return nil
}

View File

@ -517,8 +517,8 @@ contact.external-address: ""
# batch size (crdb specific) for deleting and adding items to the transfer queue
# graceful-exit.transfer-queue-batch-size: 1000
# whether or not to use the ranged loop observer instead of the chore.
# graceful-exit.use-ranged-loop: false
# whether use GE observer with ranged loop.
# graceful-exit.use-ranged-loop: true
# path to the certificate chain for this identity
identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert

View File

@ -33,8 +33,6 @@ func TestChore(t *testing.T) {
satellite1 := planet.Satellites[0]
uplinkPeer := planet.Uplinks[0]
satellite1.GracefulExit.Chore.Loop.Pause()
err := uplinkPeer.Upload(ctx, satellite1, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
@ -91,8 +89,9 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
// initiate graceful exit on satellite side by running the SN chore.
exitingNode.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite chore to build the transfer queue.
satellite1.GracefulExit.Chore.Loop.TriggerWait()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite1.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite1.DB.OverlayCache().GetExitingNodes(ctx)

View File

@ -43,8 +43,6 @@ func TestWorkerSuccess(t *testing.T) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
@ -59,9 +57,9 @@ func TestWorkerSuccess(t *testing.T) {
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
require.NoError(t, err)
// run the satellite chore to build the transfer queue.
satellite.GracefulExit.Chore.Loop.TriggerWait()
satellite.GracefulExit.Chore.Loop.Pause()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
@ -114,8 +112,6 @@ func TestWorkerTimeout(t *testing.T) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
@ -130,9 +126,9 @@ func TestWorkerTimeout(t *testing.T) {
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
require.NoError(t, err)
// run the satellite chore to build the transfer queue.
satellite.GracefulExit.Chore.Loop.TriggerWait()
satellite.GracefulExit.Chore.Loop.Pause()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
@ -192,8 +188,6 @@ func TestWorkerFailure_IneligibleNodeAge(t *testing.T) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)