diff --git a/satellite/gracefulexit/chore.go b/satellite/gracefulexit/chore.go index 015cd7d68..1cd4d2ae1 100644 --- a/satellite/gracefulexit/chore.go +++ b/satellite/gracefulexit/chore.go @@ -92,7 +92,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) { } // remove all items from the transfer queue - err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID, progress.UsesSegmentTransferQueue) + err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID) if err != nil { chore.log.Error("error deleting node from transfer queue", zap.Error(err)) } diff --git a/satellite/gracefulexit/chore_test.go b/satellite/gracefulexit/chore_test.go index 9a31e7fc1..ed6878453 100644 --- a/satellite/gracefulexit/chore_test.go +++ b/satellite/gracefulexit/chore_test.go @@ -84,7 +84,7 @@ func TestChore(t *testing.T) { satellite.GracefulExit.Chore.Loop.TriggerWait() - incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true) + incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) require.NoError(t, err) require.Len(t, incompleteTransfers, 3) for _, incomplete := range incompleteTransfers { @@ -97,7 +97,7 @@ func TestChore(t *testing.T) { if node.ID() == exitingNode.ID() { continue } - incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0, true) + incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0) require.NoError(t, err) require.Len(t, incompleteTransfers, 0) } @@ -116,7 +116,7 @@ func TestChore(t *testing.T) { err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0) require.NoError(t, err) - incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true) + incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) require.NoError(t, err) require.Len(t, incompleteTransfers, 3) @@ -129,7 +129,7 @@ func TestChore(t *testing.T) { require.False(t, exitStatus.ExitSuccess) require.NotNil(t, exitStatus.ExitFinishedAt) - incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true) + incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) require.NoError(t, err) require.Len(t, incompleteTransfers, 0) @@ -223,7 +223,7 @@ func TestDurabilityRatio(t *testing.T) { satellite.GracefulExit.Chore.Loop.TriggerWait() - incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true) + incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0) require.NoError(t, err) require.Len(t, incompleteTransfers, 2) for _, incomplete := range incompleteTransfers { @@ -263,14 +263,15 @@ func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) { for j := 0; j < size; j++ { item := gracefulexit.TransferQueueItem{ NodeID: testrand.NodeID(), - Key: testrand.Bytes(memory.B * 256), + StreamID: testrand.UUID(), + Position: metabase.SegmentPosition{}, PieceNum: 0, DurabilityRatio: 1.0, } transferQueueItems = append(transferQueueItems, item) } batchSize := 1000 - err := db.Enqueue(ctx, transferQueueItems, batchSize, true) + err := db.Enqueue(ctx, transferQueueItems, batchSize) require.NoError(b, err) } } diff --git a/satellite/gracefulexit/db.go b/satellite/gracefulexit/db.go index f207cf88f..65f790a94 100644 --- a/satellite/gracefulexit/db.go +++ b/satellite/gracefulexit/db.go @@ -14,18 +14,16 @@ import ( // Progress represents the persisted graceful exit progress record. type Progress struct { - NodeID storj.NodeID - BytesTransferred int64 - PiecesTransferred int64 - PiecesFailed int64 - UpdatedAt time.Time - UsesSegmentTransferQueue bool + NodeID storj.NodeID + BytesTransferred int64 + PiecesTransferred int64 + PiecesFailed int64 + UpdatedAt time.Time } // TransferQueueItem represents the persisted graceful exit queue record. type TransferQueueItem struct { NodeID storj.NodeID - Key metabase.SegmentKey StreamID uuid.UUID Position metabase.SegmentPosition PieceNum int32 @@ -50,15 +48,15 @@ type DB interface { GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error) // Enqueue batch inserts graceful exit transfer queue entries it does not exist. - Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int, usesSegmentTransferQueue bool) error + Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error // UpdateTransferQueueItem creates a graceful exit transfer queue entry. - UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem, usesSegmentTransferQueue bool) error + UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error // DeleteTransferQueueItem deletes a graceful exit transfer queue entry. - DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error + DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error // DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID. - DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) error + DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error // DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries. - DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) error + DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error // DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer // queue items whose nodes have finished the exit before the indicated time // returning the total number of deleted items. @@ -69,15 +67,15 @@ type DB interface { // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) // GetTransferQueueItem gets a graceful exit transfer queue entry. - GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) (*TransferQueueItem, error) + GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) (*TransferQueueItem, error) // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. - GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error) + GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error) // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries in the database ordered by durability ratio and queued date ascending. - GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error) + GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error) // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending. - GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error) + GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error) // IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring. - IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error + IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error // CountFinishedTransferQueueItemsByNode return a map of the nodes which has // finished the exit before the indicated time but there are at least one item // left in the transfer queue. diff --git a/satellite/gracefulexit/db_test.go b/satellite/gracefulexit/db_test.go index e9fc1e164..840a622a7 100644 --- a/satellite/gracefulexit/db_test.go +++ b/satellite/gracefulexit/db_test.go @@ -9,11 +9,9 @@ import ( "github.com/stretchr/testify/require" - "storj.io/common/memory" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" - "storj.io/common/uuid" "storj.io/storj/satellite" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/metabase" @@ -56,246 +54,6 @@ func TestProgress(t *testing.T) { }) } -// TODO: remove this test when graceful_exit_transfer_queue is dropped. -func TestTransferQueueItem(t *testing.T) { - // test basic graceful exit transfer queue crud - satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { - geDB := db.GracefulExit() - - nodeID1 := testrand.NodeID() - nodeID2 := testrand.NodeID() - key1 := metabase.SegmentKey(testrand.Bytes(memory.B * 32)) - key2 := metabase.SegmentKey(testrand.Bytes(memory.B * 32)) - // root piece IDs for path 1 and 2 - rootPieceID1 := testrand.PieceID() - rootPieceID2 := testrand.PieceID() - items := []gracefulexit.TransferQueueItem{ - { - NodeID: nodeID1, - Key: key1, - PieceNum: 1, - RootPieceID: rootPieceID1, - DurabilityRatio: 0.9, - }, - { - NodeID: nodeID1, - Key: key2, - PieceNum: 2, - RootPieceID: rootPieceID2, - DurabilityRatio: 1.1, - }, - { - NodeID: nodeID2, - Key: key1, - PieceNum: 2, - RootPieceID: rootPieceID1, - DurabilityRatio: 0.9, - }, - { - NodeID: nodeID2, - Key: key2, - PieceNum: 1, - RootPieceID: rootPieceID2, - DurabilityRatio: 1.1, - }, - } - - // test basic create, update, get delete - { - batchSize := 1000 - err := geDB.Enqueue(ctx, items, batchSize, false) - require.NoError(t, err) - - for _, tqi := range items { - item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum) - require.NoError(t, err) - require.Equal(t, tqi.RootPieceID, item.RootPieceID) - require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio) - - now := time.Now() - item.DurabilityRatio = 1.2 - item.RequestedAt = &now - - err = geDB.UpdateTransferQueueItem(ctx, *item, false) - require.NoError(t, err) - - latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum) - require.NoError(t, err) - - require.Equal(t, item.RootPieceID, latestItem.RootPieceID) - require.Equal(t, item.DurabilityRatio, latestItem.DurabilityRatio) - require.WithinDuration(t, now, *latestItem.RequestedAt, time.Second) - } - - queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, false) - require.NoError(t, err) - require.Len(t, queueItems, 2) - } - - // mark the first item finished and test that only 1 item gets returned from the GetIncomplete - { - item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key1, uuid.UUID{}, metabase.SegmentPosition{}, 1) - require.NoError(t, err) - - now := time.Now() - item.FinishedAt = &now - - err = geDB.UpdateTransferQueueItem(ctx, *item, false) - require.NoError(t, err) - - queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, false) - require.NoError(t, err) - require.Len(t, queueItems, 1) - for _, queueItem := range queueItems { - require.Equal(t, nodeID1, queueItem.NodeID) - require.Equal(t, key2, queueItem.Key) - } - } - - // test delete finished queue items. Only key1 should be removed - { - err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1, false) - require.NoError(t, err) - - // key1 should no longer exist for nodeID1 - _, err = geDB.GetTransferQueueItem(ctx, nodeID1, key1, uuid.UUID{}, metabase.SegmentPosition{}, 1) - require.Error(t, err) - - // key2 should still exist for nodeID1 - _, err = geDB.GetTransferQueueItem(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2) - require.NoError(t, err) - } - - // test delete all for a node - { - queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0, false) - require.NoError(t, err) - require.Len(t, queueItems, 2) - - err = geDB.DeleteTransferQueueItems(ctx, nodeID2, false) - require.NoError(t, err) - - queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0, false) - require.NoError(t, err) - require.Len(t, queueItems, 0) - } - - // test increment order limit send count - err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2) - require.NoError(t, err) - - // get queue item for key2 since that still exists - item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2) - require.NoError(t, err) - - require.Equal(t, 1, item.OrderLimitSendCount) - }) -} - -// TODO: remove this test when graceful_exit_transfer_queue is dropped. -func TestBothTransferQueueItem(t *testing.T) { - satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { - geDB := db.GracefulExit() - - progress1 := gracefulexit.Progress{ - NodeID: testrand.NodeID(), - UsesSegmentTransferQueue: false, - } - progress2 := gracefulexit.Progress{ - NodeID: testrand.NodeID(), - UsesSegmentTransferQueue: true, - } - progress := []gracefulexit.Progress{progress1, progress2} - key1 := metabase.SegmentKey(testrand.Bytes(memory.B * 32)) - key2 := metabase.SegmentKey(testrand.Bytes(memory.B * 32)) - // root piece IDs for path 1 and 2 - rootPieceID1 := testrand.PieceID() - rootPieceID2 := testrand.PieceID() - // root piece IDs for segments - rootPieceID3 := testrand.PieceID() - rootPieceID4 := testrand.PieceID() - streamID1 := testrand.UUID() - streamID2 := testrand.UUID() - position1 := metabase.SegmentPosition{Part: 1, Index: 2} - position2 := metabase.SegmentPosition{Part: 2, Index: 3} - - itemsInTransferQueue := []gracefulexit.TransferQueueItem{ - { - NodeID: progress1.NodeID, - Key: key1, - PieceNum: 1, - RootPieceID: rootPieceID1, - DurabilityRatio: 0.9, - }, - { - NodeID: progress1.NodeID, - Key: key2, - PieceNum: 2, - RootPieceID: rootPieceID2, - DurabilityRatio: 1.1, - }, - } - itemsInSegmentTransferQueue := []gracefulexit.TransferQueueItem{ - { - NodeID: progress2.NodeID, - StreamID: streamID1, - Position: position1, - PieceNum: 2, - RootPieceID: rootPieceID3, - DurabilityRatio: 0.9, - }, - { - NodeID: progress2.NodeID, - StreamID: streamID2, - Position: position2, - PieceNum: 1, - RootPieceID: rootPieceID4, - DurabilityRatio: 1.1, - }, - } - - { - batchSize := 1000 - err := geDB.Enqueue(ctx, itemsInTransferQueue, batchSize, false) - require.NoError(t, err) - err = geDB.Enqueue(ctx, itemsInSegmentTransferQueue, batchSize, true) - require.NoError(t, err) - - for _, tqi := range append(itemsInTransferQueue, itemsInSegmentTransferQueue...) { - item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum) - require.NoError(t, err) - require.Equal(t, tqi.RootPieceID, item.RootPieceID) - require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio) - } - - // check that we get nothing if we don't use the right transfer queue - for _, p := range progress { - queueItems, err := geDB.GetIncomplete(ctx, p.NodeID, 10, 0, !p.UsesSegmentTransferQueue) - require.NoError(t, err) - require.Len(t, queueItems, 0) - } - } - - // test delete - { - for _, p := range progress { - // check that we have the right number of items before trying to delete - queueItems, err := geDB.GetIncomplete(ctx, p.NodeID, 10, 0, p.UsesSegmentTransferQueue) - require.NoError(t, err) - require.Len(t, queueItems, 2) - - err = geDB.DeleteTransferQueueItems(ctx, p.NodeID, p.UsesSegmentTransferQueue) - require.NoError(t, err) - - queueItems, err = geDB.GetIncomplete(ctx, p.NodeID, 10, 0, p.UsesSegmentTransferQueue) - require.NoError(t, err) - require.Len(t, queueItems, 0) - } - } - - }) -} - func TestSegmentTransferQueueItem(t *testing.T) { // test basic graceful exit transfer queue crud satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { @@ -349,11 +107,11 @@ func TestSegmentTransferQueueItem(t *testing.T) { // test basic create, update, get delete { batchSize := 1000 - err := geDB.Enqueue(ctx, items, batchSize, true) + err := geDB.Enqueue(ctx, items, batchSize) require.NoError(t, err) for _, tqi := range items { - item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum) + item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.StreamID, tqi.Position, tqi.PieceNum) require.NoError(t, err) require.Equal(t, tqi.RootPieceID, item.RootPieceID) require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio) @@ -362,10 +120,10 @@ func TestSegmentTransferQueueItem(t *testing.T) { item.DurabilityRatio = 1.2 item.RequestedAt = &now - err = geDB.UpdateTransferQueueItem(ctx, *item, true) + err = geDB.UpdateTransferQueueItem(ctx, *item) require.NoError(t, err) - latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum) + latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.StreamID, tqi.Position, tqi.PieceNum) require.NoError(t, err) require.Equal(t, item.RootPieceID, latestItem.RootPieceID) @@ -373,23 +131,23 @@ func TestSegmentTransferQueueItem(t *testing.T) { require.WithinDuration(t, now, *latestItem.RequestedAt, time.Second) } - queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, true) + queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0) require.NoError(t, err) require.Len(t, queueItems, 2) } // mark the first item finished and test that only 1 item gets returned from the GetIncomplete { - item, err := geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID1, position1, 1) + item, err := geDB.GetTransferQueueItem(ctx, nodeID1, streamID1, position1, 1) require.NoError(t, err) now := time.Now() item.FinishedAt = &now - err = geDB.UpdateTransferQueueItem(ctx, *item, true) + err = geDB.UpdateTransferQueueItem(ctx, *item) require.NoError(t, err) - queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, true) + queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0) require.NoError(t, err) require.Len(t, queueItems, 1) for _, queueItem := range queueItems { @@ -401,38 +159,38 @@ func TestSegmentTransferQueueItem(t *testing.T) { // test delete finished queue items. Only key1 should be removed { - err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1, true) + err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1) require.NoError(t, err) // key1 should no longer exist for nodeID1 - _, err = geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID1, position1, 1) + _, err = geDB.GetTransferQueueItem(ctx, nodeID1, streamID1, position1, 1) require.Error(t, err) // key2 should still exist for nodeID1 - _, err = geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID2, position2, 2) + _, err = geDB.GetTransferQueueItem(ctx, nodeID1, streamID2, position2, 2) require.NoError(t, err) } // test delete all for a node { - queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0, true) + queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0) require.NoError(t, err) require.Len(t, queueItems, 2) - err = geDB.DeleteTransferQueueItems(ctx, nodeID2, true) + err = geDB.DeleteTransferQueueItems(ctx, nodeID2) require.NoError(t, err) - queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0, true) + queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0) require.NoError(t, err) require.Len(t, queueItems, 0) } // test increment order limit send count - err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, nil, streamID2, position2, 2) + err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, streamID2, position2, 2) require.NoError(t, err) // get queue item for key2 since that still exists - item, err := geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID2, position2, 2) + item, err := geDB.GetTransferQueueItem(ctx, nodeID1, streamID2, position2, 2) require.NoError(t, err) require.Equal(t, 1, item.OrderLimitSendCount) diff --git a/satellite/gracefulexit/endpoint.go b/satellite/gracefulexit/endpoint.go index 036985e35..d23bd0f65 100644 --- a/satellite/gracefulexit/endpoint.go +++ b/satellite/gracefulexit/endpoint.go @@ -5,7 +5,6 @@ package gracefulexit import ( "context" - "database/sql" "io" "sync" "time" @@ -134,17 +133,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr endpoint.connections.delete(nodeID) }() - //TODO: remove this call when we remove graceful_exit_transfer_queue is removed - usesSegmentTransferQueue := true - progress, err := endpoint.db.GetProgress(ctx, nodeID) - if err != nil && !errs.Is(err, sql.ErrNoRows) { - return rpcstatus.Error(rpcstatus.Internal, err.Error()) - } - if progress != nil { - usesSegmentTransferQueue = progress.UsesSegmentTransferQueue - } - - isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue) + isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID) if err != nil { return rpcstatus.Error(rpcstatus.Internal, err.Error()) } @@ -190,14 +179,14 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr loopErr := incompleteLoop.Run(ctx, func(ctx context.Context) error { if pending.Length() == 0 { - incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue) + incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0) if err != nil { cancel() return pending.DoneSending(err) } if len(incomplete) == 0 { - incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue) + incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0) if err != nil { cancel() return pending.DoneSending(err) @@ -233,7 +222,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr // if there is no more work to receive send complete if finished { - isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue) + isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID) if err != nil { return rpcstatus.Error(rpcstatus.Internal, err.Error()) } @@ -247,7 +236,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr return rpcstatus.Error(rpcstatus.Internal, err.Error()) } - err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason, usesSegmentTransferQueue) + err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason) if err != nil { return rpcstatus.Error(rpcstatus.Internal, err.Error()) } @@ -310,7 +299,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr ExitSuccess: false, } - err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED, usesSegmentTransferQueue) + err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED) if err != nil { return rpcstatus.Error(rpcstatus.Internal, err.Error()) } @@ -339,7 +328,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS if err != nil { return Error.Wrap(err) } - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) if err != nil { return Error.Wrap(err) } @@ -347,10 +336,10 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS return nil } - segment, err := endpoint.getValidSegment(ctx, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.RootPieceID) + segment, err := endpoint.getValidSegment(ctx, incomplete.StreamID, incomplete.Position, incomplete.RootPieceID) if err != nil { endpoint.log.Warn("invalid segment", zap.Error(err)) - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) if err != nil { return Error.Wrap(err) } @@ -360,7 +349,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS nodePiece, err := endpoint.getNodePiece(ctx, segment, incomplete) if err != nil { - deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) + deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) if deleteErr != nil { return Error.Wrap(deleteErr) } @@ -374,7 +363,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS return Error.Wrap(err) } - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) if err != nil { return Error.Wrap(err) } @@ -403,12 +392,11 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS } if len(newNodes) == 0 { - return Error.New("could not find a node to receive piece transfer: node ID %v, key %v, piece num %v", nodeID, incomplete.Key, incomplete.PieceNum) + return Error.New("could not find a node to receive piece transfer: node ID %v, stream_id %v, piece num %v", nodeID, incomplete.StreamID, incomplete.PieceNum) } newNode := newNodes[0] endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.ID), - zap.ByteString("key", incomplete.Key), zap.ByteString("streamID", incomplete.StreamID[:]), zap.Uint32("Part", incomplete.Position.Part), zap.Uint32("Index", incomplete.Position.Index), zap.Int32("piece num", incomplete.PieceNum)) @@ -433,14 +421,13 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS return Error.Wrap(err) } - err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) + err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum) if err != nil { return Error.Wrap(err) } // update pending queue with the transfer item err = pending.Put(pieceID, &PendingTransfer{ - Key: incomplete.Key, StreamID: incomplete.StreamID, Position: incomplete.Position, PieceSize: pieceSize, @@ -479,12 +466,12 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat if err != nil { return Error.Wrap(err) } - transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) + transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) if err != nil { return Error.Wrap(err) } - err = endpoint.updateSegment(ctx, exitingNodeID, receivingNodeID, transfer.Key, transfer.StreamID, transfer.Position, transfer.PieceNum, transferQueueItem.RootPieceID) + err = endpoint.updateSegment(ctx, exitingNodeID, receivingNodeID, transfer.StreamID, transfer.Position, transfer.PieceNum, transferQueueItem.RootPieceID) if err != nil { // remove the piece from the pending queue so it gets retried deleteErr := pending.Delete(originalPieceID) @@ -502,7 +489,7 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat return Error.Wrap(err) } - err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) + err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) if err != nil { return Error.Wrap(err) } @@ -548,7 +535,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, return nil } - transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) + transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) if err != nil { return Error.Wrap(err) } @@ -564,11 +551,11 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, // Remove the queue item and remove the node from the pointer. // If the pointer is not piece hash verified, do not count this as a failure. if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND { - endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("key", transfer.Key), + endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("streamID", transfer.StreamID[:]), zap.Uint32("Part", transfer.Position.Part), zap.Uint32("Index", transfer.Position.Index), zap.Uint16("piece num", transfer.PieceNum)) - segment, err := endpoint.getValidSegment(ctx, transfer.Key, transfer.StreamID, transfer.Position, storj.PieceID{}) + segment, err := endpoint.getValidSegment(ctx, transfer.StreamID, transfer.Position, storj.PieceID{}) if err != nil { return Error.Wrap(err) } @@ -581,7 +568,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, } } if nodePiece == (metabase.Piece{}) { - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) if err != nil { return Error.Wrap(err) } @@ -598,7 +585,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, return Error.Wrap(err) } - err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) + err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum)) if err != nil { return Error.Wrap(err) } @@ -608,7 +595,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, transferQueueItem.LastFailedAt = &now transferQueueItem.FailedCount = &failedCount transferQueueItem.LastFailedCode = &errorCode - err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem, transfer.Key == nil) + err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem) if err != nil { return Error.Wrap(err) } @@ -624,7 +611,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, return pending.Delete(pieceID) } -func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (isDisqualified bool, err error) { +func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) { // check if node is disqualified nodeInfo, err := endpoint.overlay.Get(ctx, nodeID) if err != nil { @@ -645,7 +632,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto } // remove remaining items from the queue - err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID, usesSegmentTransferQueue) + err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID) if err != nil { return true, Error.Wrap(err) } @@ -656,7 +643,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto return false, nil } -func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason, usesSegmentTransferQueue bool) error { +func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason) error { finishedMsg, err := endpoint.getFinishedMessage(ctx, exitStatusRequest.NodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, failedReason) if err != nil { return Error.Wrap(err) @@ -673,7 +660,7 @@ func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSate } // remove remaining items from the queue after notifying nodes about their exit status - err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID, usesSegmentTransferQueue) + err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID) if err != nil { return Error.Wrap(err) } @@ -720,11 +707,11 @@ func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, nodeID storj.N return message, nil } -func (endpoint *Endpoint) updateSegment(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNumber uint16, originalRootPieceID storj.PieceID) (err error) { +func (endpoint *Endpoint) updateSegment(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNumber uint16, originalRootPieceID storj.PieceID) (err error) { defer mon.Task()(&ctx)(&err) // remove the node from the segment - segment, err := endpoint.getValidSegment(ctx, key, streamID, position, originalRootPieceID) + segment, err := endpoint.getValidSegment(ctx, streamID, position, originalRootPieceID) if err != nil { return Error.Wrap(err) } @@ -867,7 +854,7 @@ func (endpoint *Endpoint) calculatePieceSize(ctx context.Context, segment metaba } if len(segment.Pieces) > redundancy.OptimalThreshold() { - endpoint.log.Debug("segment has more pieces than required. removing node from segment.", zap.Stringer("node ID", nodeID), zap.ByteString("key", incomplete.Key), zap.Int32("piece num", incomplete.PieceNum)) + endpoint.log.Debug("segment has more pieces than required. removing node from segment.", zap.Stringer("node ID", nodeID), zap.Int32("piece num", incomplete.PieceNum)) return 0, ErrAboveOptimalThreshold.New("") } @@ -875,27 +862,7 @@ func (endpoint *Endpoint) calculatePieceSize(ctx context.Context, segment metaba return eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy), nil } -func (endpoint *Endpoint) getValidSegment(ctx context.Context, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, originalRootPieceID storj.PieceID) (metabase.Segment, error) { - // TODO: remove when table graceful_exit_transfer_queue is dropped - if key != nil { - location, err := metabase.ParseSegmentKey(key) - if err != nil { - return metabase.Segment{}, Error.Wrap(err) - } - - segment, err := endpoint.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{ - SegmentLocation: location, - }) - if err != nil { - return metabase.Segment{}, Error.Wrap(err) - } - - if !originalRootPieceID.IsZero() && originalRootPieceID != segment.RootPieceID { - return metabase.Segment{}, Error.New("segment has changed") - } - return segment, nil - } - +func (endpoint *Endpoint) getValidSegment(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, originalRootPieceID storj.PieceID) (metabase.Segment, error) { segment, err := endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{ StreamID: streamID, Position: position, @@ -921,7 +888,7 @@ func (endpoint *Endpoint) getNodePiece(ctx context.Context, segment metabase.Seg } if nodePiece == (metabase.Piece{}) { - endpoint.log.Debug("piece no longer held by node", zap.Stringer("node ID", nodeID), zap.ByteString("key", incomplete.Key), zap.Int32("piece num", incomplete.PieceNum)) + endpoint.log.Debug("piece no longer held by node", zap.Stringer("node ID", nodeID), zap.Int32("piece num", incomplete.PieceNum)) return metabase.Piece{}, Error.New("piece no longer held by node") } diff --git a/satellite/gracefulexit/endpoint_test.go b/satellite/gracefulexit/endpoint_test.go index 65b817cd2..fa445cc4f 100644 --- a/satellite/gracefulexit/endpoint_test.go +++ b/satellite/gracefulexit/endpoint_test.go @@ -304,7 +304,7 @@ func TestRecvTimeout(t *testing.T) { require.Len(t, exitingNodes, 1) require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID) - queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true) + queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) require.NoError(t, err) require.Len(t, queueItems, 1) @@ -1031,7 +1031,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) { satellite.GracefulExit.Chore.Loop.TriggerWait() // make sure all the pieces are in the transfer queue - incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true) + incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) require.NoError(t, err) require.Len(t, incomplete, 2) @@ -1071,7 +1071,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) { require.FailNow(t, "should not reach this case: %#v", m) } - queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, true) + queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0) require.NoError(t, err) require.Len(t, queueItems, 0) }) @@ -1142,7 +1142,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) { satellite.GracefulExit.Chore.Loop.TriggerWait() // make sure all the pieces are in the transfer queue - incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true) + incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) require.NoError(t, err) require.Len(t, incomplete, 1) // TODO: change to this when an object part can be overwritten @@ -1185,7 +1185,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) { require.FailNow(t, "should not reach this case: %#v", m) } - queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, true) + queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0) require.NoError(t, err) require.Len(t, queueItems, 0) }) @@ -1321,7 +1321,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) { satellite.GracefulExit.Chore.Loop.TriggerWait() // make sure all the pieces are in the transfer queue - _, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0, true) + _, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0) require.NoError(t, err) var messageCount int @@ -1365,7 +1365,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) { require.Equal(t, messageCount, maxOrderLimitSendCount) // make sure not responding piece not in queue - incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true) + incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) require.NoError(t, err) require.Len(t, incompletes, 0) @@ -1525,7 +1525,7 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun satellite.GracefulExit.Chore.Loop.TriggerWait() // make sure all the pieces are in the transfer queue - incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0, true) + incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0) require.NoError(t, err) // connect to satellite again to start receiving transfers diff --git a/satellite/gracefulexit/gracefulexit_test.go b/satellite/gracefulexit/gracefulexit_test.go index b81828879..1ce9f5ebf 100644 --- a/satellite/gracefulexit/gracefulexit_test.go +++ b/satellite/gracefulexit/gracefulexit_test.go @@ -173,7 +173,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { gracefulExitDB := planet.Satellites[0].DB.GracefulExit() batchSize := 1000 - err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true) + err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize) require.NoError(t, err) asOfSystemTime := -1 * time.Microsecond @@ -260,7 +260,7 @@ func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize( queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...) // Add some items to the transfer queue for the exited nodes. - err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true) + err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize) require.NoError(t, err) disableAsOfSystemTime := time.Second * 0 @@ -405,7 +405,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) { queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...) // Add some items to the transfer queue for the exited nodes. - err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true) + err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize) require.NoError(t, err) disableAsOfSystemTime := time.Second * 0 diff --git a/satellite/gracefulexit/pathcollector.go b/satellite/gracefulexit/pathcollector.go index bb8ae1d6f..4f1c5a488 100644 --- a/satellite/gracefulexit/pathcollector.go +++ b/satellite/gracefulexit/pathcollector.go @@ -116,7 +116,7 @@ func (collector *PathCollector) flush(ctx context.Context, limit int) (err error defer mon.Task()(&ctx)(&err) if len(collector.buffer) >= limit { - err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize, true) + err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize) collector.buffer = collector.buffer[:0] return errs.Wrap(err) diff --git a/satellite/gracefulexit/pending.go b/satellite/gracefulexit/pending.go index e3b60fb8b..fdc6addce 100644 --- a/satellite/gracefulexit/pending.go +++ b/satellite/gracefulexit/pending.go @@ -51,7 +51,6 @@ func (promise *PendingFinishedPromise) finishCalled(err error) { // PendingTransfer is the representation of work on the pending map. // It contains information about a transfer request that has been sent to a storagenode by the satellite. type PendingTransfer struct { - Key metabase.SegmentKey StreamID uuid.UUID Position metabase.SegmentPosition PieceSize int64 diff --git a/satellite/gracefulexit/pending_test.go b/satellite/gracefulexit/pending_test.go index 0ed84024b..856227e21 100644 --- a/satellite/gracefulexit/pending_test.go +++ b/satellite/gracefulexit/pending_test.go @@ -25,8 +25,11 @@ func TestPendingBasic(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() + streamID := testrand.UUID() + position := metabase.SegmentPosition{Part: 3, Index: 10} newWork := &gracefulexit.PendingTransfer{ - Key: metabase.SegmentKey("testbucket/testfile"), + StreamID: streamID, + Position: position, PieceSize: 10, SatelliteMessage: &pb.SatelliteMessage{}, PieceNum: 1, @@ -47,7 +50,8 @@ func TestPendingBasic(t *testing.T) { // get should work w, ok := pending.Get(pieceID) require.True(t, ok) - require.True(t, bytes.Equal(newWork.Key, w.Key)) + require.True(t, bytes.Equal(newWork.StreamID[:], w.StreamID[:])) + require.Equal(t, position, w.Position) invalidPieceID := testrand.PieceID() _, ok = pending.Get(invalidPieceID) @@ -99,8 +103,11 @@ func TestPendingIsFinishedWorkAdded(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() + streamID := testrand.UUID() + position := metabase.SegmentPosition{Part: 3, Index: 10} newWork := &gracefulexit.PendingTransfer{ - Key: metabase.SegmentKey("testbucket/testfile"), + StreamID: streamID, + Position: position, PieceSize: 10, SatelliteMessage: &pb.SatelliteMessage{}, PieceNum: 1, diff --git a/satellite/gracefulexit/validation.go b/satellite/gracefulexit/validation.go index e56fdaee4..b62b5addd 100644 --- a/satellite/gracefulexit/validation.go +++ b/satellite/gracefulexit/validation.go @@ -25,8 +25,8 @@ func (endpoint *Endpoint) validatePendingTransfer(ctx context.Context, transfer if transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil { return Error.New("Addressed order limit on transfer piece cannot be nil") } - if transfer.Key == nil && transfer.StreamID.IsZero() { - return Error.New("Transfer key and StreamID cannot be both empty") + if transfer.StreamID.IsZero() { + return Error.New("StreamID cannot be zero") } if transfer.OriginalRootPieceID.IsZero() { return Error.New("could not get original root piece ID from transfer item") diff --git a/satellite/satellitedb/gracefulexit.go b/satellite/satellitedb/gracefulexit.go index 61fcb8924..880209451 100644 --- a/satellite/satellitedb/gracefulexit.go +++ b/satellite/satellitedb/gracefulexit.go @@ -67,43 +67,31 @@ func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID) } progress := &gracefulexit.Progress{ - NodeID: nID, - BytesTransferred: dbxProgress.BytesTransferred, - PiecesTransferred: dbxProgress.PiecesTransferred, - PiecesFailed: dbxProgress.PiecesFailed, - UpdatedAt: dbxProgress.UpdatedAt, - UsesSegmentTransferQueue: dbxProgress.UsesSegmentTransferQueue, + NodeID: nID, + BytesTransferred: dbxProgress.BytesTransferred, + PiecesTransferred: dbxProgress.PiecesTransferred, + PiecesFailed: dbxProgress.PiecesFailed, + UpdatedAt: dbxProgress.UpdatedAt, } return progress, Error.Wrap(err) } // Enqueue batch inserts graceful exit transfer queue entries if it does not exist. -func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int, usesSegmentTransferQueue bool) (err error) { +func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) { defer mon.Task()(&ctx)(&err) - // TODO: to be removed when graceful_exit_transfer_queue is dropped - if !usesSegmentTransferQueue { - sort.Slice(items, func(i, k int) bool { - compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes()) + sort.Slice(items, func(i, k int) bool { + compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes()) + if compare == 0 { + compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:]) if compare == 0 { - return bytes.Compare(items[i].Key, items[k].Key) < 0 + return items[i].Position.Encode() < items[k].Position.Encode() } return compare < 0 - }) - } else { - sort.Slice(items, func(i, k int) bool { - compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes()) - if compare == 0 { - compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:]) - if compare == 0 { - return items[i].Position.Encode() < items[k].Position.Encode() - } - return compare < 0 - } - return compare < 0 - }) - } + } + return compare < 0 + }) for i := 0; i < len(items); i += batchSize { lowerBound := i @@ -114,7 +102,6 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran } var nodeIDs []storj.NodeID - var keys [][]byte var streamIds [][]byte var positions []int64 var pieceNums []int32 @@ -124,33 +111,13 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran for _, item := range items[lowerBound:upperBound] { item := item nodeIDs = append(nodeIDs, item.NodeID) - if !usesSegmentTransferQueue { - keys = append(keys, item.Key) - } else { - streamIds = append(streamIds, item.StreamID[:]) - positions = append(positions, int64(item.Position.Encode())) - } + streamIds = append(streamIds, item.StreamID[:]) + positions = append(positions, int64(item.Position.Encode())) pieceNums = append(pieceNums, item.PieceNum) rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes()) durabilities = append(durabilities, item.DurabilityRatio) } - // TODO: to be removed when graceful_exit_transfer_queue is dropped - if !usesSegmentTransferQueue { - _, err = db.db.ExecContext(ctx, db.db.Rebind(` - INSERT INTO graceful_exit_transfer_queue ( - node_id, path, piece_num, - root_piece_id, durability_ratio, queued_at - ) SELECT - unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]), - unnest($4::bytea[]), unnest($5::float8[]), $6 - ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums), - pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC()) - if err != nil { - return Error.Wrap(err) - } - continue - } _, err = db.db.ExecContext(ctx, db.db.Rebind(` INSERT INTO graceful_exit_segment_transfer_queue ( node_id, stream_id, position, piece_num, @@ -172,35 +139,9 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran } // UpdateTransferQueueItem creates a graceful exit transfer queue entry. -func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem, usesSegmentTransferQueue bool) (err error) { +func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem) (err error) { defer mon.Task()(&ctx)(&err) - // TODO: to be removed when graceful_exit_transfer_queue is dropped. - if !usesSegmentTransferQueue { - update := dbx.GracefulExitTransferQueue_Update_Fields{ - DurabilityRatio: dbx.GracefulExitTransferQueue_DurabilityRatio(item.DurabilityRatio), - LastFailedCode: dbx.GracefulExitTransferQueue_LastFailedCode_Raw(item.LastFailedCode), - FailedCount: dbx.GracefulExitTransferQueue_FailedCount_Raw(item.FailedCount), - } - - if item.RequestedAt != nil { - update.RequestedAt = dbx.GracefulExitTransferQueue_RequestedAt_Raw(item.RequestedAt) - } - if item.LastFailedAt != nil { - update.LastFailedAt = dbx.GracefulExitTransferQueue_LastFailedAt_Raw(item.LastFailedAt) - } - if item.FinishedAt != nil { - update.FinishedAt = dbx.GracefulExitTransferQueue_FinishedAt_Raw(item.FinishedAt) - } - - return db.db.UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx, - dbx.GracefulExitTransferQueue_NodeId(item.NodeID.Bytes()), - dbx.GracefulExitTransferQueue_Path(item.Key), - dbx.GracefulExitTransferQueue_PieceNum(int(item.PieceNum)), - update, - ) - } - update := dbx.GracefulExitSegmentTransfer_Update_Fields{ DurabilityRatio: dbx.GracefulExitSegmentTransfer_DurabilityRatio(item.DurabilityRatio), LastFailedCode: dbx.GracefulExitSegmentTransfer_LastFailedCode_Raw(item.LastFailedCode), @@ -227,16 +168,9 @@ func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item grac } // DeleteTransferQueueItem deletes a graceful exit transfer queue entry. -func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) { +func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) { defer mon.Task()(&ctx)(&err) - // TODO: to be removed when graceful_exit_transfer_queue is dropped. - if key != nil { - _, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), dbx.GracefulExitTransferQueue_Path(key), - dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum))) - return Error.Wrap(err) - } - _, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()), dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]), @@ -246,26 +180,17 @@ func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID st } // DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID. -func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (err error) { +func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) { defer mon.Task()(&ctx)(&err) - // TODO: to be removed when graceful_exit_transfer_queue is dropped - if !usesSegmentTransferQueue { - _, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes())) - return Error.Wrap(err) - } + _, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes())) return Error.Wrap(err) } // DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries by nodeID. -func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (err error) { +func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) { defer mon.Task()(&ctx)(&err) - // TODO: to be removed when graceful_exit_transfer_queue is dropped. - if !usesSegmentTransferQueue { - _, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes())) - return Error.Wrap(err) - } _, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes())) return Error.Wrap(err) @@ -297,26 +222,8 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( if err != nil { return 0, Error.Wrap(err) } - //TODO: remove when graceful_exit_transfer_queue is dropped. - statement = ` - DELETE FROM graceful_exit_transfer_queue - WHERE node_id IN ( - SELECT node_id FROM graceful_exit_transfer_queue INNER JOIN nodes - ON graceful_exit_transfer_queue.node_id = nodes.id - WHERE nodes.exit_finished_at IS NOT NULL - AND nodes.exit_finished_at < $1 - )` - res, err = db.db.ExecContext(ctx, statement, before) - if err != nil { - return 0, Error.Wrap(err) - } - countOldQueue, err := res.RowsAffected() - if err != nil { - return 0, Error.Wrap(err) - } - - return count + countOldQueue, nil + return count, nil case dbutil.Cockroach: nodesQuery := ` @@ -332,12 +239,7 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( WHERE node_id = $1 LIMIT $2 ` - //TODO: remove when graceful_exit_transfer_queue is dropped. - deleteStmtOldQueue := ` - DELETE FROM graceful_exit_transfer_queue - WHERE node_id = $1 - LIMIT $2 - ` + var ( deleteCount int64 offset int @@ -383,21 +285,6 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( break } } - //TODO: remove when graceful_exit_transfer_queue is dropped. - for { - res, err := db.db.ExecContext(ctx, deleteStmtOldQueue, id.Bytes(), batchSize) - if err != nil { - return deleteCount, Error.Wrap(err) - } - count, err := res.RowsAffected() - if err != nil { - return deleteCount, Error.Wrap(err) - } - deleteCount += count - if count < int64(batchSize) { - break - } - } } return deleteCount, nil } @@ -488,26 +375,9 @@ func (db *gracefulexitDB) DeleteBatchExitProgress(ctx context.Context, nodeIDs [ } // GetTransferQueueItem gets a graceful exit transfer queue entry. -func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) { +func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) { defer mon.Task()(&ctx)(&err) - // TODO: to be removed when graceful_exit_transfer_queue is dropped - if key != nil { - dbxTransferQueue, err := db.db.Get_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx, - dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), - dbx.GracefulExitTransferQueue_Path(key), - dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum))) - if err != nil { - return nil, Error.Wrap(err) - } - transferQueueItem, err := dbxToTransferQueueItem(dbxTransferQueue) - if err != nil { - return nil, Error.Wrap(err) - } - - return transferQueueItem, Error.Wrap(err) - } - dbxTransferQueue, err := db.db.Get_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()), dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]), @@ -527,25 +397,10 @@ func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj } // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. -func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) { +func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { defer mon.Task()(&ctx)(&err) - var sql string - if !usesSegmentTransferQueue { - // TODO: to be removed when graceful_exit_transfer_queue is dropped - sql = ` - SELECT - node_id, path, piece_num, - root_piece_id, durability_ratio, - queued_at, requested_at, last_failed_at, - last_failed_code, failed_count, finished_at, - order_limit_send_count - FROM graceful_exit_transfer_queue - WHERE node_id = ? - AND finished_at is NULL - ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` - } else { - sql = ` + sql := ` SELECT node_id, stream_id, position, piece_num, root_piece_id, durability_ratio, @@ -556,14 +411,13 @@ func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID WHERE node_id = ? AND finished_at is NULL ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` - } rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset) if err != nil { return nil, Error.Wrap(err) } defer func() { err = errs.Combine(err, rows.Close()) }() - transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue) + transferQueueItemRows, err := scanRows(rows) if err != nil { return nil, Error.Wrap(err) } @@ -572,25 +426,10 @@ func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID } // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending. -func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) { +func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { defer mon.Task()(&ctx)(&err) - var sql string - if !usesSegmentTransferQueue { - // TODO: to be removed when graceful_exit_transfer_queue is dropped - sql = ` - SELECT - node_id, path, piece_num, - root_piece_id, durability_ratio, queued_at, - requested_at, last_failed_at, last_failed_code, - failed_count, finished_at, order_limit_send_count - FROM graceful_exit_transfer_queue - WHERE node_id = ? - AND finished_at is NULL - AND last_failed_at is NULL - ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` - } else { - sql = ` + sql := ` SELECT node_id, stream_id, position, piece_num, root_piece_id, durability_ratio, @@ -602,14 +441,13 @@ func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID sto AND finished_at is NULL AND last_failed_at is NULL ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` - } rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset) if err != nil { return nil, Error.Wrap(err) } defer func() { err = errs.Combine(err, rows.Close()) }() - transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue) + transferQueueItemRows, err := scanRows(rows) if err != nil { return nil, Error.Wrap(err) } @@ -618,26 +456,10 @@ func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID sto } // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending. -func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) { +func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { defer mon.Task()(&ctx)(&err) - var sql string - if !usesSegmentTransferQueue { - // TODO: to be removed when graceful_exit_transfer_queue is dropped - sql = ` - SELECT - node_id, path, piece_num, - root_piece_id, durability_ratio, queued_at, - requested_at, last_failed_at, last_failed_code, - failed_count, finished_at, order_limit_send_count - FROM graceful_exit_transfer_queue - WHERE node_id = ? - AND finished_at is NULL - AND last_failed_at is not NULL - AND failed_count < ? - ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` - } else { - sql = ` + sql := ` SELECT node_id, stream_id, position, piece_num, root_piece_id, durability_ratio, @@ -650,14 +472,13 @@ func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj. AND last_failed_at is not NULL AND failed_count < ? ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` - } rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), maxFailures, limit, offset) if err != nil { return nil, Error.Wrap(err) } defer func() { err = errs.Combine(err, rows.Close()) }() - transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue) + transferQueueItemRows, err := scanRows(rows) if err != nil { return nil, Error.Wrap(err) } @@ -666,25 +487,15 @@ func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj. } // IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring. -func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) { +func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) { defer mon.Task()(&ctx)(&err) - var sql string - if key != nil { - // TODO: to be removed when graceful_exit_transfer_queue is dropped - sql = `UPDATE graceful_exit_transfer_queue SET order_limit_send_count = graceful_exit_transfer_queue.order_limit_send_count + 1 - WHERE node_id = ? - AND path = ? - AND piece_num = ?` - _, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, key, pieceNum) - } else { - sql = `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1 + sql := `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1 WHERE node_id = ? AND stream_id = ? AND position = ? AND piece_num = ?` - _, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum) - } + _, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum) return Error.Wrap(err) } @@ -725,56 +536,16 @@ func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Cont nodesItemsCount[nodeID] = n } - err = rows.Err() - if err != nil && !errors.Is(err, sql.ErrNoRows) { - return nodesItemsCount, Error.Wrap(err) - } - // TODO: remove when graceful_exit_transfer_queue is dropped - query = `SELECT n.id, count(getq.node_id) - FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq - ON n.id = getq.node_id - ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + ` - WHERE n.exit_finished_at IS NOT NULL - AND n.exit_finished_at < ? - GROUP BY n.id` - - statement = db.db.Rebind(query) - - rowsOldTransfers, err := db.db.QueryContext(ctx, statement, before) - if err != nil { - return nil, Error.Wrap(err) - } - defer func() { err = errs.Combine(err, Error.Wrap(rowsOldTransfers.Close())) }() - - for rowsOldTransfers.Next() { - var ( - nodeID storj.NodeID - n int64 - ) - err := rowsOldTransfers.Scan(&nodeID, &n) - if err != nil { - return nil, Error.Wrap(err) - } - - nodesItemsCount[nodeID] = n - } - return nodesItemsCount, Error.Wrap(rowsOldTransfers.Err()) + return nodesItemsCount, Error.Wrap(rows.Err()) } -func scanRows(rows tagsql.Rows, usesSegmentTransferQueue bool) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) { +func scanRows(rows tagsql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) { for rows.Next() { transferQueueItem := &gracefulexit.TransferQueueItem{} var pieceIDBytes []byte - if !usesSegmentTransferQueue { - // TODO: to be removed when graceful_exit_transfer_queue is dropped - err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.Key, &transferQueueItem.PieceNum, &pieceIDBytes, - &transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt, - &transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount) - } else { - err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes, - &transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt, - &transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount) - } + err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes, + &transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt, + &transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount) if err != nil { return nil, Error.Wrap(err) @@ -791,45 +562,6 @@ func scanRows(rows tagsql.Rows, usesSegmentTransferQueue bool) (transferQueueIte return transferQueueItemRows, Error.Wrap(rows.Err()) } -func dbxToTransferQueueItem(dbxTransferQueue *dbx.GracefulExitTransferQueue) (item *gracefulexit.TransferQueueItem, err error) { - nID, err := storj.NodeIDFromBytes(dbxTransferQueue.NodeId) - if err != nil { - return nil, Error.Wrap(err) - } - - item = &gracefulexit.TransferQueueItem{ - NodeID: nID, - Key: dbxTransferQueue.Path, - PieceNum: int32(dbxTransferQueue.PieceNum), - DurabilityRatio: dbxTransferQueue.DurabilityRatio, - QueuedAt: dbxTransferQueue.QueuedAt, - OrderLimitSendCount: dbxTransferQueue.OrderLimitSendCount, - } - if dbxTransferQueue.RootPieceId != nil { - item.RootPieceID, err = storj.PieceIDFromBytes(dbxTransferQueue.RootPieceId) - if err != nil { - return nil, err - } - } - if dbxTransferQueue.LastFailedCode != nil { - item.LastFailedCode = dbxTransferQueue.LastFailedCode - } - if dbxTransferQueue.FailedCount != nil { - item.FailedCount = dbxTransferQueue.FailedCount - } - if dbxTransferQueue.RequestedAt != nil && !dbxTransferQueue.RequestedAt.IsZero() { - item.RequestedAt = dbxTransferQueue.RequestedAt - } - if dbxTransferQueue.LastFailedAt != nil && !dbxTransferQueue.LastFailedAt.IsZero() { - item.LastFailedAt = dbxTransferQueue.LastFailedAt - } - if dbxTransferQueue.FinishedAt != nil && !dbxTransferQueue.FinishedAt.IsZero() { - item.FinishedAt = dbxTransferQueue.FinishedAt - } - - return item, nil -} - func dbxSegmentTransferToTransferQueueItem(dbxSegmentTransfer *dbx.GracefulExitSegmentTransfer) (item *gracefulexit.TransferQueueItem, err error) { nID, err := storj.NodeIDFromBytes(dbxSegmentTransfer.NodeId) if err != nil { diff --git a/storagenode/gracefulexit/chore_test.go b/storagenode/gracefulexit/chore_test.go index 057c9d2b1..dd1f2d921 100644 --- a/storagenode/gracefulexit/chore_test.go +++ b/storagenode/gracefulexit/chore_test.go @@ -100,7 +100,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, require.Len(t, exitingNodes, 1) require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID) - queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, true) + queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0) require.NoError(t, err) require.Len(t, queueItems, 1) @@ -110,7 +110,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, exitingNode.GracefulExit.Chore.TestWaitForWorkers() // check that there are no more items to process - queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, true) + queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0) require.NoError(t, err) require.Len(t, queueItems, 0) diff --git a/storagenode/gracefulexit/worker_test.go b/storagenode/gracefulexit/worker_test.go index 0411f41fd..aff7f7a6c 100644 --- a/storagenode/gracefulexit/worker_test.go +++ b/storagenode/gracefulexit/worker_test.go @@ -69,7 +69,7 @@ func TestWorkerSuccess(t *testing.T) { require.Len(t, exitingNodes, 1) require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID) - queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true) + queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) require.NoError(t, err) require.Len(t, queueItems, 1) @@ -84,7 +84,6 @@ func TestWorkerSuccess(t *testing.T) { require.NoError(t, err) require.EqualValues(t, progress.PiecesFailed, 0) require.EqualValues(t, progress.PiecesTransferred, 1) - require.True(t, progress.UsesSegmentTransferQueue) exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID()) require.NoError(t, err) @@ -143,7 +142,7 @@ func TestWorkerTimeout(t *testing.T) { require.Len(t, exitingNodes, 1) require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID) - queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true) + queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0) require.NoError(t, err) require.Len(t, queueItems, 1)