satellite/gracefulexit: implement rangedloop observer
The tests are forked from the chore tests with slight adaptations for being run
against the ranged loop. I also moved a benchmark for the database from
chore_test.go to db_test.go. The pathcollector is reused as a rangedloop.Partial.

https://github.com/storj/storj/issues/5234

Change-Id: I56182031d133812a9f4d4a433c01b9150af39f31
This commit is contained in:
parent
013e74f804
commit
4241e6bf5f
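
For orientation, the ranged loop contract this change implements looks roughly
like the following. This is a sketch inferred from the method signatures in the
observer added below; the canonical definitions live in
storj.io/storj/satellite/metabase/rangedloop, and the package name here is a
hypothetical placeholder.

    // Hypothetical restatement of the interfaces, for orientation only.
    package rangedloopsketch

    import (
    	"context"
    	"time"

    	"storj.io/storj/satellite/metabase/segmentloop"
    )

    // Observer is forked once per segment range; partials are joined back
    // into the observer, and Finish runs after all ranges complete.
    type Observer interface {
    	Start(ctx context.Context, startTime time.Time) error
    	Fork(ctx context.Context) (Partial, error)
    	Join(ctx context.Context, partial Partial) error
    	Finish(ctx context.Context) error
    }

    // Partial processes batches of segments for a single range.
    type Partial interface {
    	Process(ctx context.Context, segments []segmentloop.Segment) error
    }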
@@ -581,8 +581,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 	}
 
 	{ // setup graceful exit
-		if config.GracefulExit.Enabled {
-			peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("gracefulexit"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.SegmentLoop, config.GracefulExit)
+		log := peer.Log.Named("gracefulexit")
+		switch {
+		case !config.GracefulExit.Enabled:
+			log.Info("disabled")
+		case config.GracefulExit.UseRangedLoop:
+			log.Info("using ranged loop")
+		default:
+			peer.GracefulExit.Chore = gracefulexit.NewChore(log, peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.SegmentLoop, config.GracefulExit)
 			peer.Services.Add(lifecycle.Item{
 				Name: "gracefulexit",
 				Run:  peer.GracefulExit.Chore.Run,
@@ -590,8 +596,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 			})
 			peer.Debug.Server.Panel.Add(
 				debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
-		} else {
-			peer.Log.Named("gracefulexit").Info("disabled")
-		}
+		}
 	}
@@ -4,7 +4,6 @@
 package gracefulexit_test
 
 import (
-	"context"
 	"testing"
 	"time"
 
@@ -17,10 +16,8 @@ import (
 	"storj.io/common/testrand"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
-	"storj.io/storj/satellite/gracefulexit"
 	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/overlay"
-	"storj.io/storj/satellite/satellitedb/satellitedbtest"
 )
 
 func TestChore(t *testing.T) {
@@ -136,7 +133,7 @@ func TestChore(t *testing.T) {
 	})
 }
 
-func TestDurabilityRatio(t *testing.T) {
+func TestChoreDurabilityRatio(t *testing.T) {
 	const (
 		maximumInactiveTimeFrame = time.Second * 1
 		successThreshold         = 4
@@ -232,46 +229,3 @@ func TestDurabilityRatio(t *testing.T) {
 		}
 	})
 }
-
-func BenchmarkChore(b *testing.B) {
-	satellitedbtest.Bench(b, func(b *testing.B, db satellite.DB) {
-		gracefulexitdb := db.GracefulExit()
-		ctx := context.Background()
-
-		b.Run("BatchUpdateStats-100", func(b *testing.B) {
-			batch(ctx, b, gracefulexitdb, 100)
-		})
-		if !testing.Short() {
-			b.Run("BatchUpdateStats-250", func(b *testing.B) {
-				batch(ctx, b, gracefulexitdb, 250)
-			})
-			b.Run("BatchUpdateStats-500", func(b *testing.B) {
-				batch(ctx, b, gracefulexitdb, 500)
-			})
-			b.Run("BatchUpdateStats-1000", func(b *testing.B) {
-				batch(ctx, b, gracefulexitdb, 1000)
-			})
-			b.Run("BatchUpdateStats-5000", func(b *testing.B) {
-				batch(ctx, b, gracefulexitdb, 5000)
-			})
-		}
-	})
-}
-func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) {
-	for i := 0; i < b.N; i++ {
-		var transferQueueItems []gracefulexit.TransferQueueItem
-		for j := 0; j < size; j++ {
-			item := gracefulexit.TransferQueueItem{
-				NodeID:          testrand.NodeID(),
-				StreamID:        testrand.UUID(),
-				Position:        metabase.SegmentPosition{},
-				PieceNum:        0,
-				DurabilityRatio: 1.0,
-			}
-			transferQueueItems = append(transferQueueItems, item)
-		}
-		batchSize := 1000
-		err := db.Enqueue(ctx, transferQueueItems, batchSize)
-		require.NoError(b, err)
-	}
-}
@@ -29,6 +29,7 @@ type Config struct {
 
 	ChoreBatchSize int           `help:"size of the buffer used to batch inserts into the transfer queue." default:"500" testDefault:"10"`
 	ChoreInterval  time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s" testDefault:"$TESTINTERVAL"`
+	UseRangedLoop  bool          `help:"whether or not to use the ranged loop observer instead of the chore." default:"false" testDefault:"false"`
 
 	EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"300" testDefault:"100"`
 
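
The new flag only takes effect together with the existing enabled flag: the
observer is wired into the ranged loop peer only when both are set (see the
rangedloop change below). A hypothetical satellite config snippet enabling the
observer, using the key names from the config lock file at the end of this
diff:

    graceful-exit.enabled: true
    graceful-exit.use-ranged-loop: true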
@@ -4,6 +4,7 @@
 package gracefulexit_test
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -196,3 +197,46 @@ func TestSegmentTransferQueueItem(t *testing.T) {
 		require.Equal(t, 1, item.OrderLimitSendCount)
 	})
 }
+
+func BenchmarkEnqueue(b *testing.B) {
+	satellitedbtest.Bench(b, func(b *testing.B, db satellite.DB) {
+		gracefulexitdb := db.GracefulExit()
+		ctx := context.Background()
+
+		b.Run("BatchUpdateStats-100", func(b *testing.B) {
+			batch(ctx, b, gracefulexitdb, 100)
+		})
+		if !testing.Short() {
+			b.Run("BatchUpdateStats-250", func(b *testing.B) {
+				batch(ctx, b, gracefulexitdb, 250)
+			})
+			b.Run("BatchUpdateStats-500", func(b *testing.B) {
+				batch(ctx, b, gracefulexitdb, 500)
+			})
+			b.Run("BatchUpdateStats-1000", func(b *testing.B) {
+				batch(ctx, b, gracefulexitdb, 1000)
+			})
+			b.Run("BatchUpdateStats-5000", func(b *testing.B) {
+				batch(ctx, b, gracefulexitdb, 5000)
+			})
+		}
+	})
+}
+func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) {
+	for i := 0; i < b.N; i++ {
+		var transferQueueItems []gracefulexit.TransferQueueItem
+		for j := 0; j < size; j++ {
+			item := gracefulexit.TransferQueueItem{
+				NodeID:          testrand.NodeID(),
+				StreamID:        testrand.UUID(),
+				Position:        metabase.SegmentPosition{},
+				PieceNum:        0,
+				DurabilityRatio: 1.0,
+			}
+			transferQueueItems = append(transferQueueItems, item)
+		}
+		batchSize := 1000
+		err := db.Enqueue(ctx, transferQueueItems, batchSize)
+		require.NoError(b, err)
+	}
+}
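
The relocated benchmark keeps the testing.Short guard, so only the 100-item
case runs in short mode. Assuming the database-backed test setup that
satellitedbtest expects in this repository, it can be run with standard Go
tooling, for example:

    go test ./satellite/gracefulexit -run '^$' -bench BenchmarkEnqueue -short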
satellite/gracefulexit/observer.go (new file, +166 lines)
@@ -0,0 +1,166 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package gracefulexit
+
+import (
+	"context"
+	"database/sql"
+	"time"
+
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/common/storj"
+	"storj.io/storj/satellite/metabase/rangedloop"
+	"storj.io/storj/satellite/overlay"
+)
+
+// Observer populates the transfer queue for exiting nodes. It also updates the
+// timed out status and removes transfer queue items for inactive exiting
+// nodes.
+type Observer struct {
+	log     *zap.Logger
+	db      DB
+	overlay overlay.DB
+	config  Config
+
+	// The following variables are reset on each loop cycle.
+	exitingNodes    storj.NodeIDList
+	bytesToTransfer map[storj.NodeID]int64
+}
+
+var _ rangedloop.Observer = (*Observer)(nil)
+
+// NewObserver returns a new ranged loop observer.
+func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, config Config) *Observer {
+	return &Observer{
+		log:     log,
+		db:      db,
+		overlay: overlay,
+		config:  config,
+	}
+}
+
+// Start updates the status and clears the transfer queue for inactive exiting
+// nodes. It then prepares to populate the transfer queue for newly exiting
+// nodes during the ranged loop cycle.
+func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	// Determine which exiting nodes have yet to complete a segment loop
+	// that queues up related pieces for transfer.
+	exitingNodes, err := obs.overlay.GetExitingNodes(ctx)
+	if err != nil {
+		return err
+	}
+
+	nodeCount := len(exitingNodes)
+	if nodeCount == 0 {
+		return nil
+	}
+
+	obs.log.Debug("found exiting nodes", zap.Int("exitingNodes", nodeCount))
+
+	obs.checkForInactiveNodes(ctx, exitingNodes)
+
+	obs.exitingNodes = nil
+	obs.bytesToTransfer = make(map[storj.NodeID]int64)
+	for _, node := range exitingNodes {
+		if node.ExitLoopCompletedAt == nil {
+			obs.exitingNodes = append(obs.exitingNodes, node.NodeID)
+		}
+	}
+	return nil
+}
+
+// Fork returns a path collector that will populate the transfer queue for
+// segments belonging to newly exiting nodes for its range.
+func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	// TODO: trim out/refactor segmentloop.Observer bits from path collector
+	// once segmentloop.Observer is removed.
+	return NewPathCollector(obs.log, obs.db, obs.exitingNodes, obs.config.ChoreBatchSize), nil
+}
+
+// Join flushes the forked path collector and aggregates collected metrics.
+func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	pathCollector, ok := partial.(*PathCollector)
+	if !ok {
+		return Error.New("expected partial type %T but got %T", pathCollector, partial)
+	}
+
+	if err := pathCollector.Flush(ctx); err != nil {
+		return err
+	}
+
+	for nodeID, bytesToTransfer := range pathCollector.nodeIDStorage {
+		obs.bytesToTransfer[nodeID] += bytesToTransfer
+	}
+	return nil
+}
+
+// Finish marks that the exit loop has been completed for newly exiting nodes
+// that were processed in this loop cycle.
+func (obs *Observer) Finish(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	// Record that the exit loop was completed for each node.
+	now := time.Now().UTC()
+	for nodeID, bytesToTransfer := range obs.bytesToTransfer {
+		exitStatus := overlay.ExitStatusRequest{
+			NodeID:              nodeID,
+			ExitLoopCompletedAt: now,
+		}
+		if _, err := obs.overlay.UpdateExitStatus(ctx, &exitStatus); err != nil {
+			obs.log.Error("error updating exit status.", zap.Error(err))
+		}
+		mon.IntVal("graceful_exit_init_bytes_stored").Observe(bytesToTransfer)
+	}
+	return nil
+}
+
+func (obs *Observer) checkForInactiveNodes(ctx context.Context, exitingNodes []*overlay.ExitStatus) {
+	for _, node := range exitingNodes {
+		if node.ExitLoopCompletedAt == nil {
+			// Node has not yet had all of its pieces added to the transfer queue.
+			continue
+		}
+
+		progress, err := obs.db.GetProgress(ctx, node.NodeID)
+		if err != nil && !errs.Is(err, sql.ErrNoRows) {
+			obs.log.Error("error retrieving progress for node", zap.Stringer("Node ID", node.NodeID), zap.Error(err))
+			continue
+		}
+
+		lastActivityTime := *node.ExitLoopCompletedAt
+		if progress != nil {
+			lastActivityTime = progress.UpdatedAt
+		}
+
+		// Check the inactive timeframe.
+		if lastActivityTime.Add(obs.config.MaxInactiveTimeFrame).Before(time.Now().UTC()) {
+			exitStatusRequest := &overlay.ExitStatusRequest{
+				NodeID:         node.NodeID,
+				ExitSuccess:    false,
+				ExitFinishedAt: time.Now().UTC(),
+			}
+			mon.Meter("graceful_exit_fail_inactive").Mark(1)
+			_, err = obs.overlay.UpdateExitStatus(ctx, exitStatusRequest)
+			if err != nil {
+				obs.log.Error("error updating exit status", zap.Error(err))
+				continue
+			}
+
+			// Remove all items from the transfer queue.
+			err := obs.db.DeleteTransferQueueItems(ctx, node.NodeID)
+			if err != nil {
+				obs.log.Error("error deleting node from transfer queue", zap.Error(err))
+			}
+		}
+	}
+}
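
To make the lifecycle above concrete, here is a minimal sketch of how one
ranged loop cycle drives the observer. The real scheduling lives in
rangedloop.Service (ranges run in parallel and Process may be called several
times per partial); runCycle, its ranges parameter, and the package name are
illustrative assumptions, not part of this change:

    package gracefulexitsketch

    import (
    	"context"
    	"time"

    	"storj.io/storj/satellite/gracefulexit"
    	"storj.io/storj/satellite/metabase/segmentloop"
    )

    func runCycle(ctx context.Context, obs *gracefulexit.Observer, ranges [][]segmentloop.Segment) error {
    	// Fail inactive exiting nodes and snapshot the newly exiting ones.
    	if err := obs.Start(ctx, time.Now()); err != nil {
    		return err
    	}
    	for _, segments := range ranges {
    		// One PathCollector per range.
    		partial, err := obs.Fork(ctx)
    		if err != nil {
    			return err
    		}
    		if err := partial.Process(ctx, segments); err != nil {
    			return err
    		}
    		// Flush the collector and merge its per-node byte counts.
    		if err := obs.Join(ctx, partial); err != nil {
    			return err
    		}
    	}
    	// Mark ExitLoopCompletedAt for the processed nodes.
    	return obs.Finish(ctx)
    }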
satellite/gracefulexit/observer_test.go (new file, +235 lines)
@@ -0,0 +1,235 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package gracefulexit_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+
+	"storj.io/common/memory"
+	"storj.io/common/storj"
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite"
+	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/overlay"
+)
+
+func TestObserver(t *testing.T) {
+	var maximumInactiveTimeFrame = time.Second * 1
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 8,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
+					config.GracefulExit.UseRangedLoop = true
+				},
+				testplanet.ReconfigureRS(4, 6, 8, 8),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		exitingNode := planet.StorageNodes[1]
+
+		project, err := uplinkPeer.GetProject(ctx, satellite)
+		require.NoError(t, err)
+		defer func() { require.NoError(t, project.Close()) }()
+
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path2", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		info, err := project.BeginUpload(ctx, "testbucket", "test/path3", nil)
+		require.NoError(t, err)
+
+		upload, err := project.UploadPart(ctx, "testbucket", "test/path3", info.UploadID, 1)
+		require.NoError(t, err)
+
+		_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
+		require.NoError(t, err)
+		require.NoError(t, upload.Commit())
+
+		exitStatusRequest := overlay.ExitStatusRequest{
+			NodeID:          exitingNode.ID(),
+			ExitInitiatedAt: time.Now(),
+		}
+
+		_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusRequest)
+		require.NoError(t, err)
+
+		exitingNodes, err := satellite.Overlay.DB.GetExitingNodes(ctx)
+		require.NoError(t, err)
+		nodeIDs := make(storj.NodeIDList, 0, len(exitingNodes))
+		for _, exitingNode := range exitingNodes {
+			if exitingNode.ExitLoopCompletedAt == nil {
+				nodeIDs = append(nodeIDs, exitingNode.NodeID)
+			}
+		}
+		require.Len(t, nodeIDs, 1)
+
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
+		require.NoError(t, err)
+		require.Len(t, incompleteTransfers, 3)
+		for _, incomplete := range incompleteTransfers {
+			require.True(t, incomplete.DurabilityRatio > 0)
+			require.NotNil(t, incomplete.RootPieceID)
+		}
+
+		// test the other nodes don't have anything to transfer
+		for _, node := range planet.StorageNodes {
+			if node.ID() == exitingNode.ID() {
+				continue
+			}
+			incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0)
+			require.NoError(t, err)
+			require.Len(t, incompleteTransfers, 0)
+		}
+
+		exitingNodes, err = satellite.Overlay.DB.GetExitingNodes(ctx)
+		require.NoError(t, err)
+		nodeIDs = make(storj.NodeIDList, 0, len(exitingNodes))
+		for _, exitingNode := range exitingNodes {
+			if exitingNode.ExitLoopCompletedAt == nil {
+				nodeIDs = append(nodeIDs, exitingNode.NodeID)
+			}
+		}
+		require.Len(t, nodeIDs, 0)
+
+		err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0)
+		require.NoError(t, err)
+
+		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
+		require.NoError(t, err)
+		require.Len(t, incompleteTransfers, 3)
+
+		// node should fail graceful exit if it has been inactive for maximum inactive time frame since last activity
+		time.Sleep(maximumInactiveTimeFrame + time.Second*1)
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID())
+		require.NoError(t, err)
+		require.False(t, exitStatus.ExitSuccess)
+		require.NotNil(t, exitStatus.ExitFinishedAt)
+
+		incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
+		require.NoError(t, err)
+		require.Len(t, incompleteTransfers, 0)
+	})
+}
+
+func TestObserverDurabilityRatio(t *testing.T) {
+	const (
+		maximumInactiveTimeFrame = time.Second * 1
+		successThreshold         = 4
+	)
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: 4,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				func(log *zap.Logger, index int, config *satellite.Config) {
+					config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
+					config.GracefulExit.UseRangedLoop = true
+				},
+				testplanet.ReconfigureRS(2, 3, successThreshold, 4),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		uplinkPeer := planet.Uplinks[0]
+		satellite := planet.Satellites[0]
+		nodeToRemove := planet.StorageNodes[0]
+		exitingNode := planet.StorageNodes[1]
+
+		project, err := uplinkPeer.GetProject(ctx, satellite)
+		require.NoError(t, err)
+		defer func() { require.NoError(t, project.Close()) }()
+
+		err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		info, err := project.BeginUpload(ctx, "testbucket", "test/path2", nil)
+		require.NoError(t, err)
+
+		upload, err := project.UploadPart(ctx, "testbucket", "test/path2", info.UploadID, 1)
+		require.NoError(t, err)
+
+		_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
+		require.NoError(t, err)
+		require.NoError(t, upload.Commit())
+
+		exitStatusRequest := overlay.ExitStatusRequest{
+			NodeID:          exitingNode.ID(),
+			ExitInitiatedAt: time.Now(),
+		}
+
+		_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusRequest)
+		require.NoError(t, err)
+
+		exitingNodes, err := satellite.Overlay.DB.GetExitingNodes(ctx)
+		require.NoError(t, err)
+		nodeIDs := make(storj.NodeIDList, 0, len(exitingNodes))
+		for _, exitingNode := range exitingNodes {
+			if exitingNode.ExitLoopCompletedAt == nil {
+				nodeIDs = append(nodeIDs, exitingNode.NodeID)
+			}
+		}
+		require.Len(t, nodeIDs, 1)
+
+		// retrieve remote segment
+		segments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
+		require.NoError(t, err)
+		require.Len(t, segments, 2)
+
+		for _, segment := range segments {
+			remotePieces := segment.Pieces
+			var newPieces metabase.Pieces = make(metabase.Pieces, len(remotePieces)-1)
+			idx := 0
+			for _, p := range remotePieces {
+				if p.StorageNode != nodeToRemove.ID() {
+					newPieces[idx] = p
+					idx++
+				}
+			}
+			err = satellite.Metabase.DB.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
+				StreamID: segment.StreamID,
+				Position: segment.Position,
+
+				OldPieces:     segment.Pieces,
+				NewPieces:     newPieces,
+				NewRedundancy: segment.Redundancy,
+			})
+			require.NoError(t, err)
+		}
+
+		_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)
+		require.NoError(t, err)
+
+		incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
+		require.NoError(t, err)
+		require.Len(t, incompleteTransfers, 2)
+		for _, incomplete := range incompleteTransfers {
+			require.Equal(t, float64(successThreshold-1)/float64(successThreshold), incomplete.DurabilityRatio)
+			require.NotNil(t, incomplete.RootPieceID)
+		}
+	})
+}
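
Both tests drive the observer through full loop cycles via RunOnce rather than
running the chore. Assuming a test database is configured for testplanet (in
this repository typically via the STORJ_TEST_POSTGRES environment variable;
treat that as an assumption), they can be run directly; the -run prefix
matches both tests:

    go test ./satellite/gracefulexit -run TestObserver -v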
@@ -10,6 +10,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/common/storj"
+	"storj.io/storj/satellite/metabase/rangedloop"
 	"storj.io/storj/satellite/metabase/segmentloop"
 	"storj.io/uplink/private/eestream"
 )
@@ -17,6 +18,7 @@ import (
 var remoteSegmentFunc = mon.Task()
 
 var _ segmentloop.Observer = (*PathCollector)(nil)
+var _ rangedloop.Partial = (*PathCollector)(nil)
 
 // PathCollector uses the metainfo loop to add paths to node reservoirs.
 //
@@ -62,11 +64,13 @@ func (collector *PathCollector) Flush(ctx context.Context) (err error) {
 // RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already.
 func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
 	defer remoteSegmentFunc(&ctx)(&err)
 
 	if len(collector.nodeIDStorage) == 0 {
 		return nil
 	}
-
+	return collector.handleRemoteSegment(ctx, segment)
+}
+func (collector *PathCollector) handleRemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
 	pieceSize := int64(-1)
 
 	numPieces := len(segment.Pieces)
@@ -115,6 +119,27 @@ func (collector *PathCollector) InlineSegment(ctx context.Context, segment *segm
 	return nil
 }
 
+// Process adds transfer queue items for remote segments belonging to newly
+// exiting nodes.
+func (collector *PathCollector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
+	// Intentionally omitting mon.Task here. The durations for all Process
+	// calls are aggregated and emitted by the ranged loop service.
+
+	if len(collector.nodeIDStorage) == 0 {
+		return nil
+	}
+
+	for _, segment := range segments {
+		if segment.Inline() {
+			continue
+		}
+		if err := collector.handleRemoteSegment(ctx, &segment); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
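
Both entry points now funnel into handleRemoteSegment, so the chore-driven
segment loop and the ranged loop share the same queueing logic. A hypothetical
caller showing how the two paths relate (the driver function is an assumed
illustration; the collector methods are the ones in the diff above):

    func queueBothWays(ctx context.Context, collector *gracefulexit.PathCollector,
    	one *segmentloop.Segment, many []segmentloop.Segment) error {
    	// Segment loop path: one remote segment at a time.
    	if err := collector.RemoteSegment(ctx, one); err != nil {
    		return err
    	}
    	// Ranged loop path: a batch per call; inline segments are skipped.
    	if err := collector.Process(ctx, many); err != nil {
    		return err
    	}
    	// Either way, buffered transfer queue items are flushed in batches.
    	return collector.Flush(ctx)
    }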
@@ -18,9 +18,9 @@ import (
 	"storj.io/common/storj"
 	"storj.io/private/debug"
 	"storj.io/storj/private/lifecycle"
+	"storj.io/storj/satellite/gracefulexit"
 	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/metabase/rangedloop"
-	"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
 	"storj.io/storj/satellite/metrics"
 )
 
@@ -44,6 +44,10 @@ type RangedLoop struct {
 		Observer rangedloop.Observer
 	}
 
+	GracefulExit struct {
+		Observer rangedloop.Observer
+	}
+
 	RangedLoop struct {
 		Service *rangedloop.Service
 	}
@@ -83,17 +87,28 @@ func NewRangedLoop(log *zap.Logger, full *identity.FullIdentity, db DB, metabase
 		peer.Metrics.Observer = metrics.NewObserver()
 	}
 
+	{ // setup gracefulexit
+		peer.GracefulExit.Observer = gracefulexit.NewObserver(
+			peer.Log.Named("gracefulexit:observer"),
+			peer.DB.GracefulExit(),
+			peer.DB.OverlayCache(),
+			config.GracefulExit,
+		)
+	}
+
 	{ // setup ranged loop
 		var observers []rangedloop.Observer
 
-		// TODO: replace with real segment provider
-		segments := &rangedlooptest.RangeSplitter{}
-
 		if config.Metrics.UseRangedLoop {
 			observers = append(observers, peer.Metrics.Observer)
 		}
 
-		peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, segments, observers)
+		if config.GracefulExit.Enabled && config.GracefulExit.UseRangedLoop {
+			observers = append(observers, peer.GracefulExit.Observer)
+		}
+
+		segments := rangedloop.NewMetabaseRangeSplitter(metabaseDB, config.RangedLoop.BatchSize)
+		peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, &segments, observers)
 
 		peer.Services.Add(lifecycle.Item{
 			Name: "rangeloop",
scripts/testdata/satellite-config.yaml.lock (vendored, +3 lines)
@@ -499,6 +499,9 @@ contact.external-address: ""
 # batch size (crdb specific) for deleting and adding items to the transfer queue
 # graceful-exit.transfer-queue-batch-size: 1000
 
+# whether or not to use the ranged loop observer instead of the chore.
+# graceful-exit.use-ranged-loop: false
+
 # path to the certificate chain for this identity
 identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert