satellite/gracefulexit: stop using gracefulexit_transfer_queue

Remove the logic associated to the old transfer queue.
A new transfer queue (gracefulexit_segment_transfer_queue) has been created for migration to segmentLoop.
Transfers from the old queue were not moved to the new queue.
Instead, it was still used for nodes which have initiated graceful exit before migration.
There is no such node left, so we can remove all this logic.
In a next step, we will drop the table.

Change-Id: I3aa9bc29b76065d34b57a73f6e9c9d0297587f54
This commit is contained in:
Fadila Khadar 2021-09-05 23:29:22 +02:00 committed by Fadila
parent e1a4195a35
commit c00ecae75c
14 changed files with 140 additions and 679 deletions

View File

@ -92,7 +92,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
}
// remove all items from the transfer queue
err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID, progress.UsesSegmentTransferQueue)
err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID)
if err != nil {
chore.log.Error("error deleting node from transfer queue", zap.Error(err))
}

View File

@ -84,7 +84,7 @@ func TestChore(t *testing.T) {
satellite.GracefulExit.Chore.Loop.TriggerWait()
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 3)
for _, incomplete := range incompleteTransfers {
@ -97,7 +97,7 @@ func TestChore(t *testing.T) {
if node.ID() == exitingNode.ID() {
continue
}
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0, true)
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 0)
}
@ -116,7 +116,7 @@ func TestChore(t *testing.T) {
err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0)
require.NoError(t, err)
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 3)
@ -129,7 +129,7 @@ func TestChore(t *testing.T) {
require.False(t, exitStatus.ExitSuccess)
require.NotNil(t, exitStatus.ExitFinishedAt)
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 0)
@ -223,7 +223,7 @@ func TestDurabilityRatio(t *testing.T) {
satellite.GracefulExit.Chore.Loop.TriggerWait()
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, true)
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 2)
for _, incomplete := range incompleteTransfers {
@ -263,14 +263,15 @@ func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) {
for j := 0; j < size; j++ {
item := gracefulexit.TransferQueueItem{
NodeID: testrand.NodeID(),
Key: testrand.Bytes(memory.B * 256),
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{},
PieceNum: 0,
DurabilityRatio: 1.0,
}
transferQueueItems = append(transferQueueItems, item)
}
batchSize := 1000
err := db.Enqueue(ctx, transferQueueItems, batchSize, true)
err := db.Enqueue(ctx, transferQueueItems, batchSize)
require.NoError(b, err)
}
}

View File

@ -14,18 +14,16 @@ import (
// Progress represents the persisted graceful exit progress record.
type Progress struct {
NodeID storj.NodeID
BytesTransferred int64
PiecesTransferred int64
PiecesFailed int64
UpdatedAt time.Time
UsesSegmentTransferQueue bool
NodeID storj.NodeID
BytesTransferred int64
PiecesTransferred int64
PiecesFailed int64
UpdatedAt time.Time
}
// TransferQueueItem represents the persisted graceful exit queue record.
type TransferQueueItem struct {
NodeID storj.NodeID
Key metabase.SegmentKey
StreamID uuid.UUID
Position metabase.SegmentPosition
PieceNum int32
@ -50,15 +48,15 @@ type DB interface {
GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error)
// Enqueue batch inserts graceful exit transfer queue entries it does not exist.
Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int, usesSegmentTransferQueue bool) error
Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error
// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem, usesSegmentTransferQueue bool) error
UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID.
DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) error
DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
// DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries.
DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) error
DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
// queue items whose nodes have finished the exit before the indicated time
// returning the total number of deleted items.
@ -69,15 +67,15 @@ type DB interface {
// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error)
// GetTransferQueueItem gets a graceful exit transfer queue entry.
GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) (*TransferQueueItem, error)
GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) (*TransferQueueItem, error)
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error)
GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries in the database ordered by durability ratio and queued date ascending.
GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error)
GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error)
GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error)
// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
// CountFinishedTransferQueueItemsByNode return a map of the nodes which has
// finished the exit before the indicated time but there are at least one item
// left in the transfer queue.

View File

@ -9,11 +9,9 @@ import (
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metabase"
@ -56,246 +54,6 @@ func TestProgress(t *testing.T) {
})
}
// TODO: remove this test when graceful_exit_transfer_queue is dropped.
func TestTransferQueueItem(t *testing.T) {
// test basic graceful exit transfer queue crud
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
geDB := db.GracefulExit()
nodeID1 := testrand.NodeID()
nodeID2 := testrand.NodeID()
key1 := metabase.SegmentKey(testrand.Bytes(memory.B * 32))
key2 := metabase.SegmentKey(testrand.Bytes(memory.B * 32))
// root piece IDs for path 1 and 2
rootPieceID1 := testrand.PieceID()
rootPieceID2 := testrand.PieceID()
items := []gracefulexit.TransferQueueItem{
{
NodeID: nodeID1,
Key: key1,
PieceNum: 1,
RootPieceID: rootPieceID1,
DurabilityRatio: 0.9,
},
{
NodeID: nodeID1,
Key: key2,
PieceNum: 2,
RootPieceID: rootPieceID2,
DurabilityRatio: 1.1,
},
{
NodeID: nodeID2,
Key: key1,
PieceNum: 2,
RootPieceID: rootPieceID1,
DurabilityRatio: 0.9,
},
{
NodeID: nodeID2,
Key: key2,
PieceNum: 1,
RootPieceID: rootPieceID2,
DurabilityRatio: 1.1,
},
}
// test basic create, update, get delete
{
batchSize := 1000
err := geDB.Enqueue(ctx, items, batchSize, false)
require.NoError(t, err)
for _, tqi := range items {
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
require.NoError(t, err)
require.Equal(t, tqi.RootPieceID, item.RootPieceID)
require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio)
now := time.Now()
item.DurabilityRatio = 1.2
item.RequestedAt = &now
err = geDB.UpdateTransferQueueItem(ctx, *item, false)
require.NoError(t, err)
latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
require.NoError(t, err)
require.Equal(t, item.RootPieceID, latestItem.RootPieceID)
require.Equal(t, item.DurabilityRatio, latestItem.DurabilityRatio)
require.WithinDuration(t, now, *latestItem.RequestedAt, time.Second)
}
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, false)
require.NoError(t, err)
require.Len(t, queueItems, 2)
}
// mark the first item finished and test that only 1 item gets returned from the GetIncomplete
{
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key1, uuid.UUID{}, metabase.SegmentPosition{}, 1)
require.NoError(t, err)
now := time.Now()
item.FinishedAt = &now
err = geDB.UpdateTransferQueueItem(ctx, *item, false)
require.NoError(t, err)
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, false)
require.NoError(t, err)
require.Len(t, queueItems, 1)
for _, queueItem := range queueItems {
require.Equal(t, nodeID1, queueItem.NodeID)
require.Equal(t, key2, queueItem.Key)
}
}
// test delete finished queue items. Only key1 should be removed
{
err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1, false)
require.NoError(t, err)
// key1 should no longer exist for nodeID1
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, key1, uuid.UUID{}, metabase.SegmentPosition{}, 1)
require.Error(t, err)
// key2 should still exist for nodeID1
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2)
require.NoError(t, err)
}
// test delete all for a node
{
queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0, false)
require.NoError(t, err)
require.Len(t, queueItems, 2)
err = geDB.DeleteTransferQueueItems(ctx, nodeID2, false)
require.NoError(t, err)
queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0, false)
require.NoError(t, err)
require.Len(t, queueItems, 0)
}
// test increment order limit send count
err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2)
require.NoError(t, err)
// get queue item for key2 since that still exists
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2)
require.NoError(t, err)
require.Equal(t, 1, item.OrderLimitSendCount)
})
}
// TODO: remove this test when graceful_exit_transfer_queue is dropped.
func TestBothTransferQueueItem(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
geDB := db.GracefulExit()
progress1 := gracefulexit.Progress{
NodeID: testrand.NodeID(),
UsesSegmentTransferQueue: false,
}
progress2 := gracefulexit.Progress{
NodeID: testrand.NodeID(),
UsesSegmentTransferQueue: true,
}
progress := []gracefulexit.Progress{progress1, progress2}
key1 := metabase.SegmentKey(testrand.Bytes(memory.B * 32))
key2 := metabase.SegmentKey(testrand.Bytes(memory.B * 32))
// root piece IDs for path 1 and 2
rootPieceID1 := testrand.PieceID()
rootPieceID2 := testrand.PieceID()
// root piece IDs for segments
rootPieceID3 := testrand.PieceID()
rootPieceID4 := testrand.PieceID()
streamID1 := testrand.UUID()
streamID2 := testrand.UUID()
position1 := metabase.SegmentPosition{Part: 1, Index: 2}
position2 := metabase.SegmentPosition{Part: 2, Index: 3}
itemsInTransferQueue := []gracefulexit.TransferQueueItem{
{
NodeID: progress1.NodeID,
Key: key1,
PieceNum: 1,
RootPieceID: rootPieceID1,
DurabilityRatio: 0.9,
},
{
NodeID: progress1.NodeID,
Key: key2,
PieceNum: 2,
RootPieceID: rootPieceID2,
DurabilityRatio: 1.1,
},
}
itemsInSegmentTransferQueue := []gracefulexit.TransferQueueItem{
{
NodeID: progress2.NodeID,
StreamID: streamID1,
Position: position1,
PieceNum: 2,
RootPieceID: rootPieceID3,
DurabilityRatio: 0.9,
},
{
NodeID: progress2.NodeID,
StreamID: streamID2,
Position: position2,
PieceNum: 1,
RootPieceID: rootPieceID4,
DurabilityRatio: 1.1,
},
}
{
batchSize := 1000
err := geDB.Enqueue(ctx, itemsInTransferQueue, batchSize, false)
require.NoError(t, err)
err = geDB.Enqueue(ctx, itemsInSegmentTransferQueue, batchSize, true)
require.NoError(t, err)
for _, tqi := range append(itemsInTransferQueue, itemsInSegmentTransferQueue...) {
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
require.NoError(t, err)
require.Equal(t, tqi.RootPieceID, item.RootPieceID)
require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio)
}
// check that we get nothing if we don't use the right transfer queue
for _, p := range progress {
queueItems, err := geDB.GetIncomplete(ctx, p.NodeID, 10, 0, !p.UsesSegmentTransferQueue)
require.NoError(t, err)
require.Len(t, queueItems, 0)
}
}
// test delete
{
for _, p := range progress {
// check that we have the right number of items before trying to delete
queueItems, err := geDB.GetIncomplete(ctx, p.NodeID, 10, 0, p.UsesSegmentTransferQueue)
require.NoError(t, err)
require.Len(t, queueItems, 2)
err = geDB.DeleteTransferQueueItems(ctx, p.NodeID, p.UsesSegmentTransferQueue)
require.NoError(t, err)
queueItems, err = geDB.GetIncomplete(ctx, p.NodeID, 10, 0, p.UsesSegmentTransferQueue)
require.NoError(t, err)
require.Len(t, queueItems, 0)
}
}
})
}
func TestSegmentTransferQueueItem(t *testing.T) {
// test basic graceful exit transfer queue crud
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
@ -349,11 +107,11 @@ func TestSegmentTransferQueueItem(t *testing.T) {
// test basic create, update, get delete
{
batchSize := 1000
err := geDB.Enqueue(ctx, items, batchSize, true)
err := geDB.Enqueue(ctx, items, batchSize)
require.NoError(t, err)
for _, tqi := range items {
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.StreamID, tqi.Position, tqi.PieceNum)
require.NoError(t, err)
require.Equal(t, tqi.RootPieceID, item.RootPieceID)
require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio)
@ -362,10 +120,10 @@ func TestSegmentTransferQueueItem(t *testing.T) {
item.DurabilityRatio = 1.2
item.RequestedAt = &now
err = geDB.UpdateTransferQueueItem(ctx, *item, true)
err = geDB.UpdateTransferQueueItem(ctx, *item)
require.NoError(t, err)
latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.StreamID, tqi.Position, tqi.PieceNum)
require.NoError(t, err)
require.Equal(t, item.RootPieceID, latestItem.RootPieceID)
@ -373,23 +131,23 @@ func TestSegmentTransferQueueItem(t *testing.T) {
require.WithinDuration(t, now, *latestItem.RequestedAt, time.Second)
}
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, true)
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 2)
}
// mark the first item finished and test that only 1 item gets returned from the GetIncomplete
{
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID1, position1, 1)
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, streamID1, position1, 1)
require.NoError(t, err)
now := time.Now()
item.FinishedAt = &now
err = geDB.UpdateTransferQueueItem(ctx, *item, true)
err = geDB.UpdateTransferQueueItem(ctx, *item)
require.NoError(t, err)
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, true)
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
for _, queueItem := range queueItems {
@ -401,38 +159,38 @@ func TestSegmentTransferQueueItem(t *testing.T) {
// test delete finished queue items. Only key1 should be removed
{
err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1, true)
err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1)
require.NoError(t, err)
// key1 should no longer exist for nodeID1
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID1, position1, 1)
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, streamID1, position1, 1)
require.Error(t, err)
// key2 should still exist for nodeID1
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID2, position2, 2)
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, streamID2, position2, 2)
require.NoError(t, err)
}
// test delete all for a node
{
queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0, true)
queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 2)
err = geDB.DeleteTransferQueueItems(ctx, nodeID2, true)
err = geDB.DeleteTransferQueueItems(ctx, nodeID2)
require.NoError(t, err)
queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0, true)
queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 0)
}
// test increment order limit send count
err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, nil, streamID2, position2, 2)
err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, streamID2, position2, 2)
require.NoError(t, err)
// get queue item for key2 since that still exists
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID2, position2, 2)
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, streamID2, position2, 2)
require.NoError(t, err)
require.Equal(t, 1, item.OrderLimitSendCount)

View File

@ -5,7 +5,6 @@ package gracefulexit
import (
"context"
"database/sql"
"io"
"sync"
"time"
@ -134,17 +133,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
endpoint.connections.delete(nodeID)
}()
//TODO: remove this call when we remove graceful_exit_transfer_queue is removed
usesSegmentTransferQueue := true
progress, err := endpoint.db.GetProgress(ctx, nodeID)
if err != nil && !errs.Is(err, sql.ErrNoRows) {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if progress != nil {
usesSegmentTransferQueue = progress.UsesSegmentTransferQueue
}
isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue)
isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -190,14 +179,14 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
loopErr := incompleteLoop.Run(ctx, func(ctx context.Context) error {
if pending.Length() == 0 {
incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue)
incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0)
if err != nil {
cancel()
return pending.DoneSending(err)
}
if len(incomplete) == 0 {
incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue)
incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0)
if err != nil {
cancel()
return pending.DoneSending(err)
@ -233,7 +222,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
// if there is no more work to receive send complete
if finished {
isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue)
isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -247,7 +236,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason, usesSegmentTransferQueue)
err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -310,7 +299,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
ExitSuccess: false,
}
err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED, usesSegmentTransferQueue)
err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -339,7 +328,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
if err != nil {
return Error.Wrap(err)
}
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
if err != nil {
return Error.Wrap(err)
}
@ -347,10 +336,10 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
return nil
}
segment, err := endpoint.getValidSegment(ctx, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.RootPieceID)
segment, err := endpoint.getValidSegment(ctx, incomplete.StreamID, incomplete.Position, incomplete.RootPieceID)
if err != nil {
endpoint.log.Warn("invalid segment", zap.Error(err))
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
if err != nil {
return Error.Wrap(err)
}
@ -360,7 +349,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
nodePiece, err := endpoint.getNodePiece(ctx, segment, incomplete)
if err != nil {
deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
if deleteErr != nil {
return Error.Wrap(deleteErr)
}
@ -374,7 +363,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
return Error.Wrap(err)
}
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
if err != nil {
return Error.Wrap(err)
}
@ -403,12 +392,11 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
}
if len(newNodes) == 0 {
return Error.New("could not find a node to receive piece transfer: node ID %v, key %v, piece num %v", nodeID, incomplete.Key, incomplete.PieceNum)
return Error.New("could not find a node to receive piece transfer: node ID %v, stream_id %v, piece num %v", nodeID, incomplete.StreamID, incomplete.PieceNum)
}
newNode := newNodes[0]
endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.ID),
zap.ByteString("key", incomplete.Key),
zap.ByteString("streamID", incomplete.StreamID[:]), zap.Uint32("Part", incomplete.Position.Part), zap.Uint32("Index", incomplete.Position.Index),
zap.Int32("piece num", incomplete.PieceNum))
@ -433,14 +421,13 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
return Error.Wrap(err)
}
err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
if err != nil {
return Error.Wrap(err)
}
// update pending queue with the transfer item
err = pending.Put(pieceID, &PendingTransfer{
Key: incomplete.Key,
StreamID: incomplete.StreamID,
Position: incomplete.Position,
PieceSize: pieceSize,
@ -479,12 +466,12 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat
if err != nil {
return Error.Wrap(err)
}
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
if err != nil {
return Error.Wrap(err)
}
err = endpoint.updateSegment(ctx, exitingNodeID, receivingNodeID, transfer.Key, transfer.StreamID, transfer.Position, transfer.PieceNum, transferQueueItem.RootPieceID)
err = endpoint.updateSegment(ctx, exitingNodeID, receivingNodeID, transfer.StreamID, transfer.Position, transfer.PieceNum, transferQueueItem.RootPieceID)
if err != nil {
// remove the piece from the pending queue so it gets retried
deleteErr := pending.Delete(originalPieceID)
@ -502,7 +489,7 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat
return Error.Wrap(err)
}
err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
if err != nil {
return Error.Wrap(err)
}
@ -548,7 +535,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
return nil
}
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
if err != nil {
return Error.Wrap(err)
}
@ -564,11 +551,11 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
// Remove the queue item and remove the node from the pointer.
// If the pointer is not piece hash verified, do not count this as a failure.
if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND {
endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("key", transfer.Key),
endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID),
zap.ByteString("streamID", transfer.StreamID[:]), zap.Uint32("Part", transfer.Position.Part), zap.Uint32("Index", transfer.Position.Index),
zap.Uint16("piece num", transfer.PieceNum))
segment, err := endpoint.getValidSegment(ctx, transfer.Key, transfer.StreamID, transfer.Position, storj.PieceID{})
segment, err := endpoint.getValidSegment(ctx, transfer.StreamID, transfer.Position, storj.PieceID{})
if err != nil {
return Error.Wrap(err)
}
@ -581,7 +568,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
}
}
if nodePiece == (metabase.Piece{}) {
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
if err != nil {
return Error.Wrap(err)
}
@ -598,7 +585,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
return Error.Wrap(err)
}
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
if err != nil {
return Error.Wrap(err)
}
@ -608,7 +595,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
transferQueueItem.LastFailedAt = &now
transferQueueItem.FailedCount = &failedCount
transferQueueItem.LastFailedCode = &errorCode
err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem, transfer.Key == nil)
err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem)
if err != nil {
return Error.Wrap(err)
}
@ -624,7 +611,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
return pending.Delete(pieceID)
}
func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (isDisqualified bool, err error) {
func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) {
// check if node is disqualified
nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
if err != nil {
@ -645,7 +632,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto
}
// remove remaining items from the queue
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID, usesSegmentTransferQueue)
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
if err != nil {
return true, Error.Wrap(err)
}
@ -656,7 +643,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto
return false, nil
}
func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason, usesSegmentTransferQueue bool) error {
func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason) error {
finishedMsg, err := endpoint.getFinishedMessage(ctx, exitStatusRequest.NodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, failedReason)
if err != nil {
return Error.Wrap(err)
@ -673,7 +660,7 @@ func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSate
}
// remove remaining items from the queue after notifying nodes about their exit status
err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID, usesSegmentTransferQueue)
err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID)
if err != nil {
return Error.Wrap(err)
}
@ -720,11 +707,11 @@ func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, nodeID storj.N
return message, nil
}
func (endpoint *Endpoint) updateSegment(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNumber uint16, originalRootPieceID storj.PieceID) (err error) {
func (endpoint *Endpoint) updateSegment(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNumber uint16, originalRootPieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
// remove the node from the segment
segment, err := endpoint.getValidSegment(ctx, key, streamID, position, originalRootPieceID)
segment, err := endpoint.getValidSegment(ctx, streamID, position, originalRootPieceID)
if err != nil {
return Error.Wrap(err)
}
@ -867,7 +854,7 @@ func (endpoint *Endpoint) calculatePieceSize(ctx context.Context, segment metaba
}
if len(segment.Pieces) > redundancy.OptimalThreshold() {
endpoint.log.Debug("segment has more pieces than required. removing node from segment.", zap.Stringer("node ID", nodeID), zap.ByteString("key", incomplete.Key), zap.Int32("piece num", incomplete.PieceNum))
endpoint.log.Debug("segment has more pieces than required. removing node from segment.", zap.Stringer("node ID", nodeID), zap.Int32("piece num", incomplete.PieceNum))
return 0, ErrAboveOptimalThreshold.New("")
}
@ -875,27 +862,7 @@ func (endpoint *Endpoint) calculatePieceSize(ctx context.Context, segment metaba
return eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy), nil
}
func (endpoint *Endpoint) getValidSegment(ctx context.Context, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, originalRootPieceID storj.PieceID) (metabase.Segment, error) {
// TODO: remove when table graceful_exit_transfer_queue is dropped
if key != nil {
location, err := metabase.ParseSegmentKey(key)
if err != nil {
return metabase.Segment{}, Error.Wrap(err)
}
segment, err := endpoint.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
SegmentLocation: location,
})
if err != nil {
return metabase.Segment{}, Error.Wrap(err)
}
if !originalRootPieceID.IsZero() && originalRootPieceID != segment.RootPieceID {
return metabase.Segment{}, Error.New("segment has changed")
}
return segment, nil
}
func (endpoint *Endpoint) getValidSegment(ctx context.Context, streamID uuid.UUID, position metabase.SegmentPosition, originalRootPieceID storj.PieceID) (metabase.Segment, error) {
segment, err := endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: streamID,
Position: position,
@ -921,7 +888,7 @@ func (endpoint *Endpoint) getNodePiece(ctx context.Context, segment metabase.Seg
}
if nodePiece == (metabase.Piece{}) {
endpoint.log.Debug("piece no longer held by node", zap.Stringer("node ID", nodeID), zap.ByteString("key", incomplete.Key), zap.Int32("piece num", incomplete.PieceNum))
endpoint.log.Debug("piece no longer held by node", zap.Stringer("node ID", nodeID), zap.Int32("piece num", incomplete.PieceNum))
return metabase.Piece{}, Error.New("piece no longer held by node")
}

View File

@ -304,7 +304,7 @@ func TestRecvTimeout(t *testing.T) {
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
@ -1031,7 +1031,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
satellite.GracefulExit.Chore.Loop.TriggerWait()
// make sure all the pieces are in the transfer queue
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, incomplete, 2)
@ -1071,7 +1071,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
require.FailNow(t, "should not reach this case: %#v", m)
}
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, true)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0)
require.NoError(t, err)
require.Len(t, queueItems, 0)
})
@ -1142,7 +1142,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
satellite.GracefulExit.Chore.Loop.TriggerWait()
// make sure all the pieces are in the transfer queue
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, incomplete, 1)
// TODO: change to this when an object part can be overwritten
@ -1185,7 +1185,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
require.FailNow(t, "should not reach this case: %#v", m)
}
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, true)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0)
require.NoError(t, err)
require.Len(t, queueItems, 0)
})
@ -1321,7 +1321,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
satellite.GracefulExit.Chore.Loop.TriggerWait()
// make sure all the pieces are in the transfer queue
_, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0, true)
_, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0)
require.NoError(t, err)
var messageCount int
@ -1365,7 +1365,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
require.Equal(t, messageCount, maxOrderLimitSendCount)
// make sure not responding piece not in queue
incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, incompletes, 0)
@ -1525,7 +1525,7 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun
satellite.GracefulExit.Chore.Loop.TriggerWait()
// make sure all the pieces are in the transfer queue
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0, true)
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0)
require.NoError(t, err)
// connect to satellite again to start receiving transfers

View File

@ -173,7 +173,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
gracefulExitDB := planet.Satellites[0].DB.GracefulExit()
batchSize := 1000
err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true)
err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
require.NoError(t, err)
asOfSystemTime := -1 * time.Microsecond
@ -260,7 +260,7 @@ func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(
queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...)
// Add some items to the transfer queue for the exited nodes.
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true)
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
require.NoError(t, err)
disableAsOfSystemTime := time.Second * 0
@ -405,7 +405,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) {
queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...)
// Add some items to the transfer queue for the exited nodes.
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, true)
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
require.NoError(t, err)
disableAsOfSystemTime := time.Second * 0

View File

@ -116,7 +116,7 @@ func (collector *PathCollector) flush(ctx context.Context, limit int) (err error
defer mon.Task()(&ctx)(&err)
if len(collector.buffer) >= limit {
err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize, true)
err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize)
collector.buffer = collector.buffer[:0]
return errs.Wrap(err)

View File

@ -51,7 +51,6 @@ func (promise *PendingFinishedPromise) finishCalled(err error) {
// PendingTransfer is the representation of work on the pending map.
// It contains information about a transfer request that has been sent to a storagenode by the satellite.
type PendingTransfer struct {
Key metabase.SegmentKey
StreamID uuid.UUID
Position metabase.SegmentPosition
PieceSize int64

View File

@ -25,8 +25,11 @@ func TestPendingBasic(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
streamID := testrand.UUID()
position := metabase.SegmentPosition{Part: 3, Index: 10}
newWork := &gracefulexit.PendingTransfer{
Key: metabase.SegmentKey("testbucket/testfile"),
StreamID: streamID,
Position: position,
PieceSize: 10,
SatelliteMessage: &pb.SatelliteMessage{},
PieceNum: 1,
@ -47,7 +50,8 @@ func TestPendingBasic(t *testing.T) {
// get should work
w, ok := pending.Get(pieceID)
require.True(t, ok)
require.True(t, bytes.Equal(newWork.Key, w.Key))
require.True(t, bytes.Equal(newWork.StreamID[:], w.StreamID[:]))
require.Equal(t, position, w.Position)
invalidPieceID := testrand.PieceID()
_, ok = pending.Get(invalidPieceID)
@ -99,8 +103,11 @@ func TestPendingIsFinishedWorkAdded(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
streamID := testrand.UUID()
position := metabase.SegmentPosition{Part: 3, Index: 10}
newWork := &gracefulexit.PendingTransfer{
Key: metabase.SegmentKey("testbucket/testfile"),
StreamID: streamID,
Position: position,
PieceSize: 10,
SatelliteMessage: &pb.SatelliteMessage{},
PieceNum: 1,

View File

@ -25,8 +25,8 @@ func (endpoint *Endpoint) validatePendingTransfer(ctx context.Context, transfer
if transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil {
return Error.New("Addressed order limit on transfer piece cannot be nil")
}
if transfer.Key == nil && transfer.StreamID.IsZero() {
return Error.New("Transfer key and StreamID cannot be both empty")
if transfer.StreamID.IsZero() {
return Error.New("StreamID cannot be zero")
}
if transfer.OriginalRootPieceID.IsZero() {
return Error.New("could not get original root piece ID from transfer item")

View File

@ -67,43 +67,31 @@ func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID)
}
progress := &gracefulexit.Progress{
NodeID: nID,
BytesTransferred: dbxProgress.BytesTransferred,
PiecesTransferred: dbxProgress.PiecesTransferred,
PiecesFailed: dbxProgress.PiecesFailed,
UpdatedAt: dbxProgress.UpdatedAt,
UsesSegmentTransferQueue: dbxProgress.UsesSegmentTransferQueue,
NodeID: nID,
BytesTransferred: dbxProgress.BytesTransferred,
PiecesTransferred: dbxProgress.PiecesTransferred,
PiecesFailed: dbxProgress.PiecesFailed,
UpdatedAt: dbxProgress.UpdatedAt,
}
return progress, Error.Wrap(err)
}
// Enqueue batch inserts graceful exit transfer queue entries if it does not exist.
func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int, usesSegmentTransferQueue bool) (err error) {
func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: to be removed when graceful_exit_transfer_queue is dropped
if !usesSegmentTransferQueue {
sort.Slice(items, func(i, k int) bool {
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
sort.Slice(items, func(i, k int) bool {
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
if compare == 0 {
compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:])
if compare == 0 {
return bytes.Compare(items[i].Key, items[k].Key) < 0
return items[i].Position.Encode() < items[k].Position.Encode()
}
return compare < 0
})
} else {
sort.Slice(items, func(i, k int) bool {
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
if compare == 0 {
compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:])
if compare == 0 {
return items[i].Position.Encode() < items[k].Position.Encode()
}
return compare < 0
}
return compare < 0
})
}
}
return compare < 0
})
for i := 0; i < len(items); i += batchSize {
lowerBound := i
@ -114,7 +102,6 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran
}
var nodeIDs []storj.NodeID
var keys [][]byte
var streamIds [][]byte
var positions []int64
var pieceNums []int32
@ -124,33 +111,13 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran
for _, item := range items[lowerBound:upperBound] {
item := item
nodeIDs = append(nodeIDs, item.NodeID)
if !usesSegmentTransferQueue {
keys = append(keys, item.Key)
} else {
streamIds = append(streamIds, item.StreamID[:])
positions = append(positions, int64(item.Position.Encode()))
}
streamIds = append(streamIds, item.StreamID[:])
positions = append(positions, int64(item.Position.Encode()))
pieceNums = append(pieceNums, item.PieceNum)
rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes())
durabilities = append(durabilities, item.DurabilityRatio)
}
// TODO: to be removed when graceful_exit_transfer_queue is dropped
if !usesSegmentTransferQueue {
_, err = db.db.ExecContext(ctx, db.db.Rebind(`
INSERT INTO graceful_exit_transfer_queue (
node_id, path, piece_num,
root_piece_id, durability_ratio, queued_at
) SELECT
unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]),
unnest($4::bytea[]), unnest($5::float8[]), $6
ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums),
pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC())
if err != nil {
return Error.Wrap(err)
}
continue
}
_, err = db.db.ExecContext(ctx, db.db.Rebind(`
INSERT INTO graceful_exit_segment_transfer_queue (
node_id, stream_id, position, piece_num,
@ -172,35 +139,9 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran
}
// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem, usesSegmentTransferQueue bool) (err error) {
func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: to be removed when graceful_exit_transfer_queue is dropped.
if !usesSegmentTransferQueue {
update := dbx.GracefulExitTransferQueue_Update_Fields{
DurabilityRatio: dbx.GracefulExitTransferQueue_DurabilityRatio(item.DurabilityRatio),
LastFailedCode: dbx.GracefulExitTransferQueue_LastFailedCode_Raw(item.LastFailedCode),
FailedCount: dbx.GracefulExitTransferQueue_FailedCount_Raw(item.FailedCount),
}
if item.RequestedAt != nil {
update.RequestedAt = dbx.GracefulExitTransferQueue_RequestedAt_Raw(item.RequestedAt)
}
if item.LastFailedAt != nil {
update.LastFailedAt = dbx.GracefulExitTransferQueue_LastFailedAt_Raw(item.LastFailedAt)
}
if item.FinishedAt != nil {
update.FinishedAt = dbx.GracefulExitTransferQueue_FinishedAt_Raw(item.FinishedAt)
}
return db.db.UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx,
dbx.GracefulExitTransferQueue_NodeId(item.NodeID.Bytes()),
dbx.GracefulExitTransferQueue_Path(item.Key),
dbx.GracefulExitTransferQueue_PieceNum(int(item.PieceNum)),
update,
)
}
update := dbx.GracefulExitSegmentTransfer_Update_Fields{
DurabilityRatio: dbx.GracefulExitSegmentTransfer_DurabilityRatio(item.DurabilityRatio),
LastFailedCode: dbx.GracefulExitSegmentTransfer_LastFailedCode_Raw(item.LastFailedCode),
@ -227,16 +168,9 @@ func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item grac
}
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: to be removed when graceful_exit_transfer_queue is dropped.
if key != nil {
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), dbx.GracefulExitTransferQueue_Path(key),
dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum)))
return Error.Wrap(err)
}
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
@ -246,26 +180,17 @@ func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID st
}
// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID.
func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (err error) {
func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: to be removed when graceful_exit_transfer_queue is dropped
if !usesSegmentTransferQueue {
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
return Error.Wrap(err)
}
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
return Error.Wrap(err)
}
// DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries by nodeID.
func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (err error) {
func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
// TODO: to be removed when graceful_exit_transfer_queue is dropped.
if !usesSegmentTransferQueue {
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
return Error.Wrap(err)
}
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
return Error.Wrap(err)
@ -297,26 +222,8 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
if err != nil {
return 0, Error.Wrap(err)
}
//TODO: remove when graceful_exit_transfer_queue is dropped.
statement = `
DELETE FROM graceful_exit_transfer_queue
WHERE node_id IN (
SELECT node_id FROM graceful_exit_transfer_queue INNER JOIN nodes
ON graceful_exit_transfer_queue.node_id = nodes.id
WHERE nodes.exit_finished_at IS NOT NULL
AND nodes.exit_finished_at < $1
)`
res, err = db.db.ExecContext(ctx, statement, before)
if err != nil {
return 0, Error.Wrap(err)
}
countOldQueue, err := res.RowsAffected()
if err != nil {
return 0, Error.Wrap(err)
}
return count + countOldQueue, nil
return count, nil
case dbutil.Cockroach:
nodesQuery := `
@ -332,12 +239,7 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
WHERE node_id = $1
LIMIT $2
`
//TODO: remove when graceful_exit_transfer_queue is dropped.
deleteStmtOldQueue := `
DELETE FROM graceful_exit_transfer_queue
WHERE node_id = $1
LIMIT $2
`
var (
deleteCount int64
offset int
@ -383,21 +285,6 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
break
}
}
//TODO: remove when graceful_exit_transfer_queue is dropped.
for {
res, err := db.db.ExecContext(ctx, deleteStmtOldQueue, id.Bytes(), batchSize)
if err != nil {
return deleteCount, Error.Wrap(err)
}
count, err := res.RowsAffected()
if err != nil {
return deleteCount, Error.Wrap(err)
}
deleteCount += count
if count < int64(batchSize) {
break
}
}
}
return deleteCount, nil
}
@ -488,26 +375,9 @@ func (db *gracefulexitDB) DeleteBatchExitProgress(ctx context.Context, nodeIDs [
}
// GetTransferQueueItem gets a graceful exit transfer queue entry.
func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) {
func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) {
defer mon.Task()(&ctx)(&err)
// TODO: to be removed when graceful_exit_transfer_queue is dropped
if key != nil {
dbxTransferQueue, err := db.db.Get_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx,
dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()),
dbx.GracefulExitTransferQueue_Path(key),
dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum)))
if err != nil {
return nil, Error.Wrap(err)
}
transferQueueItem, err := dbxToTransferQueueItem(dbxTransferQueue)
if err != nil {
return nil, Error.Wrap(err)
}
return transferQueueItem, Error.Wrap(err)
}
dbxTransferQueue, err := db.db.Get_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
@ -527,25 +397,10 @@ func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj
}
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) {
func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
defer mon.Task()(&ctx)(&err)
var sql string
if !usesSegmentTransferQueue {
// TODO: to be removed when graceful_exit_transfer_queue is dropped
sql = `
SELECT
node_id, path, piece_num,
root_piece_id, durability_ratio,
queued_at, requested_at, last_failed_at,
last_failed_code, failed_count, finished_at,
order_limit_send_count
FROM graceful_exit_transfer_queue
WHERE node_id = ?
AND finished_at is NULL
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
} else {
sql = `
sql := `
SELECT
node_id, stream_id, position,
piece_num, root_piece_id, durability_ratio,
@ -556,14 +411,13 @@ func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID
WHERE node_id = ?
AND finished_at is NULL
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
}
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue)
transferQueueItemRows, err := scanRows(rows)
if err != nil {
return nil, Error.Wrap(err)
}
@ -572,25 +426,10 @@ func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID
}
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending.
func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) {
func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
defer mon.Task()(&ctx)(&err)
var sql string
if !usesSegmentTransferQueue {
// TODO: to be removed when graceful_exit_transfer_queue is dropped
sql = `
SELECT
node_id, path, piece_num,
root_piece_id, durability_ratio, queued_at,
requested_at, last_failed_at, last_failed_code,
failed_count, finished_at, order_limit_send_count
FROM graceful_exit_transfer_queue
WHERE node_id = ?
AND finished_at is NULL
AND last_failed_at is NULL
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
} else {
sql = `
sql := `
SELECT
node_id, stream_id, position,
piece_num, root_piece_id, durability_ratio,
@ -602,14 +441,13 @@ func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID sto
AND finished_at is NULL
AND last_failed_at is NULL
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
}
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue)
transferQueueItemRows, err := scanRows(rows)
if err != nil {
return nil, Error.Wrap(err)
}
@ -618,26 +456,10 @@ func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID sto
}
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) {
func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
defer mon.Task()(&ctx)(&err)
var sql string
if !usesSegmentTransferQueue {
// TODO: to be removed when graceful_exit_transfer_queue is dropped
sql = `
SELECT
node_id, path, piece_num,
root_piece_id, durability_ratio, queued_at,
requested_at, last_failed_at, last_failed_code,
failed_count, finished_at, order_limit_send_count
FROM graceful_exit_transfer_queue
WHERE node_id = ?
AND finished_at is NULL
AND last_failed_at is not NULL
AND failed_count < ?
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
} else {
sql = `
sql := `
SELECT
node_id, stream_id, position,
piece_num, root_piece_id, durability_ratio,
@ -650,14 +472,13 @@ func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.
AND last_failed_at is not NULL
AND failed_count < ?
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
}
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), maxFailures, limit, offset)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue)
transferQueueItemRows, err := scanRows(rows)
if err != nil {
return nil, Error.Wrap(err)
}
@ -666,25 +487,15 @@ func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.
}
// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
defer mon.Task()(&ctx)(&err)
var sql string
if key != nil {
// TODO: to be removed when graceful_exit_transfer_queue is dropped
sql = `UPDATE graceful_exit_transfer_queue SET order_limit_send_count = graceful_exit_transfer_queue.order_limit_send_count + 1
WHERE node_id = ?
AND path = ?
AND piece_num = ?`
_, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, key, pieceNum)
} else {
sql = `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1
sql := `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1
WHERE node_id = ?
AND stream_id = ?
AND position = ?
AND piece_num = ?`
_, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum)
}
_, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum)
return Error.Wrap(err)
}
@ -725,56 +536,16 @@ func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Cont
nodesItemsCount[nodeID] = n
}
err = rows.Err()
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return nodesItemsCount, Error.Wrap(err)
}
// TODO: remove when graceful_exit_transfer_queue is dropped
query = `SELECT n.id, count(getq.node_id)
FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq
ON n.id = getq.node_id
` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
WHERE n.exit_finished_at IS NOT NULL
AND n.exit_finished_at < ?
GROUP BY n.id`
statement = db.db.Rebind(query)
rowsOldTransfers, err := db.db.QueryContext(ctx, statement, before)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, Error.Wrap(rowsOldTransfers.Close())) }()
for rowsOldTransfers.Next() {
var (
nodeID storj.NodeID
n int64
)
err := rowsOldTransfers.Scan(&nodeID, &n)
if err != nil {
return nil, Error.Wrap(err)
}
nodesItemsCount[nodeID] = n
}
return nodesItemsCount, Error.Wrap(rowsOldTransfers.Err())
return nodesItemsCount, Error.Wrap(rows.Err())
}
func scanRows(rows tagsql.Rows, usesSegmentTransferQueue bool) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) {
func scanRows(rows tagsql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) {
for rows.Next() {
transferQueueItem := &gracefulexit.TransferQueueItem{}
var pieceIDBytes []byte
if !usesSegmentTransferQueue {
// TODO: to be removed when graceful_exit_transfer_queue is dropped
err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.Key, &transferQueueItem.PieceNum, &pieceIDBytes,
&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
} else {
err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes,
&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
}
err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes,
&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
if err != nil {
return nil, Error.Wrap(err)
@ -791,45 +562,6 @@ func scanRows(rows tagsql.Rows, usesSegmentTransferQueue bool) (transferQueueIte
return transferQueueItemRows, Error.Wrap(rows.Err())
}
func dbxToTransferQueueItem(dbxTransferQueue *dbx.GracefulExitTransferQueue) (item *gracefulexit.TransferQueueItem, err error) {
nID, err := storj.NodeIDFromBytes(dbxTransferQueue.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
item = &gracefulexit.TransferQueueItem{
NodeID: nID,
Key: dbxTransferQueue.Path,
PieceNum: int32(dbxTransferQueue.PieceNum),
DurabilityRatio: dbxTransferQueue.DurabilityRatio,
QueuedAt: dbxTransferQueue.QueuedAt,
OrderLimitSendCount: dbxTransferQueue.OrderLimitSendCount,
}
if dbxTransferQueue.RootPieceId != nil {
item.RootPieceID, err = storj.PieceIDFromBytes(dbxTransferQueue.RootPieceId)
if err != nil {
return nil, err
}
}
if dbxTransferQueue.LastFailedCode != nil {
item.LastFailedCode = dbxTransferQueue.LastFailedCode
}
if dbxTransferQueue.FailedCount != nil {
item.FailedCount = dbxTransferQueue.FailedCount
}
if dbxTransferQueue.RequestedAt != nil && !dbxTransferQueue.RequestedAt.IsZero() {
item.RequestedAt = dbxTransferQueue.RequestedAt
}
if dbxTransferQueue.LastFailedAt != nil && !dbxTransferQueue.LastFailedAt.IsZero() {
item.LastFailedAt = dbxTransferQueue.LastFailedAt
}
if dbxTransferQueue.FinishedAt != nil && !dbxTransferQueue.FinishedAt.IsZero() {
item.FinishedAt = dbxTransferQueue.FinishedAt
}
return item, nil
}
func dbxSegmentTransferToTransferQueueItem(dbxSegmentTransfer *dbx.GracefulExitSegmentTransfer) (item *gracefulexit.TransferQueueItem, err error) {
nID, err := storj.NodeIDFromBytes(dbxSegmentTransfer.NodeId)
if err != nil {

View File

@ -100,7 +100,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, true)
queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
@ -110,7 +110,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
exitingNode.GracefulExit.Chore.TestWaitForWorkers()
// check that there are no more items to process
queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, true)
queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 0)

View File

@ -69,7 +69,7 @@ func TestWorkerSuccess(t *testing.T) {
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
@ -84,7 +84,6 @@ func TestWorkerSuccess(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, progress.PiecesFailed, 0)
require.EqualValues(t, progress.PiecesTransferred, 1)
require.True(t, progress.UsesSegmentTransferQueue)
exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
@ -143,7 +142,7 @@ func TestWorkerTimeout(t *testing.T) {
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, true)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)