satellite/gracefulexit: allow use of graceful_exit_segment_transfer_queue
To be able to use the segment metainfo loop, graceful exit transfers have to include the segment stream_id/position instead of the path. For this, we created a new table, graceful_exit_segment_transfer_queue, that will replace the graceful_exit_transfer_queue table. The table was created in a previous migration. This change gives access to the new table. Graceful Exit does not use the table yet; that will be done in a subsequent change. Change-Id: I6c09cff4cc45f0529813a8898ddb2d14aadb2cb8
This commit is contained in:
parent
73cdefbc41
commit
b0d98b1c1a
@ -92,7 +92,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// remove all items from the transfer queue
|
||||
err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID)
|
||||
err := chore.db.DeleteTransferQueueItems(ctx, node.NodeID, false)
|
||||
if err != nil {
|
||||
chore.log.Error("error deleting node from transfer queue", zap.Error(err))
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ func TestChore(t *testing.T) {
|
||||
|
||||
satellite.GracefulExit.Chore.Loop.TriggerWait()
|
||||
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incompleteTransfers, 3)
|
||||
for _, incomplete := range incompleteTransfers {
|
||||
@ -97,7 +97,7 @@ func TestChore(t *testing.T) {
|
||||
if node.ID() == exitingNode.ID() {
|
||||
continue
|
||||
}
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0)
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, node.ID(), 20, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incompleteTransfers, 0)
|
||||
}
|
||||
@ -116,7 +116,7 @@ func TestChore(t *testing.T) {
|
||||
err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
|
||||
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incompleteTransfers, 3)
|
||||
|
||||
@ -129,7 +129,7 @@ func TestChore(t *testing.T) {
|
||||
require.False(t, exitStatus.ExitSuccess)
|
||||
require.NotNil(t, exitStatus.ExitFinishedAt)
|
||||
|
||||
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
|
||||
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incompleteTransfers, 0)
|
||||
|
||||
@ -223,7 +223,7 @@ func TestDurabilityRatio(t *testing.T) {
|
||||
|
||||
satellite.GracefulExit.Chore.Loop.TriggerWait()
|
||||
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incompleteTransfers, 2)
|
||||
for _, incomplete := range incompleteTransfers {
|
||||
@ -270,7 +270,7 @@ func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) {
|
||||
transferQueueItems = append(transferQueueItems, item)
|
||||
}
|
||||
batchSize := 1000
|
||||
err := db.Enqueue(ctx, transferQueueItems, batchSize)
|
||||
err := db.Enqueue(ctx, transferQueueItems, batchSize, false)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
}
|
||||
|
@ -8,22 +8,26 @@ import (
|
||||
"time"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
)
|
||||
|
||||
// Progress represents the persisted graceful exit progress record.
|
||||
type Progress struct {
|
||||
NodeID storj.NodeID
|
||||
BytesTransferred int64
|
||||
PiecesTransferred int64
|
||||
PiecesFailed int64
|
||||
UpdatedAt time.Time
|
||||
NodeID storj.NodeID
|
||||
BytesTransferred int64
|
||||
PiecesTransferred int64
|
||||
PiecesFailed int64
|
||||
UpdatedAt time.Time
|
||||
UsesSegmentTransferQueue bool
|
||||
}
|
||||
|
||||
// TransferQueueItem represents the persisted graceful exit queue record.
|
||||
type TransferQueueItem struct {
|
||||
NodeID storj.NodeID
|
||||
Key metabase.SegmentKey
|
||||
StreamID uuid.UUID
|
||||
Position metabase.SegmentPosition
|
||||
PieceNum int32
|
||||
RootPieceID storj.PieceID
|
||||
DurabilityRatio float64
|
||||
@ -46,15 +50,15 @@ type DB interface {
|
||||
GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error)
|
||||
|
||||
// Enqueue batch inserts graceful exit transfer queue entries it does not exist.
|
||||
Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error
|
||||
Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int, usesSegmentTransferQueue bool) error
|
||||
// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
|
||||
UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error
|
||||
UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem, usesSegmentTransferQueue bool) error
|
||||
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
|
||||
DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) error
|
||||
DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
|
||||
// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID.
|
||||
DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
|
||||
DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) error
|
||||
// DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries.
|
||||
DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
|
||||
DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) error
|
||||
// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
|
||||
// queue items whose nodes have finished the exit before the indicated time
|
||||
// returning the total number of deleted items.
|
||||
@ -65,15 +69,15 @@ type DB interface {
|
||||
// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
|
||||
GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error)
|
||||
// GetTransferQueueItem gets a graceful exit transfer queue entry.
|
||||
GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (*TransferQueueItem, error)
|
||||
GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) (*TransferQueueItem, error)
|
||||
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
|
||||
GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
|
||||
GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error)
|
||||
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries in the database ordered by durability ratio and queued date ascending.
|
||||
GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
|
||||
GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error)
|
||||
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
|
||||
GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error)
|
||||
GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64, usesSegmentTransferQueue bool) ([]*TransferQueueItem, error)
|
||||
// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
|
||||
IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) error
|
||||
IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
|
||||
// CountFinishedTransferQueueItemsByNode return a map of the nodes which has
|
||||
// finished the exit before the indicated time but there are at least one item
|
||||
// left in the transfer queue.
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/gracefulexit"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
@ -55,6 +56,7 @@ func TestProgress(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: remove this test when graceful_exit_transfer_queue is dropped.
|
||||
func TestTransferQueueItem(t *testing.T) {
|
||||
// test basic graceful exit transfer queue crud
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
@ -101,11 +103,11 @@ func TestTransferQueueItem(t *testing.T) {
|
||||
// test basic create, update, get delete
|
||||
{
|
||||
batchSize := 1000
|
||||
err := geDB.Enqueue(ctx, items, batchSize)
|
||||
err := geDB.Enqueue(ctx, items, batchSize, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tqi := range items {
|
||||
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.PieceNum)
|
||||
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tqi.RootPieceID, item.RootPieceID)
|
||||
require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio)
|
||||
@ -114,10 +116,10 @@ func TestTransferQueueItem(t *testing.T) {
|
||||
item.DurabilityRatio = 1.2
|
||||
item.RequestedAt = &now
|
||||
|
||||
err = geDB.UpdateTransferQueueItem(ctx, *item)
|
||||
err = geDB.UpdateTransferQueueItem(ctx, *item, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.PieceNum)
|
||||
latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, item.RootPieceID, latestItem.RootPieceID)
|
||||
@ -125,23 +127,23 @@ func TestTransferQueueItem(t *testing.T) {
|
||||
require.WithinDuration(t, now, *latestItem.RequestedAt, time.Second)
|
||||
}
|
||||
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0)
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 2)
|
||||
}
|
||||
|
||||
// mark the first item finished and test that only 1 item gets returned from the GetIncomplete
|
||||
{
|
||||
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key1, 1)
|
||||
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key1, uuid.UUID{}, metabase.SegmentPosition{}, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
now := time.Now()
|
||||
item.FinishedAt = &now
|
||||
|
||||
err = geDB.UpdateTransferQueueItem(ctx, *item)
|
||||
err = geDB.UpdateTransferQueueItem(ctx, *item, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0)
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 1)
|
||||
for _, queueItem := range queueItems {
|
||||
@ -152,38 +154,285 @@ func TestTransferQueueItem(t *testing.T) {
|
||||
|
||||
// test delete finished queue items. Only key1 should be removed
|
||||
{
|
||||
err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1)
|
||||
err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// key1 should no longer exist for nodeID1
|
||||
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, key1, 1)
|
||||
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, key1, uuid.UUID{}, metabase.SegmentPosition{}, 1)
|
||||
require.Error(t, err)
|
||||
|
||||
// key2 should still exist for nodeID1
|
||||
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, key2, 2)
|
||||
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// test delete all for a node
|
||||
{
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0)
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 2)
|
||||
|
||||
err = geDB.DeleteTransferQueueItems(ctx, nodeID2)
|
||||
err = geDB.DeleteTransferQueueItems(ctx, nodeID2, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0)
|
||||
queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 0)
|
||||
}
|
||||
|
||||
// test increment order limit send count
|
||||
err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, key2, 2)
|
||||
err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// get queue item for key2 since that still exists
|
||||
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key2, 2)
|
||||
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, key2, uuid.UUID{}, metabase.SegmentPosition{}, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, item.OrderLimitSendCount)
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: remove this test when graceful_exit_transfer_queue is dropped.
|
||||
func TestBothTransferQueueItem(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
geDB := db.GracefulExit()
|
||||
|
||||
progress1 := gracefulexit.Progress{
|
||||
NodeID: testrand.NodeID(),
|
||||
UsesSegmentTransferQueue: false,
|
||||
}
|
||||
progress2 := gracefulexit.Progress{
|
||||
NodeID: testrand.NodeID(),
|
||||
UsesSegmentTransferQueue: true,
|
||||
}
|
||||
progress := []gracefulexit.Progress{progress1, progress2}
|
||||
key1 := metabase.SegmentKey(testrand.Bytes(memory.B * 32))
|
||||
key2 := metabase.SegmentKey(testrand.Bytes(memory.B * 32))
|
||||
// root piece IDs for path 1 and 2
|
||||
rootPieceID1 := testrand.PieceID()
|
||||
rootPieceID2 := testrand.PieceID()
|
||||
// root piece IDs for segments
|
||||
rootPieceID3 := testrand.PieceID()
|
||||
rootPieceID4 := testrand.PieceID()
|
||||
streamID1 := testrand.UUID()
|
||||
streamID2 := testrand.UUID()
|
||||
position1 := metabase.SegmentPosition{Part: 1, Index: 2}
|
||||
position2 := metabase.SegmentPosition{Part: 2, Index: 3}
|
||||
|
||||
itemsInTransferQueue := []gracefulexit.TransferQueueItem{
|
||||
{
|
||||
NodeID: progress1.NodeID,
|
||||
Key: key1,
|
||||
PieceNum: 1,
|
||||
RootPieceID: rootPieceID1,
|
||||
DurabilityRatio: 0.9,
|
||||
},
|
||||
{
|
||||
NodeID: progress1.NodeID,
|
||||
Key: key2,
|
||||
PieceNum: 2,
|
||||
RootPieceID: rootPieceID2,
|
||||
DurabilityRatio: 1.1,
|
||||
},
|
||||
}
|
||||
itemsInSegmentTransferQueue := []gracefulexit.TransferQueueItem{
|
||||
{
|
||||
NodeID: progress2.NodeID,
|
||||
StreamID: streamID1,
|
||||
Position: position1,
|
||||
PieceNum: 2,
|
||||
RootPieceID: rootPieceID3,
|
||||
DurabilityRatio: 0.9,
|
||||
},
|
||||
{
|
||||
NodeID: progress2.NodeID,
|
||||
StreamID: streamID2,
|
||||
Position: position2,
|
||||
PieceNum: 1,
|
||||
RootPieceID: rootPieceID4,
|
||||
DurabilityRatio: 1.1,
|
||||
},
|
||||
}
|
||||
|
||||
{
|
||||
batchSize := 1000
|
||||
err := geDB.Enqueue(ctx, itemsInTransferQueue, batchSize, false)
|
||||
require.NoError(t, err)
|
||||
err = geDB.Enqueue(ctx, itemsInSegmentTransferQueue, batchSize, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tqi := range append(itemsInTransferQueue, itemsInSegmentTransferQueue...) {
|
||||
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tqi.RootPieceID, item.RootPieceID)
|
||||
require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio)
|
||||
}
|
||||
|
||||
// check that we get nothing if we don't use the right transfer queue
|
||||
for _, p := range progress {
|
||||
queueItems, err := geDB.GetIncomplete(ctx, p.NodeID, 10, 0, !p.UsesSegmentTransferQueue)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 0)
|
||||
}
|
||||
}
|
||||
|
||||
// test delete
|
||||
{
|
||||
for _, p := range progress {
|
||||
// check that we have the right number of items before trying to delete
|
||||
queueItems, err := geDB.GetIncomplete(ctx, p.NodeID, 10, 0, p.UsesSegmentTransferQueue)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 2)
|
||||
|
||||
err = geDB.DeleteTransferQueueItems(ctx, p.NodeID, p.UsesSegmentTransferQueue)
|
||||
require.NoError(t, err)
|
||||
|
||||
queueItems, err = geDB.GetIncomplete(ctx, p.NodeID, 10, 0, p.UsesSegmentTransferQueue)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 0)
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func TestSegmentTransferQueueItem(t *testing.T) {
|
||||
// test basic graceful exit transfer queue crud
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
geDB := db.GracefulExit()
|
||||
|
||||
nodeID1 := testrand.NodeID()
|
||||
nodeID2 := testrand.NodeID()
|
||||
streamID1 := testrand.UUID()
|
||||
streamID2 := testrand.UUID()
|
||||
position1 := metabase.SegmentPosition{Part: 1, Index: 2}
|
||||
position2 := metabase.SegmentPosition{Part: 2, Index: 3}
|
||||
|
||||
// root piece IDs for segments 1 and 2
|
||||
rootPieceID1 := testrand.PieceID()
|
||||
rootPieceID2 := testrand.PieceID()
|
||||
items := []gracefulexit.TransferQueueItem{
|
||||
{
|
||||
NodeID: nodeID1,
|
||||
StreamID: streamID1,
|
||||
Position: position1,
|
||||
PieceNum: 1,
|
||||
RootPieceID: rootPieceID1,
|
||||
DurabilityRatio: 0.9,
|
||||
},
|
||||
{
|
||||
NodeID: nodeID1,
|
||||
StreamID: streamID2,
|
||||
Position: position2,
|
||||
PieceNum: 2,
|
||||
RootPieceID: rootPieceID2,
|
||||
DurabilityRatio: 1.1,
|
||||
},
|
||||
{
|
||||
NodeID: nodeID2,
|
||||
StreamID: streamID1,
|
||||
Position: position1,
|
||||
PieceNum: 2,
|
||||
RootPieceID: rootPieceID1,
|
||||
DurabilityRatio: 0.9,
|
||||
},
|
||||
{
|
||||
NodeID: nodeID2,
|
||||
StreamID: streamID2,
|
||||
Position: position2,
|
||||
PieceNum: 1,
|
||||
RootPieceID: rootPieceID2,
|
||||
DurabilityRatio: 1.1,
|
||||
},
|
||||
}
|
||||
|
||||
// test basic create, update, get delete
|
||||
{
|
||||
batchSize := 1000
|
||||
err := geDB.Enqueue(ctx, items, batchSize, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, tqi := range items {
|
||||
item, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tqi.RootPieceID, item.RootPieceID)
|
||||
require.Equal(t, tqi.DurabilityRatio, item.DurabilityRatio)
|
||||
|
||||
now := time.Now()
|
||||
item.DurabilityRatio = 1.2
|
||||
item.RequestedAt = &now
|
||||
|
||||
err = geDB.UpdateTransferQueueItem(ctx, *item, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
latestItem, err := geDB.GetTransferQueueItem(ctx, tqi.NodeID, tqi.Key, tqi.StreamID, tqi.Position, tqi.PieceNum)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, item.RootPieceID, latestItem.RootPieceID)
|
||||
require.Equal(t, item.DurabilityRatio, latestItem.DurabilityRatio)
|
||||
require.WithinDuration(t, now, *latestItem.RequestedAt, time.Second)
|
||||
}
|
||||
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 2)
|
||||
}
|
||||
|
||||
// mark the first item finished and test that only 1 item gets returned from the GetIncomplete
|
||||
{
|
||||
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID1, position1, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
now := time.Now()
|
||||
item.FinishedAt = &now
|
||||
|
||||
err = geDB.UpdateTransferQueueItem(ctx, *item, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID1, 10, 0, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 1)
|
||||
for _, queueItem := range queueItems {
|
||||
require.Equal(t, nodeID1, queueItem.NodeID)
|
||||
require.Equal(t, streamID2, queueItem.StreamID)
|
||||
require.Equal(t, position2, queueItem.Position)
|
||||
}
|
||||
}
|
||||
|
||||
// test delete finished queue items. Only key1 should be removed
|
||||
{
|
||||
err := geDB.DeleteFinishedTransferQueueItems(ctx, nodeID1, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
// key1 should no longer exist for nodeID1
|
||||
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID1, position1, 1)
|
||||
require.Error(t, err)
|
||||
|
||||
// key2 should still exist for nodeID1
|
||||
_, err = geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID2, position2, 2)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// test delete all for a node
|
||||
{
|
||||
queueItems, err := geDB.GetIncomplete(ctx, nodeID2, 10, 0, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 2)
|
||||
|
||||
err = geDB.DeleteTransferQueueItems(ctx, nodeID2, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
queueItems, err = geDB.GetIncomplete(ctx, nodeID2, 10, 0, true)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 0)
|
||||
}
|
||||
|
||||
// test increment order limit send count
|
||||
err := geDB.IncrementOrderLimitSendCount(ctx, nodeID1, nil, streamID2, position2, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// get queue item for key2 since that still exists
|
||||
item, err := geDB.GetTransferQueueItem(ctx, nodeID1, nil, streamID2, position2, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, item.OrderLimitSendCount)
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"storj.io/common/signing"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/orders"
|
||||
@ -176,14 +177,14 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
|
||||
|
||||
loopErr := incompleteLoop.Run(ctx, func(ctx context.Context) error {
|
||||
if pending.Length() == 0 {
|
||||
incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0)
|
||||
incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0, false)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return pending.DoneSending(err)
|
||||
}
|
||||
|
||||
if len(incomplete) == 0 {
|
||||
incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0)
|
||||
incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0, false)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return pending.DoneSending(err)
|
||||
@ -325,7 +326,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.PieceNum)
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -336,7 +337,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
|
||||
segment, err := endpoint.getValidSegment(ctx, incomplete.Key, incomplete.RootPieceID)
|
||||
if err != nil {
|
||||
endpoint.log.Warn("invalid segment", zap.Error(err))
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.PieceNum)
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -346,7 +347,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
|
||||
|
||||
nodePiece, err := endpoint.getNodePiece(ctx, segment, incomplete)
|
||||
if err != nil {
|
||||
deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.PieceNum)
|
||||
deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
|
||||
if deleteErr != nil {
|
||||
return Error.Wrap(deleteErr)
|
||||
}
|
||||
@ -360,7 +361,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.PieceNum)
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -422,7 +423,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCS
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.Key, incomplete.PieceNum)
|
||||
err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.Key, uuid.UUID{}, metabase.SegmentPosition{}, incomplete.PieceNum)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -466,7 +467,7 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Key, int32(transfer.PieceNum))
|
||||
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -489,7 +490,7 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSat
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Key, int32(transfer.PieceNum))
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -535,7 +536,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
|
||||
return nil
|
||||
}
|
||||
|
||||
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Key, int32(transfer.PieceNum))
|
||||
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -566,7 +567,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
|
||||
}
|
||||
}
|
||||
if nodePiece == (metabase.Piece{}) {
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, int32(transfer.PieceNum))
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -583,7 +584,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, int32(transfer.PieceNum))
|
||||
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, uuid.UUID{}, metabase.SegmentPosition{}, int32(transfer.PieceNum))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -593,7 +594,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
|
||||
transferQueueItem.LastFailedAt = &now
|
||||
transferQueueItem.FailedCount = &failedCount
|
||||
transferQueueItem.LastFailedCode = &errorCode
|
||||
err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem)
|
||||
err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem, false)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -630,7 +631,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto
|
||||
}
|
||||
|
||||
// remove remaining items from the queue
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID, false)
|
||||
if err != nil {
|
||||
return true, Error.Wrap(err)
|
||||
}
|
||||
@ -658,7 +659,7 @@ func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSate
|
||||
}
|
||||
|
||||
// remove remaining items from the queue after notifying nodes about their exit status
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID)
|
||||
err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID, false)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
@ -304,7 +304,7 @@ func TestRecvTimeout(t *testing.T) {
|
||||
require.Len(t, exitingNodes, 1)
|
||||
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
|
||||
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 1)
|
||||
|
||||
@ -1031,7 +1031,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
|
||||
satellite.GracefulExit.Chore.Loop.TriggerWait()
|
||||
|
||||
// make sure all the pieces are in the transfer queue
|
||||
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
|
||||
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incomplete, 2)
|
||||
|
||||
@ -1071,7 +1071,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
|
||||
require.FailNow(t, "should not reach this case: %#v", m)
|
||||
}
|
||||
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0)
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 0)
|
||||
})
|
||||
@ -1142,7 +1142,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
|
||||
satellite.GracefulExit.Chore.Loop.TriggerWait()
|
||||
|
||||
// make sure all the pieces are in the transfer queue
|
||||
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
|
||||
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incomplete, 1)
|
||||
// TODO: change to this when an object part can be overwritten
|
||||
@ -1185,7 +1185,7 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
|
||||
require.FailNow(t, "should not reach this case: %#v", m)
|
||||
}
|
||||
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0)
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 0)
|
||||
})
|
||||
@ -1321,7 +1321,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
|
||||
satellite.GracefulExit.Chore.Loop.TriggerWait()
|
||||
|
||||
// make sure all the pieces are in the transfer queue
|
||||
_, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0)
|
||||
_, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 1, 0, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
var messageCount int
|
||||
@ -1365,7 +1365,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
|
||||
require.Equal(t, messageCount, maxOrderLimitSendCount)
|
||||
|
||||
// make sure not responding piece not in queue
|
||||
incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
|
||||
incompletes, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, incompletes, 0)
|
||||
|
||||
@ -1525,7 +1525,7 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun
|
||||
satellite.GracefulExit.Chore.Loop.TriggerWait()
|
||||
|
||||
// make sure all the pieces are in the transfer queue
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0)
|
||||
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// connect to satellite again to start receiving transfers
|
||||
|
@ -172,7 +172,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
|
||||
gracefulExitDB := planet.Satellites[0].DB.GracefulExit()
|
||||
batchSize := 1000
|
||||
|
||||
err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
|
||||
err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
asOfSystemTime := -1 * time.Microsecond
|
||||
@ -259,7 +259,7 @@ func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(
|
||||
queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...)
|
||||
|
||||
// Add some items to the transfer queue for the exited nodes.
|
||||
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
|
||||
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
disableAsOfSystemTime := time.Second * 0
|
||||
@ -404,7 +404,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) {
|
||||
|
||||
queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...)
|
||||
// Add some items to the transfer queue for the exited nodes.
|
||||
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
|
||||
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
disableAsOfSystemTime := time.Second * 0
|
||||
|
@ -120,7 +120,7 @@ func (collector *PathCollector) flush(ctx context.Context, limit int) (err error
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(collector.buffer) >= limit {
|
||||
err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize)
|
||||
err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize, false)
|
||||
collector.buffer = collector.buffer[:0]
|
||||
|
||||
return errs.Wrap(err)
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil"
|
||||
"storj.io/private/dbutil/pgutil"
|
||||
"storj.io/private/tagsql"
|
||||
@ -66,27 +67,43 @@ func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID)
|
||||
}
|
||||
|
||||
progress := &gracefulexit.Progress{
|
||||
NodeID: nID,
|
||||
BytesTransferred: dbxProgress.BytesTransferred,
|
||||
PiecesTransferred: dbxProgress.PiecesTransferred,
|
||||
PiecesFailed: dbxProgress.PiecesFailed,
|
||||
UpdatedAt: dbxProgress.UpdatedAt,
|
||||
NodeID: nID,
|
||||
BytesTransferred: dbxProgress.BytesTransferred,
|
||||
PiecesTransferred: dbxProgress.PiecesTransferred,
|
||||
PiecesFailed: dbxProgress.PiecesFailed,
|
||||
UpdatedAt: dbxProgress.UpdatedAt,
|
||||
UsesSegmentTransferQueue: dbxProgress.UsesSegmentTransferQueue,
|
||||
}
|
||||
|
||||
return progress, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// Enqueue batch inserts graceful exit transfer queue entries if it does not exist.
|
||||
func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) {
|
||||
func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int, usesSegmentTransferQueue bool) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
sort.Slice(items, func(i, k int) bool {
|
||||
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
|
||||
if compare == 0 {
|
||||
return bytes.Compare(items[i].Key, items[k].Key) < 0
|
||||
}
|
||||
return compare < 0
|
||||
})
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
if !usesSegmentTransferQueue {
|
||||
sort.Slice(items, func(i, k int) bool {
|
||||
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
|
||||
if compare == 0 {
|
||||
return bytes.Compare(items[i].Key, items[k].Key) < 0
|
||||
}
|
||||
return compare < 0
|
||||
})
|
||||
} else {
|
||||
sort.Slice(items, func(i, k int) bool {
|
||||
compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
|
||||
if compare == 0 {
|
||||
compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:])
|
||||
if compare == 0 {
|
||||
return items[i].Position.Encode() < items[k].Position.Encode()
|
||||
}
|
||||
return compare < 0
|
||||
}
|
||||
return compare < 0
|
||||
})
|
||||
}
|
||||
|
||||
for i := 0; i < len(items); i += batchSize {
|
||||
lowerBound := i
|
||||
@ -98,76 +115,159 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran
|
||||
|
||||
var nodeIDs []storj.NodeID
|
||||
var keys [][]byte
|
||||
var streamIds [][]byte
|
||||
var positions []int64
|
||||
var pieceNums []int32
|
||||
var rootPieceIDs [][]byte
|
||||
var durabilities []float64
|
||||
|
||||
for _, item := range items[lowerBound:upperBound] {
|
||||
item := item
|
||||
nodeIDs = append(nodeIDs, item.NodeID)
|
||||
keys = append(keys, item.Key)
|
||||
if !usesSegmentTransferQueue {
|
||||
keys = append(keys, item.Key)
|
||||
} else {
|
||||
streamIds = append(streamIds, item.StreamID[:])
|
||||
positions = append(positions, int64(item.Position.Encode()))
|
||||
}
|
||||
pieceNums = append(pieceNums, item.PieceNum)
|
||||
rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes())
|
||||
durabilities = append(durabilities, item.DurabilityRatio)
|
||||
}
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
if !usesSegmentTransferQueue {
|
||||
_, err = db.db.ExecContext(ctx, db.db.Rebind(`
|
||||
INSERT INTO graceful_exit_transfer_queue (
|
||||
node_id, path, piece_num,
|
||||
root_piece_id, durability_ratio, queued_at
|
||||
) SELECT
|
||||
unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]),
|
||||
unnest($4::bytea[]), unnest($5::float8[]), $6
|
||||
ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums),
|
||||
pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC())
|
||||
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
_, err = db.db.ExecContext(ctx, db.db.Rebind(`
|
||||
INSERT INTO graceful_exit_transfer_queue(node_id, path, piece_num, root_piece_id, durability_ratio, queued_at)
|
||||
SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]), unnest($4::bytea[]), unnest($5::float8[]), $6
|
||||
ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC())
|
||||
INSERT INTO graceful_exit_segment_transfer_queue (
|
||||
node_id, stream_id, position, piece_num,
|
||||
root_piece_id, durability_ratio, queued_at
|
||||
) SELECT
|
||||
unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int8[]),
|
||||
unnest($4::int4[]), unnest($5::bytea[]), unnest($6::float8[]),
|
||||
$7
|
||||
ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(streamIds), pgutil.Int8Array(positions),
|
||||
pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities),
|
||||
time.Now().UTC())
|
||||
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
|
||||
func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem) (err error) {
|
||||
func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem, usesSegmentTransferQueue bool) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
update := dbx.GracefulExitTransferQueue_Update_Fields{
|
||||
DurabilityRatio: dbx.GracefulExitTransferQueue_DurabilityRatio(item.DurabilityRatio),
|
||||
LastFailedCode: dbx.GracefulExitTransferQueue_LastFailedCode_Raw(item.LastFailedCode),
|
||||
FailedCount: dbx.GracefulExitTransferQueue_FailedCount_Raw(item.FailedCount),
|
||||
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped.
|
||||
if !usesSegmentTransferQueue {
|
||||
update := dbx.GracefulExitTransferQueue_Update_Fields{
|
||||
DurabilityRatio: dbx.GracefulExitTransferQueue_DurabilityRatio(item.DurabilityRatio),
|
||||
LastFailedCode: dbx.GracefulExitTransferQueue_LastFailedCode_Raw(item.LastFailedCode),
|
||||
FailedCount: dbx.GracefulExitTransferQueue_FailedCount_Raw(item.FailedCount),
|
||||
}
|
||||
|
||||
if item.RequestedAt != nil {
|
||||
update.RequestedAt = dbx.GracefulExitTransferQueue_RequestedAt_Raw(item.RequestedAt)
|
||||
}
|
||||
if item.LastFailedAt != nil {
|
||||
update.LastFailedAt = dbx.GracefulExitTransferQueue_LastFailedAt_Raw(item.LastFailedAt)
|
||||
}
|
||||
if item.FinishedAt != nil {
|
||||
update.FinishedAt = dbx.GracefulExitTransferQueue_FinishedAt_Raw(item.FinishedAt)
|
||||
}
|
||||
|
||||
return db.db.UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx,
|
||||
dbx.GracefulExitTransferQueue_NodeId(item.NodeID.Bytes()),
|
||||
dbx.GracefulExitTransferQueue_Path(item.Key),
|
||||
dbx.GracefulExitTransferQueue_PieceNum(int(item.PieceNum)),
|
||||
update,
|
||||
)
|
||||
}
|
||||
|
||||
update := dbx.GracefulExitSegmentTransfer_Update_Fields{
|
||||
DurabilityRatio: dbx.GracefulExitSegmentTransfer_DurabilityRatio(item.DurabilityRatio),
|
||||
LastFailedCode: dbx.GracefulExitSegmentTransfer_LastFailedCode_Raw(item.LastFailedCode),
|
||||
FailedCount: dbx.GracefulExitSegmentTransfer_FailedCount_Raw(item.FailedCount),
|
||||
}
|
||||
|
||||
if item.RequestedAt != nil {
|
||||
update.RequestedAt = dbx.GracefulExitTransferQueue_RequestedAt_Raw(item.RequestedAt)
|
||||
update.RequestedAt = dbx.GracefulExitSegmentTransfer_RequestedAt_Raw(item.RequestedAt)
|
||||
}
|
||||
if item.LastFailedAt != nil {
|
||||
update.LastFailedAt = dbx.GracefulExitTransferQueue_LastFailedAt_Raw(item.LastFailedAt)
|
||||
update.LastFailedAt = dbx.GracefulExitSegmentTransfer_LastFailedAt_Raw(item.LastFailedAt)
|
||||
}
|
||||
if item.FinishedAt != nil {
|
||||
update.FinishedAt = dbx.GracefulExitTransferQueue_FinishedAt_Raw(item.FinishedAt)
|
||||
update.FinishedAt = dbx.GracefulExitSegmentTransfer_FinishedAt_Raw(item.FinishedAt)
|
||||
}
|
||||
|
||||
return db.db.UpdateNoReturn_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx,
|
||||
dbx.GracefulExitTransferQueue_NodeId(item.NodeID.Bytes()),
|
||||
dbx.GracefulExitTransferQueue_Path(item.Key),
|
||||
dbx.GracefulExitTransferQueue_PieceNum(int(item.PieceNum)),
|
||||
return db.db.UpdateNoReturn_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
|
||||
dbx.GracefulExitSegmentTransfer_NodeId(item.NodeID.Bytes()),
|
||||
dbx.GracefulExitSegmentTransfer_StreamId(item.StreamID[:]),
|
||||
dbx.GracefulExitSegmentTransfer_Position(item.Position.Encode()),
|
||||
dbx.GracefulExitSegmentTransfer_PieceNum(int(item.PieceNum)),
|
||||
update,
|
||||
)
|
||||
}
|
||||
|
||||
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
|
||||
func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (err error) {
|
||||
func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), dbx.GracefulExitTransferQueue_Path(key),
|
||||
dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum)))
|
||||
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped.
|
||||
if key != nil {
|
||||
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()), dbx.GracefulExitTransferQueue_Path(key),
|
||||
dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum)))
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
|
||||
dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
|
||||
dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
|
||||
dbx.GracefulExitSegmentTransfer_Position(position.Encode()), dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum)))
|
||||
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID.
|
||||
func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||||
func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
if !usesSegmentTransferQueue {
|
||||
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
|
||||
return Error.Wrap(err)
|
||||
|
||||
}
|
||||
|
||||
// DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries by nodeID.
|
||||
func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||||
func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID, usesSegmentTransferQueue bool) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped.
|
||||
if !usesSegmentTransferQueue {
|
||||
_, err = db.db.Delete_GracefulExitTransferQueue_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()))
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -181,12 +281,12 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
|
||||
switch db.db.impl {
|
||||
case dbutil.Postgres:
|
||||
statement := `
|
||||
DELETE FROM graceful_exit_transfer_queue
|
||||
DELETE FROM graceful_exit_segment_transfer_queue
|
||||
WHERE node_id IN (
|
||||
SELECT id
|
||||
FROM nodes
|
||||
WHERE exit_finished_at IS NOT NULL
|
||||
AND exit_finished_at < $1
|
||||
SELECT node_id FROM graceful_exit_segment_transfer_queue INNER JOIN nodes
|
||||
ON graceful_exit_segment_transfer_queue.node_id = nodes.id
|
||||
WHERE nodes.exit_finished_at IS NOT NULL
|
||||
AND nodes.exit_finished_at < $1
|
||||
)`
|
||||
res, err := db.db.ExecContext(ctx, statement, before)
|
||||
if err != nil {
|
||||
@ -197,8 +297,26 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
//TODO: remove when graceful_exit_transfer_queue is dropped.
|
||||
statement = `
|
||||
DELETE FROM graceful_exit_transfer_queue
|
||||
WHERE node_id IN (
|
||||
SELECT node_id FROM graceful_exit_transfer_queue INNER JOIN nodes
|
||||
ON graceful_exit_transfer_queue.node_id = nodes.id
|
||||
WHERE nodes.exit_finished_at IS NOT NULL
|
||||
AND nodes.exit_finished_at < $1
|
||||
)`
|
||||
res, err = db.db.ExecContext(ctx, statement, before)
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return count, nil
|
||||
countOldQueue, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return count + countOldQueue, nil
|
||||
|
||||
case dbutil.Cockroach:
|
||||
nodesQuery := `
|
||||
@ -210,10 +328,16 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
|
||||
LIMIT $2 OFFSET $3
|
||||
`
|
||||
deleteStmt := `
|
||||
DELETE FROM graceful_exit_transfer_queue
|
||||
DELETE FROM graceful_exit_segment_transfer_queue
|
||||
WHERE node_id = $1
|
||||
LIMIT $2
|
||||
`
|
||||
//TODO: remove when graceful_exit_transfer_queue is dropped.
|
||||
deleteStmtOldQueue := `
|
||||
DELETE FROM graceful_exit_transfer_queue
|
||||
WHERE node_id = $1
|
||||
LIMIT $2
|
||||
`
|
||||
var (
|
||||
deleteCount int64
|
||||
offset int
|
||||
@ -259,6 +383,21 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
|
||||
break
|
||||
}
|
||||
}
|
||||
//TODO: remove when graceful_exit_transfer_queue is dropped.
|
||||
for {
|
||||
res, err := db.db.ExecContext(ctx, deleteStmtOldQueue, id.Bytes(), batchSize)
|
||||
if err != nil {
|
||||
return deleteCount, Error.Wrap(err)
|
||||
}
|
||||
count, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return deleteCount, Error.Wrap(err)
|
||||
}
|
||||
deleteCount += count
|
||||
if count < int64(batchSize) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return deleteCount, nil
|
||||
}
|
||||
@ -349,39 +488,82 @@ func (db *gracefulexitDB) DeleteBatchExitProgress(ctx context.Context, nodeIDs [
|
||||
}
|
||||
|
||||
// GetTransferQueueItem gets a graceful exit transfer queue entry.
|
||||
func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) {
|
||||
func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
dbxTransferQueue, err := db.db.Get_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx,
|
||||
dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()),
|
||||
dbx.GracefulExitTransferQueue_Path(key),
|
||||
dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum)))
|
||||
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
if key != nil {
|
||||
dbxTransferQueue, err := db.db.Get_GracefulExitTransferQueue_By_NodeId_And_Path_And_PieceNum(ctx,
|
||||
dbx.GracefulExitTransferQueue_NodeId(nodeID.Bytes()),
|
||||
dbx.GracefulExitTransferQueue_Path(key),
|
||||
dbx.GracefulExitTransferQueue_PieceNum(int(pieceNum)))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
transferQueueItem, err := dbxToTransferQueueItem(dbxTransferQueue)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return transferQueueItem, Error.Wrap(err)
|
||||
}
|
||||
|
||||
dbxTransferQueue, err := db.db.Get_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
|
||||
dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
|
||||
dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
|
||||
dbx.GracefulExitSegmentTransfer_Position(position.Encode()),
|
||||
dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum)))
|
||||
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
transferQueueItem, err := dbxToTransferQueueItem(dbxTransferQueue)
|
||||
transferQueueItem, err := dbxSegmentTransferToTransferQueueItem(dbxTransferQueue)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return transferQueueItem, Error.Wrap(err)
|
||||
|
||||
}
|
||||
|
||||
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
|
||||
func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
|
||||
func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
sql := `SELECT node_id, path, piece_num, root_piece_id, durability_ratio, queued_at, requested_at, last_failed_at, last_failed_code, failed_count, finished_at, order_limit_send_count
|
||||
|
||||
var sql string
|
||||
if !usesSegmentTransferQueue {
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
sql = `
|
||||
SELECT
|
||||
node_id, path, piece_num,
|
||||
root_piece_id, durability_ratio,
|
||||
queued_at, requested_at, last_failed_at,
|
||||
last_failed_code, failed_count, finished_at,
|
||||
order_limit_send_count
|
||||
FROM graceful_exit_transfer_queue
|
||||
WHERE node_id = ?
|
||||
AND finished_at is NULL
|
||||
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
||||
} else {
|
||||
sql = `
|
||||
SELECT
|
||||
node_id, stream_id, position,
|
||||
piece_num, root_piece_id, durability_ratio,
|
||||
queued_at, requested_at, last_failed_at,
|
||||
last_failed_code, failed_count, finished_at,
|
||||
order_limit_send_count
|
||||
FROM graceful_exit_segment_transfer_queue
|
||||
WHERE node_id = ?
|
||||
AND finished_at is NULL
|
||||
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
||||
}
|
||||
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
||||
|
||||
transferQueueItemRows, err := scanRows(rows)
|
||||
transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -390,21 +572,44 @@ func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID
|
||||
}
|
||||
|
||||
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending.
|
||||
func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
|
||||
func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
sql := `SELECT node_id, path, piece_num, root_piece_id, durability_ratio, queued_at, requested_at, last_failed_at, last_failed_code, failed_count, finished_at, order_limit_send_count
|
||||
|
||||
var sql string
|
||||
if !usesSegmentTransferQueue {
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
sql = `
|
||||
SELECT
|
||||
node_id, path, piece_num,
|
||||
root_piece_id, durability_ratio, queued_at,
|
||||
requested_at, last_failed_at, last_failed_code,
|
||||
failed_count, finished_at, order_limit_send_count
|
||||
FROM graceful_exit_transfer_queue
|
||||
WHERE node_id = ?
|
||||
AND finished_at is NULL
|
||||
AND last_failed_at is NULL
|
||||
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
||||
} else {
|
||||
sql = `
|
||||
SELECT
|
||||
node_id, stream_id, position,
|
||||
piece_num, root_piece_id, durability_ratio,
|
||||
queued_at, requested_at, last_failed_at,
|
||||
last_failed_code, failed_count, finished_at,
|
||||
order_limit_send_count
|
||||
FROM graceful_exit_segment_transfer_queue
|
||||
WHERE node_id = ?
|
||||
AND finished_at is NULL
|
||||
AND last_failed_at is NULL
|
||||
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
||||
}
|
||||
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
||||
|
||||
transferQueueItemRows, err := scanRows(rows)
|
||||
transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -413,22 +618,46 @@ func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID sto
|
||||
}
|
||||
|
||||
// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
|
||||
func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
|
||||
func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64, usesSegmentTransferQueue bool) (_ []*gracefulexit.TransferQueueItem, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
sql := `SELECT node_id, path, piece_num, root_piece_id, durability_ratio, queued_at, requested_at, last_failed_at, last_failed_code, failed_count, finished_at, order_limit_send_count
|
||||
|
||||
var sql string
|
||||
if !usesSegmentTransferQueue {
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
sql = `
|
||||
SELECT
|
||||
node_id, path, piece_num,
|
||||
root_piece_id, durability_ratio, queued_at,
|
||||
requested_at, last_failed_at, last_failed_code,
|
||||
failed_count, finished_at, order_limit_send_count
|
||||
FROM graceful_exit_transfer_queue
|
||||
WHERE node_id = ?
|
||||
AND finished_at is NULL
|
||||
AND last_failed_at is not NULL
|
||||
AND failed_count < ?
|
||||
AND finished_at is NULL
|
||||
AND last_failed_at is not NULL
|
||||
AND failed_count < ?
|
||||
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
||||
} else {
|
||||
sql = `
|
||||
SELECT
|
||||
node_id, stream_id, position,
|
||||
piece_num, root_piece_id, durability_ratio,
|
||||
queued_at, requested_at, last_failed_at,
|
||||
last_failed_code, failed_count, finished_at,
|
||||
order_limit_send_count
|
||||
FROM graceful_exit_segment_transfer_queue
|
||||
WHERE node_id = ?
|
||||
AND finished_at is NULL
|
||||
AND last_failed_at is not NULL
|
||||
AND failed_count < ?
|
||||
ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
|
||||
}
|
||||
rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), maxFailures, limit, offset)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
||||
|
||||
transferQueueItemRows, err := scanRows(rows)
|
||||
transferQueueItemRows, err := scanRows(rows, usesSegmentTransferQueue)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -437,16 +666,26 @@ func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.
|
||||
}
|
||||
|
||||
// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
|
||||
func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (err error) {
|
||||
func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
sql := db.db.Rebind(
|
||||
`UPDATE graceful_exit_transfer_queue SET order_limit_send_count = graceful_exit_transfer_queue.order_limit_send_count + 1
|
||||
WHERE node_id = ?
|
||||
AND path = ?
|
||||
AND piece_num = ?`,
|
||||
)
|
||||
_, err = db.db.ExecContext(ctx, sql, nodeID, key, pieceNum)
|
||||
var sql string
|
||||
if key != nil {
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
sql = `UPDATE graceful_exit_transfer_queue SET order_limit_send_count = graceful_exit_transfer_queue.order_limit_send_count + 1
|
||||
WHERE node_id = ?
|
||||
AND path = ?
|
||||
AND piece_num = ?`
|
||||
_, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, key, pieceNum)
|
||||
} else {
|
||||
sql = `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1
|
||||
WHERE node_id = ?
|
||||
AND stream_id = ?
|
||||
AND position = ?
|
||||
AND piece_num = ?`
|
||||
_, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum)
|
||||
}
|
||||
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -457,7 +696,7 @@ func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Cont
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
query := `SELECT n.id, count(getq.node_id)
|
||||
FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq
|
||||
FROM nodes as n INNER JOIN graceful_exit_segment_transfer_queue as getq
|
||||
ON n.id = getq.node_id
|
||||
` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
|
||||
WHERE n.exit_finished_at IS NOT NULL
|
||||
@ -486,16 +725,57 @@ func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Cont
|
||||
nodesItemsCount[nodeID] = n
|
||||
}
|
||||
|
||||
return nodesItemsCount, Error.Wrap(rows.Err())
|
||||
err = rows.Err()
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return nodesItemsCount, Error.Wrap(err)
|
||||
}
|
||||
// TODO: remove when graceful_exit_transfer_queue is dropped
|
||||
query = `SELECT n.id, count(getq.node_id)
|
||||
FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq
|
||||
ON n.id = getq.node_id
|
||||
` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
|
||||
WHERE n.exit_finished_at IS NOT NULL
|
||||
AND n.exit_finished_at < ?
|
||||
GROUP BY n.id`
|
||||
|
||||
statement = db.db.Rebind(query)
|
||||
|
||||
rowsOldTransfers, err := db.db.QueryContext(ctx, statement, before)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
defer func() { err = errs.Combine(err, Error.Wrap(rowsOldTransfers.Close())) }()
|
||||
|
||||
for rowsOldTransfers.Next() {
|
||||
var (
|
||||
nodeID storj.NodeID
|
||||
n int64
|
||||
)
|
||||
err := rowsOldTransfers.Scan(&nodeID, &n)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
nodesItemsCount[nodeID] = n
|
||||
}
|
||||
return nodesItemsCount, Error.Wrap(rowsOldTransfers.Err())
|
||||
}
|
||||
|
||||
func scanRows(rows tagsql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) {
|
||||
func scanRows(rows tagsql.Rows, usesSegmentTransferQueue bool) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) {
|
||||
for rows.Next() {
|
||||
transferQueueItem := &gracefulexit.TransferQueueItem{}
|
||||
var pieceIDBytes []byte
|
||||
err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.Key, &transferQueueItem.PieceNum, &pieceIDBytes,
|
||||
&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
|
||||
&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
|
||||
if !usesSegmentTransferQueue {
|
||||
// TODO: to be removed when graceful_exit_transfer_queue is dropped
|
||||
err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.Key, &transferQueueItem.PieceNum, &pieceIDBytes,
|
||||
&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
|
||||
&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
|
||||
} else {
|
||||
err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes,
|
||||
&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
|
||||
&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -549,3 +829,50 @@ func dbxToTransferQueueItem(dbxTransferQueue *dbx.GracefulExitTransferQueue) (it
|
||||
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func dbxSegmentTransferToTransferQueueItem(dbxSegmentTransfer *dbx.GracefulExitSegmentTransfer) (item *gracefulexit.TransferQueueItem, err error) {
|
||||
nID, err := storj.NodeIDFromBytes(dbxSegmentTransfer.NodeId)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
streamID, err := uuid.FromBytes(dbxSegmentTransfer.StreamId)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
position := metabase.SegmentPositionFromEncoded(dbxSegmentTransfer.Position)
|
||||
|
||||
item = &gracefulexit.TransferQueueItem{
|
||||
NodeID: nID,
|
||||
StreamID: streamID,
|
||||
Position: position,
|
||||
PieceNum: int32(dbxSegmentTransfer.PieceNum),
|
||||
DurabilityRatio: dbxSegmentTransfer.DurabilityRatio,
|
||||
QueuedAt: dbxSegmentTransfer.QueuedAt,
|
||||
OrderLimitSendCount: dbxSegmentTransfer.OrderLimitSendCount,
|
||||
}
|
||||
if dbxSegmentTransfer.RootPieceId != nil {
|
||||
item.RootPieceID, err = storj.PieceIDFromBytes(dbxSegmentTransfer.RootPieceId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if dbxSegmentTransfer.LastFailedCode != nil {
|
||||
item.LastFailedCode = dbxSegmentTransfer.LastFailedCode
|
||||
}
|
||||
if dbxSegmentTransfer.FailedCount != nil {
|
||||
item.FailedCount = dbxSegmentTransfer.FailedCount
|
||||
}
|
||||
if dbxSegmentTransfer.RequestedAt != nil && !dbxSegmentTransfer.RequestedAt.IsZero() {
|
||||
item.RequestedAt = dbxSegmentTransfer.RequestedAt
|
||||
}
|
||||
if dbxSegmentTransfer.LastFailedAt != nil && !dbxSegmentTransfer.LastFailedAt.IsZero() {
|
||||
item.LastFailedAt = dbxSegmentTransfer.LastFailedAt
|
||||
}
|
||||
if dbxSegmentTransfer.FinishedAt != nil && !dbxSegmentTransfer.FinishedAt.IsZero() {
|
||||
item.FinishedAt = dbxSegmentTransfer.FinishedAt
|
||||
}
|
||||
|
||||
return item, nil
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
|
||||
require.Len(t, exitingNodes, 1)
|
||||
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
|
||||
|
||||
queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
|
||||
queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 1)
|
||||
|
||||
@ -110,7 +110,7 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
|
||||
exitingNode.GracefulExit.Chore.TestWaitForWorkers()
|
||||
|
||||
// check that there are no more items to process
|
||||
queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
|
||||
queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 0)
|
||||
|
||||
|
@ -69,7 +69,7 @@ func TestWorkerSuccess(t *testing.T) {
|
||||
require.Len(t, exitingNodes, 1)
|
||||
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
|
||||
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 1)
|
||||
|
||||
@ -142,7 +142,7 @@ func TestWorkerTimeout(t *testing.T) {
|
||||
require.Len(t, exitingNodes, 1)
|
||||
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
|
||||
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
|
||||
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, queueItems, 1)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user