From 6ac5bf0d7c798e8c56d6aa6bdfceb90782b0d660 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 24 Apr 2023 11:10:00 +0200 Subject: [PATCH] satellite/gracefulexit: remove segments loop parts We are switching completely to ranged loop. https://github.com/storj/storj/issues/5368 Change-Id: Ia3e2d7879d91f7f5ffa99b8e8f108380e3b39f31 --- private/testplanet/satellite.go | 2 - satellite/core.go | 24 ---- satellite/gracefulexit/chore.go | 138 ------------------- satellite/gracefulexit/chore_test.go | 16 ++- satellite/gracefulexit/common.go | 2 +- satellite/gracefulexit/endpoint_test.go | 46 +++---- satellite/gracefulexit/gracefulexit_test.go | 5 +- satellite/gracefulexit/observer.go | 103 +++++++++++++- satellite/gracefulexit/pathcollector.go | 143 -------------------- scripts/testdata/satellite-config.yaml.lock | 4 +- storagenode/gracefulexit/chore_test.go | 7 +- storagenode/gracefulexit/worker_test.go | 18 +-- 12 files changed, 140 insertions(+), 368 deletions(-) delete mode 100644 satellite/gracefulexit/chore.go delete mode 100644 satellite/gracefulexit/pathcollector.go diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index 6383f19c1..f810d45d1 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -198,7 +198,6 @@ type Satellite struct { } GracefulExit struct { - Chore *gracefulexit.Chore Endpoint *gracefulexit.Endpoint } } @@ -660,7 +659,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer system.ProjectLimits.Cache = api.ProjectLimits.Cache - system.GracefulExit.Chore = peer.GracefulExit.Chore system.GracefulExit.Endpoint = api.GracefulExit.Endpoint return system diff --git a/satellite/core.go b/satellite/core.go index b8b43f10c..53a8bef46 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -33,7 +33,6 @@ import ( "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleauth" "storj.io/storj/satellite/console/emailreminders" - "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/mailservice" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase/segmentloop" @@ -142,10 +141,6 @@ type Core struct { StorjscanService *storjscan.Service StorjscanChore *storjscan.Chore } - - GracefulExit struct { - Chore *gracefulexit.Chore - } } // New creates a new satellite. @@ -591,25 +586,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, } } - { // setup graceful exit - log := peer.Log.Named("gracefulexit") - switch { - case !config.GracefulExit.Enabled: - log.Info("disabled") - case config.GracefulExit.UseRangedLoop: - log.Info("using ranged loop") - default: - peer.GracefulExit.Chore = gracefulexit.NewChore(log, peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.SegmentLoop, config.GracefulExit) - peer.Services.Add(lifecycle.Item{ - Name: "gracefulexit", - Run: peer.GracefulExit.Chore.Run, - Close: peer.GracefulExit.Chore.Close, - }) - peer.Debug.Server.Panel.Add( - debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop)) - } - } - return peer, nil } diff --git a/satellite/gracefulexit/chore.go b/satellite/gracefulexit/chore.go deleted file mode 100644 index 1e1fa4a96..000000000 --- a/satellite/gracefulexit/chore.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. 
- -package gracefulexit - -import ( - "context" - "database/sql" - "time" - - "github.com/zeebo/errs" - "go.uber.org/zap" - - "storj.io/common/storj" - "storj.io/common/sync2" - "storj.io/storj/satellite/metabase/segmentloop" - "storj.io/storj/satellite/overlay" -) - -// Chore populates the graceful exit transfer queue. -// -// architecture: Chore -type Chore struct { - log *zap.Logger - Loop *sync2.Cycle - db DB - config Config - overlay overlay.DB - segmentLoop *segmentloop.Service -} - -// NewChore instantiates Chore. -func NewChore(log *zap.Logger, db DB, overlay overlay.DB, segmentLoop *segmentloop.Service, config Config) *Chore { - return &Chore{ - log: log, - Loop: sync2.NewCycle(config.ChoreInterval), - db: db, - config: config, - overlay: overlay, - segmentLoop: segmentLoop, - } -} - -// Run starts the chore. -func (chore *Chore) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - return chore.Loop.Run(ctx, func(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - exitingNodes, err := chore.overlay.GetExitingNodes(ctx) - if err != nil { - chore.log.Error("error retrieving nodes that have not finished exiting", zap.Error(err)) - return nil - } - - nodeCount := len(exitingNodes) - if nodeCount == 0 { - return nil - } - chore.log.Debug("found exiting nodes", zap.Int("exitingNodes", nodeCount)) - - exitingNodesLoopIncomplete := make(storj.NodeIDList, 0, nodeCount) - for _, node := range exitingNodes { - if node.ExitLoopCompletedAt == nil { - exitingNodesLoopIncomplete = append(exitingNodesLoopIncomplete, node.NodeID) - continue - } - - progress, err := chore.db.GetProgress(ctx, node.NodeID) - if err != nil && !errs.Is(err, sql.ErrNoRows) { - chore.log.Error("error retrieving progress for node", zap.Stringer("Node ID", node.NodeID), zap.Error(err)) - continue - } - - lastActivityTime := *node.ExitLoopCompletedAt - if progress != nil { - lastActivityTime = progress.UpdatedAt - } - - // check inactive timeframe - if lastActivityTime.Add(chore.config.MaxInactiveTimeFrame).Before(time.Now().UTC()) { - exitStatusRequest := &overlay.ExitStatusRequest{ - NodeID: node.NodeID, - ExitSuccess: false, - ExitFinishedAt: time.Now().UTC(), - } - mon.Meter("graceful_exit_fail_inactive").Mark(1) - _, err = chore.overlay.UpdateExitStatus(ctx, exitStatusRequest) - if err != nil { - chore.log.Error("error updating exit status", zap.Error(err)) - continue - } - - // remove all items from the transfer queue - err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID) - if err != nil { - chore.log.Error("error deleting node from transfer queue", zap.Error(err)) - } - } - } - - // Populate transfer queue for nodes that have not completed the exit loop yet - pathCollector := NewPathCollector(chore.log, chore.db, exitingNodesLoopIncomplete, chore.config.ChoreBatchSize) - err = chore.segmentLoop.Join(ctx, pathCollector) - if err != nil { - chore.log.Error("error joining segment loop.", zap.Error(err)) - return nil - } - - err = pathCollector.Flush(ctx) - if err != nil { - chore.log.Error("error flushing collector buffer.", zap.Error(err)) - return nil - } - - now := time.Now().UTC() - for _, nodeID := range exitingNodesLoopIncomplete { - exitStatus := overlay.ExitStatusRequest{ - NodeID: nodeID, - ExitLoopCompletedAt: now, - } - _, err = chore.overlay.UpdateExitStatus(ctx, &exitStatus) - if err != nil { - chore.log.Error("error updating exit status.", zap.Error(err)) - } - - bytesToTransfer := pathCollector.nodeIDStorage[nodeID] - 
mon.IntVal("graceful_exit_init_bytes_stored").Observe(bytesToTransfer) - } - return nil - }) -} - -// Close closes chore. -func (chore *Chore) Close() error { - chore.Loop.Close() - return nil -} diff --git a/satellite/gracefulexit/chore_test.go b/satellite/gracefulexit/chore_test.go index def8aa980..f1df2fff5 100644 --- a/satellite/gracefulexit/chore_test.go +++ b/satellite/gracefulexit/chore_test.go @@ -43,8 +43,6 @@ func TestChore(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, project.Close()) }() - satellite.GracefulExit.Chore.Loop.Pause() - err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) @@ -79,7 +77,9 @@ func TestChore(t *testing.T) { } require.Len(t, nodeIDs, 1) - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) require.NoError(t, err) @@ -109,7 +109,6 @@ func TestChore(t *testing.T) { } require.Len(t, nodeIDs, 0) - satellite.GracefulExit.Chore.Loop.Pause() err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0) require.NoError(t, err) @@ -119,7 +118,9 @@ func TestChore(t *testing.T) { // node should fail graceful exit if it has been inactive for maximum inactive time frame since last activity time.Sleep(maximumInactiveTimeFrame + time.Second*1) - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID()) require.NoError(t, err) @@ -159,7 +160,6 @@ func TestChoreDurabilityRatio(t *testing.T) { project, err := uplinkPeer.GetProject(ctx, satellite) require.NoError(t, err) defer func() { require.NoError(t, project.Close()) }() - satellite.GracefulExit.Chore.Loop.Pause() err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) @@ -218,7 +218,9 @@ func TestChoreDurabilityRatio(t *testing.T) { require.NoError(t, err) } - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) require.NoError(t, err) diff --git a/satellite/gracefulexit/common.go b/satellite/gracefulexit/common.go index 8c87fb21a..edefc3cea 100644 --- a/satellite/gracefulexit/common.go +++ b/satellite/gracefulexit/common.go @@ -29,7 +29,7 @@ type Config struct { ChoreBatchSize int `help:"size of the buffer used to batch inserts into the transfer queue." default:"500" testDefault:"10"` ChoreInterval time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s" testDefault:"$TESTINTERVAL"` - UseRangedLoop bool `help:"whether or not to use the ranged loop observer instead of the chore." default:"false" testDefault:"false"` + UseRangedLoop bool `help:"whether use GE observer with ranged loop." default:"true"` EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." 
default:"300" testDefault:"100"` diff --git a/satellite/gracefulexit/endpoint_test.go b/satellite/gracefulexit/endpoint_test.go index af07cea23..90e917343 100644 --- a/satellite/gracefulexit/endpoint_test.go +++ b/satellite/gracefulexit/endpoint_test.go @@ -161,8 +161,6 @@ func TestConcurrentConnections(t *testing.T) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - satellite.GracefulExit.Chore.Loop.Pause() - err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) @@ -228,8 +226,9 @@ func TestConcurrentConnections(t *testing.T) { require.NoError(t, c.Close()) } - // wait for initial loop to start so we have pieces to transfer - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) { // this connection should not close immediately, since there are pieces to transfer c, err := client.Process(ctx) @@ -281,8 +280,6 @@ func TestRecvTimeout(t *testing.T) { satellite := planet.Satellites[0] ul := planet.Uplinks[0] - satellite.GracefulExit.Chore.Loop.Pause() - err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) @@ -297,9 +294,9 @@ func TestRecvTimeout(t *testing.T) { _, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq) require.NoError(t, err) - // run the satellite chore to build the transfer queue. - satellite.GracefulExit.Chore.Loop.TriggerWait() - satellite.GracefulExit.Chore.Loop.Pause() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // check that the satellite knows the storage node is exiting. exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx) @@ -971,7 +968,6 @@ func TestExitDisabled(t *testing.T) { satellite := planet.Satellites[0] exitingNode := planet.StorageNodes[0] - require.Nil(t, satellite.GracefulExit.Chore) require.Nil(t, satellite.GracefulExit.Endpoint) conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL()) @@ -1004,8 +1000,6 @@ func TestSegmentChangedOrDeleted(t *testing.T) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - satellite.GracefulExit.Chore.Loop.Pause() - err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path0", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) @@ -1034,8 +1028,9 @@ func TestSegmentChangedOrDeleted(t *testing.T) { require.Len(t, exitingNodes, 1) require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID) - // trigger the metainfo loop chore so we can get some pieces to transfer - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. 
+ _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // make sure all the pieces are in the transfer queue incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) @@ -1101,8 +1096,6 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, project.Close()) }() - satellite.GracefulExit.Chore.Loop.Pause() - _, err = project.EnsureBucket(ctx, "testbucket") require.NoError(t, err) @@ -1145,8 +1138,9 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) { require.Len(t, exitingNodes, 1) require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID) - // trigger the metainfo loop chore so we can get some pieces to transfer - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // make sure all the pieces are in the transfer queue incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) @@ -1278,8 +1272,6 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - satellite.GracefulExit.Chore.Loop.Pause() - nodeFullIDs := make(map[storj.NodeID]*identity.FullIdentity) for _, node := range planet.StorageNodes { nodeFullIDs[node.ID()] = node.Identity @@ -1324,8 +1316,9 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) { // close the old client require.NoError(t, c.CloseSend()) - // trigger the metainfo loop chore so we can get some pieces to transfer - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // make sure all the pieces are in the transfer queue _, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0) @@ -1404,8 +1397,6 @@ func TestIneligibleNodeAge(t *testing.T) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - satellite.GracefulExit.Chore.Loop.Pause() - nodeFullIDs := make(map[storj.NodeID]*identity.FullIdentity) for _, node := range planet.StorageNodes { nodeFullIDs[node.ID()] = node.Identity @@ -1467,8 +1458,6 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun _, err = project.EnsureBucket(ctx, "testbucket") require.NoError(t, err) - satellite.GracefulExit.Chore.Loop.Pause() - nodeFullIDs := make(map[storj.NodeID]*identity.FullIdentity) for _, node := range planet.StorageNodes { nodeFullIDs[node.ID()] = node.Identity @@ -1529,8 +1518,9 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun // close the old client require.NoError(t, c.CloseSend()) - // trigger the metainfo loop chore so we can get some pieces to transfer - satellite.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. 
+ _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // make sure all the pieces are in the transfer queue incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0) diff --git a/satellite/gracefulexit/gracefulexit_test.go b/satellite/gracefulexit/gracefulexit_test.go index 7d9a2a67e..8fdd10022 100644 --- a/satellite/gracefulexit/gracefulexit_test.go +++ b/satellite/gracefulexit/gracefulexit_test.go @@ -245,9 +245,10 @@ func TestGracefulExit_CopiedObjects(t *testing.T) { NodeID: node, ExitInitiatedAt: time.Now().UTC(), }) + require.NoError(t, err) - // trigger segments loop with GE - planet.Satellites[0].GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx) require.NoError(t, err) // we should get only one item from GE queue as we have only one remote segments diff --git a/satellite/gracefulexit/observer.go b/satellite/gracefulexit/observer.go index d0d936178..cae9972ba 100644 --- a/satellite/gracefulexit/observer.go +++ b/satellite/gracefulexit/observer.go @@ -13,6 +13,7 @@ import ( "storj.io/common/storj" "storj.io/storj/satellite/metabase/rangedloop" + "storj.io/storj/satellite/metabase/segmentloop" "storj.io/storj/satellite/overlay" ) @@ -31,6 +32,7 @@ type Observer struct { } var _ rangedloop.Observer = (*Observer)(nil) +var _ rangedloop.Partial = (*observerFork)(nil) // NewObserver returns a new ranged loop observer. func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, config Config) *Observer { @@ -79,21 +81,19 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error) { defer mon.Task()(&ctx)(&err) - // TODO: trim out/refactor segmentloop.Observer bits from path collector - // once segmentloop.Observer is removed. - return NewPathCollector(obs.log, obs.db, obs.exitingNodes, obs.config.ChoreBatchSize), nil + return newObserverFork(obs.log, obs.db, obs.exitingNodes, obs.config.ChoreBatchSize), nil } // Join flushes the forked path collector and aggregates collected metrics. func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error) { defer mon.Task()(&ctx)(&err) - pathCollector, ok := partial.(*PathCollector) + pathCollector, ok := partial.(*observerFork) if !ok { return Error.New("expected partial type %T but got %T", pathCollector, partial) } - if err := pathCollector.Flush(ctx); err != nil { + if err := pathCollector.flush(ctx, 1); err != nil { return err } @@ -164,3 +164,96 @@ func (obs *Observer) checkForInactiveNodes(ctx context.Context, exitingNodes []* } } + +type observerFork struct { + log *zap.Logger + db DB + buffer []TransferQueueItem + batchSize int + nodeIDStorage map[storj.NodeID]int64 +} + +func newObserverFork(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *observerFork { + fork := &observerFork{ + log: log, + db: db, + buffer: make([]TransferQueueItem, 0, batchSize), + batchSize: batchSize, + nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)), + } + + if len(exitingNodes) > 0 { + for _, nodeID := range exitingNodes { + fork.nodeIDStorage[nodeID] = 0 + } + } + + return fork +} + +// Process adds transfer queue items for remote segments belonging to newly exiting nodes. 
+func (observer *observerFork) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
+	// Intentionally omitting mon.Task here. The durations for all process
+	// calls are aggregated and emitted by the ranged loop service.
+
+	if len(observer.nodeIDStorage) == 0 {
+		return nil
+	}
+
+	for _, segment := range segments {
+		if segment.Inline() {
+			continue
+		}
+		if err := observer.handleRemoteSegment(ctx, segment); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (observer *observerFork) handleRemoteSegment(ctx context.Context, segment segmentloop.Segment) (err error) {
+	numPieces := len(segment.Pieces)
+	for _, piece := range segment.Pieces {
+		if _, ok := observer.nodeIDStorage[piece.StorageNode]; !ok {
+			continue
+		}
+
+		pieceSize := segment.PieceSize()
+
+		observer.nodeIDStorage[piece.StorageNode] += pieceSize
+
+		item := TransferQueueItem{
+			NodeID:          piece.StorageNode,
+			StreamID:        segment.StreamID,
+			Position:        segment.Position,
+			PieceNum:        int32(piece.Number),
+			RootPieceID:     segment.RootPieceID,
+			DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares),
+		}
+
+		observer.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", piece.StorageNode),
+			zap.String("stream_id", segment.StreamID.String()), zap.Int32("part", int32(segment.Position.Part)),
+			zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", piece.Number),
+			zap.Int("num pieces", numPieces), zap.Int16("total possible pieces", segment.Redundancy.TotalShares))
+
+		observer.buffer = append(observer.buffer, item)
+		err = observer.flush(ctx, observer.batchSize)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (observer *observerFork) flush(ctx context.Context, limit int) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	if len(observer.buffer) >= limit {
+		err = observer.db.Enqueue(ctx, observer.buffer, observer.batchSize)
+		observer.buffer = observer.buffer[:0]
+
+		return errs.Wrap(err)
+	}
+	return nil
+}
diff --git a/satellite/gracefulexit/pathcollector.go b/satellite/gracefulexit/pathcollector.go
deleted file mode 100644
index b9a49d5a9..000000000
--- a/satellite/gracefulexit/pathcollector.go
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright (C) 2019 Storj Labs, Inc.
-// See LICENSE for copying information.
-
-package gracefulexit
-
-import (
-	"context"
-
-	"github.com/zeebo/errs"
-	"go.uber.org/zap"
-
-	"storj.io/common/storj"
-	"storj.io/storj/satellite/metabase/rangedloop"
-	"storj.io/storj/satellite/metabase/segmentloop"
-)
-
-var remoteSegmentFunc = mon.Task()
-
-var _ segmentloop.Observer = (*PathCollector)(nil)
-var _ rangedloop.Partial = (*PathCollector)(nil)
-
-// PathCollector uses the metainfo loop to add paths to node reservoirs.
-//
-// architecture: Observer
-type PathCollector struct {
-	log           *zap.Logger
-	db            DB
-	buffer        []TransferQueueItem
-	batchSize     int
-	nodeIDStorage map[storj.NodeID]int64
-}
-
-// NewPathCollector instantiates a path collector.
-func NewPathCollector(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *PathCollector {
-	collector := &PathCollector{
-		log:           log,
-		db:            db,
-		buffer:        make([]TransferQueueItem, 0, batchSize),
-		batchSize:     batchSize,
-		nodeIDStorage: make(map[storj.NodeID]int64, len(exitingNodes)),
-	}
-
-	if len(exitingNodes) > 0 {
-		for _, nodeID := range exitingNodes {
-			collector.nodeIDStorage[nodeID] = 0
-		}
-	}
-
-	return collector
-}
-
-// LoopStarted is called at each start of a loop.
-func (collector *PathCollector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error) { - return nil -} - -// Flush persists the current buffer items to the database. -func (collector *PathCollector) Flush(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - return collector.flush(ctx, 1) -} - -// RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already. -func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) { - defer remoteSegmentFunc(&ctx)(&err) - if len(collector.nodeIDStorage) == 0 { - return nil - } - return collector.handleRemoteSegment(ctx, segment) -} - -func (collector *PathCollector) handleRemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error) { - numPieces := len(segment.Pieces) - for _, piece := range segment.Pieces { - if _, ok := collector.nodeIDStorage[piece.StorageNode]; !ok { - continue - } - - pieceSize := segment.PieceSize() - - collector.nodeIDStorage[piece.StorageNode] += pieceSize - - item := TransferQueueItem{ - NodeID: piece.StorageNode, - StreamID: segment.StreamID, - Position: segment.Position, - PieceNum: int32(piece.Number), - RootPieceID: segment.RootPieceID, - DurabilityRatio: float64(numPieces) / float64(segment.Redundancy.TotalShares), - } - - collector.log.Debug("adding piece to transfer queue.", zap.Stringer("Node ID", piece.StorageNode), - zap.String("stream_id", segment.StreamID.String()), zap.Int32("part", int32(segment.Position.Part)), - zap.Int32("index", int32(segment.Position.Index)), zap.Uint16("piece num", piece.Number), - zap.Int("num pieces", numPieces), zap.Int16("total possible pieces", segment.Redundancy.TotalShares)) - - collector.buffer = append(collector.buffer, item) - err = collector.flush(ctx, collector.batchSize) - if err != nil { - return err - } - } - - return nil -} - -// InlineSegment returns nil because we're only auditing for storage nodes for now. -func (collector *PathCollector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) { - return nil -} - -// Process adds transfer queue items for remote segments belonging to newly -// exiting nodes. -func (collector *PathCollector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) { - // Intentionally omitting mon.Task here. The duration for all process - // calls are aggregated and and emitted by the ranged loop service. - - if len(collector.nodeIDStorage) == 0 { - return nil - } - - for _, segment := range segments { - if segment.Inline() { - continue - } - if err := collector.handleRemoteSegment(ctx, &segment); err != nil { - return err - } - } - return nil -} - -func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) { - defer mon.Task()(&ctx)(&err) - - if len(collector.buffer) >= limit { - err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize) - collector.buffer = collector.buffer[:0] - - return errs.Wrap(err) - } - return nil -} diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 2577c58bc..c8a55e6ba 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -517,8 +517,8 @@ contact.external-address: "" # batch size (crdb specific) for deleting and adding items to the transfer queue # graceful-exit.transfer-queue-batch-size: 1000 -# whether or not to use the ranged loop observer instead of the chore. 
-# graceful-exit.use-ranged-loop: false +# whether use GE observer with ranged loop. +# graceful-exit.use-ranged-loop: true # path to the certificate chain for this identity identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert diff --git a/storagenode/gracefulexit/chore_test.go b/storagenode/gracefulexit/chore_test.go index 2ef36688f..e38c40560 100644 --- a/storagenode/gracefulexit/chore_test.go +++ b/storagenode/gracefulexit/chore_test.go @@ -33,8 +33,6 @@ func TestChore(t *testing.T) { satellite1 := planet.Satellites[0] uplinkPeer := planet.Uplinks[0] - satellite1.GracefulExit.Chore.Loop.Pause() - err := uplinkPeer.Upload(ctx, satellite1, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) @@ -91,8 +89,9 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, // initiate graceful exit on satellite side by running the SN chore. exitingNode.GracefulExit.Chore.Loop.TriggerWait() - // run the satellite chore to build the transfer queue. - satellite1.GracefulExit.Chore.Loop.TriggerWait() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite1.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // check that the satellite knows the storage node is exiting. exitingNodes, err := satellite1.DB.OverlayCache().GetExitingNodes(ctx) diff --git a/storagenode/gracefulexit/worker_test.go b/storagenode/gracefulexit/worker_test.go index a62f68c73..9c7becffa 100644 --- a/storagenode/gracefulexit/worker_test.go +++ b/storagenode/gracefulexit/worker_test.go @@ -43,8 +43,6 @@ func TestWorkerSuccess(t *testing.T) { satellite := planet.Satellites[0] ul := planet.Uplinks[0] - satellite.GracefulExit.Chore.Loop.Pause() - err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) @@ -59,9 +57,9 @@ func TestWorkerSuccess(t *testing.T) { _, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq) require.NoError(t, err) - // run the satellite chore to build the transfer queue. - satellite.GracefulExit.Chore.Loop.TriggerWait() - satellite.GracefulExit.Chore.Loop.Pause() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // check that the satellite knows the storage node is exiting. exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx) @@ -114,8 +112,6 @@ func TestWorkerTimeout(t *testing.T) { satellite := planet.Satellites[0] ul := planet.Uplinks[0] - satellite.GracefulExit.Chore.Loop.Pause() - err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err) @@ -130,9 +126,9 @@ func TestWorkerTimeout(t *testing.T) { _, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq) require.NoError(t, err) - // run the satellite chore to build the transfer queue. - satellite.GracefulExit.Chore.Loop.TriggerWait() - satellite.GracefulExit.Chore.Loop.Pause() + // run the satellite ranged loop to build the transfer queue. + _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx) + require.NoError(t, err) // check that the satellite knows the storage node is exiting. 
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx) @@ -192,8 +188,6 @@ func TestWorkerFailure_IneligibleNodeAge(t *testing.T) { satellite := planet.Satellites[0] ul := planet.Uplinks[0] - satellite.GracefulExit.Chore.Loop.Pause() - err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) require.NoError(t, err)