c00ecae75c
Remove the logic associated with the old transfer queue. A new transfer queue (gracefulexit_segment_transfer_queue) has been created for the migration to segmentLoop. Transfers from the old queue were not moved to the new queue. Instead, the old queue was still used for nodes which had initiated graceful exit before the migration. There is no such node left, so we can remove all this logic. As a next step, we will drop the table. Change-Id: I3aa9bc29b76065d34b57a73f6e9c9d0297587f54
499 lines
18 KiB
Go
499 lines
18 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package gracefulexit_test
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/gracefulexit"
|
|
"storj.io/storj/satellite/metabase"
|
|
"storj.io/storj/satellite/overlay"
|
|
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
|
)
|
|
|
|
func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 6,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
satellite := planet.Satellites[0]
|
|
geDB := planet.Satellites[0].DB.GracefulExit()
|
|
|
|
days := 6
|
|
currentTime := time.Now().UTC()
|
|
// Set timestamp back by the number of days we want to save
|
|
timestamp := currentTime.AddDate(0, 0, -days).Truncate(time.Millisecond)
|
|
|
|
for i := 0; i < days; i++ {
|
|
nodeID := planet.StorageNodes[i].ID()
|
|
err := geDB.IncrementProgress(ctx, nodeID, 100, 100, 100)
|
|
require.NoError(t, err)
|
|
|
|
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeID,
|
|
ExitFinishedAt: timestamp,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Advance time by 24 hours
|
|
timestamp = timestamp.Add(time.Hour * 24)
|
|
}
|
|
threeDays := currentTime.AddDate(0, 0, -days/2).Add(-time.Millisecond)
|
|
disableAsOfSystemTime := time.Second * 0
|
|
finishedNodes, err := geDB.GetFinishedExitNodes(ctx, threeDays, disableAsOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.Len(t, finishedNodes, 3)
|
|
|
|
finishedNodes, err = geDB.GetFinishedExitNodes(ctx, currentTime, disableAsOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.Len(t, finishedNodes, 6)
|
|
|
|
count, err := geDB.DeleteFinishedExitProgress(ctx, threeDays, disableAsOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, 3, count)
|
|
|
|
// Check that first three nodes were removed from exit progress table
|
|
for i, node := range planet.StorageNodes {
|
|
progress, err := geDB.GetProgress(ctx, node.ID())
|
|
if i < 3 {
|
|
require.True(t, gracefulexit.ErrNodeNotFound.Has(err))
|
|
require.Nil(t, progress)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestGracefulExit_HandleAsOfSystemTimeBadInput(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 1,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
gracefulexitDB := planet.Satellites[0].DB.GracefulExit()
|
|
now := time.Now().UTC()
|
|
// explicitly set as of system time to invalid time values and run queries to ensure queries don't break
|
|
badTime1 := -1 * time.Nanosecond
|
|
_, err := gracefulexitDB.CountFinishedTransferQueueItemsByNode(ctx, now, badTime1)
|
|
require.NoError(t, err)
|
|
badTime2 := 1 * time.Second
|
|
_, err = gracefulexitDB.DeleteFinishedExitProgress(ctx, now, badTime2)
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 7,
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
var (
|
|
cache = planet.Satellites[0].DB.OverlayCache()
|
|
currentTime = time.Now().UTC()
|
|
)
|
|
|
|
// Mark some of the storagenodes as successful exit
|
|
nodeSuccessful1 := planet.StorageNodes[1]
|
|
_, err := cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeSuccessful1.ID(),
|
|
ExitInitiatedAt: currentTime.Add(-time.Hour),
|
|
ExitLoopCompletedAt: currentTime.Add(-30 * time.Minute),
|
|
ExitFinishedAt: currentTime.Add(-25 * time.Minute),
|
|
ExitSuccess: true,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
nodeSuccessful2 := planet.StorageNodes[2]
|
|
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeSuccessful2.ID(),
|
|
ExitInitiatedAt: currentTime.Add(-time.Hour),
|
|
ExitLoopCompletedAt: currentTime.Add(-17 * time.Minute),
|
|
ExitFinishedAt: currentTime.Add(-16 * time.Minute),
|
|
ExitSuccess: true,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
nodeSuccessful3 := planet.StorageNodes[3]
|
|
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeSuccessful3.ID(),
|
|
ExitInitiatedAt: currentTime.Add(-time.Hour),
|
|
ExitLoopCompletedAt: currentTime.Add(-9 * time.Minute),
|
|
ExitFinishedAt: currentTime.Add(-6 * time.Minute),
|
|
ExitSuccess: true,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Mark some of the storagenodes as failed exit
|
|
nodeFailed1 := planet.StorageNodes[4]
|
|
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeFailed1.ID(),
|
|
ExitInitiatedAt: currentTime.Add(-time.Hour),
|
|
ExitLoopCompletedAt: currentTime.Add(-28 * time.Minute),
|
|
ExitFinishedAt: currentTime.Add(-20 * time.Minute),
|
|
ExitSuccess: false,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
nodeFailed2 := planet.StorageNodes[5]
|
|
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeFailed2.ID(),
|
|
ExitInitiatedAt: currentTime.Add(-time.Hour),
|
|
ExitLoopCompletedAt: currentTime.Add(-17 * time.Minute),
|
|
ExitFinishedAt: currentTime.Add(-15 * time.Minute),
|
|
ExitSuccess: false,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
nodeWithoutItems := planet.StorageNodes[6]
|
|
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeWithoutItems.ID(),
|
|
ExitInitiatedAt: currentTime.Add(-time.Hour),
|
|
ExitLoopCompletedAt: currentTime.Add(-35 * time.Minute),
|
|
ExitFinishedAt: currentTime.Add(-32 * time.Minute),
|
|
ExitSuccess: false,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
queueItemsPerNode := 500
|
|
// Add some items to the transfer queue for the exited nodes
|
|
queueItems, nodesItems := generateTransferQueueItems(t, queueItemsPerNode, []*testplanet.StorageNode{
|
|
nodeSuccessful1, nodeSuccessful2, nodeSuccessful3, nodeFailed1, nodeFailed2,
|
|
})
|
|
|
|
gracefulExitDB := planet.Satellites[0].DB.GracefulExit()
|
|
batchSize := 1000
|
|
|
|
err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
|
|
require.NoError(t, err)
|
|
|
|
asOfSystemTime := -1 * time.Microsecond
|
|
// Count nodes exited before 15 minutes ago
|
|
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.Len(t, nodes, 3, "invalid number of nodes which have exited 15 minutes ago")
|
|
|
|
for id, n := range nodes {
|
|
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
|
|
}
|
|
|
|
// Count nodes exited before 4 minutes ago
|
|
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute), asOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.Len(t, nodes, 5, "invalid number of nodes which have exited 4 minutes ago")
|
|
|
|
for id, n := range nodes {
|
|
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
|
|
}
|
|
|
|
// Delete items of nodes exited before 15 minutes ago
|
|
count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime, batchSize)
|
|
require.NoError(t, err)
|
|
expectedNumDeletedItems := nodesItems[nodeSuccessful1.ID()] +
|
|
nodesItems[nodeSuccessful2.ID()] +
|
|
nodesItems[nodeFailed1.ID()]
|
|
require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items")
|
|
|
|
// Check that only a few nodes have exited are left with items
|
|
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.Len(t, nodes, 2, "invalid number of exited nodes with items")
|
|
|
|
for id, n := range nodes {
|
|
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
|
|
assert.NotEqual(t, nodeSuccessful1.ID(), id, "node shouldn't have items")
|
|
assert.NotEqual(t, nodeSuccessful2.ID(), id, "node shouldn't have items")
|
|
assert.NotEqual(t, nodeFailed1.ID(), id, "node shouldn't have items")
|
|
}
|
|
|
|
// Delete the rest of the nodes' items
|
|
count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute), asOfSystemTime, batchSize)
|
|
require.NoError(t, err)
|
|
expectedNumDeletedItems = nodesItems[nodeSuccessful3.ID()] + nodesItems[nodeFailed2.ID()]
|
|
require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items")
|
|
|
|
// Check that there aren't more exited nodes with items
|
|
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.Len(t, nodes, 0, "invalid number of exited nodes with items")
|
|
})
|
|
}
|
|
|
|
// TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batch
|
|
// ensures that deletion works as expected using different batch sizes.
|
|
func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
var testCases = []struct {
|
|
name string
|
|
batchSize int
|
|
transferQueueItemsPerNode int
|
|
numExitedNodes int
|
|
}{
|
|
{"less than complete batch, odd batch", 333, 3, 30},
|
|
{"less than complete batch, even batch", 8888, 222, 40},
|
|
{"over complete batch, odd batch", 3000, 200, 25},
|
|
{"over complete batch, even batch", 1000, 110, 10},
|
|
{"exact batch, odd batch", 1125, 25, 45},
|
|
{"exact batch, even batch", 7200, 1200, 6},
|
|
}
|
|
for _, tt := range testCases {
|
|
tt := tt
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
var (
|
|
gracefulExitDB = db.GracefulExit()
|
|
currentTime = time.Now().UTC()
|
|
batchSize = tt.batchSize
|
|
numItems = tt.transferQueueItemsPerNode
|
|
numExitedNodes = tt.numExitedNodes
|
|
)
|
|
|
|
exitedNodeIDs := generateExitedNodes(t, ctx, db, currentTime, numExitedNodes)
|
|
queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...)
|
|
|
|
// Add some items to the transfer queue for the exited nodes.
|
|
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
|
|
require.NoError(t, err)
|
|
|
|
disableAsOfSystemTime := time.Second * 0
|
|
// Count exited nodes
|
|
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes")
|
|
|
|
// Delete items of the exited nodes
|
|
count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, len(queueItems), count, "invalid number of deleted items")
|
|
|
|
// Count exited nodes. At this time there shouldn't be any exited node with
|
|
// items in the queue
|
|
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes")
|
|
|
|
// Delete items of the exited nodes. At this time there shouldn't be any
|
|
count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize)
|
|
require.NoError(t, err)
|
|
require.Zero(t, count, "invalid number of deleted items")
|
|
})
|
|
}
|
|
}
|
|
|
|
func generateExitedNodes(t *testing.T, ctx *testcontext.Context, db satellite.DB, currentTime time.Time, numExitedNodes int) (exitedNodeIDs storj.NodeIDList) {
|
|
const (
|
|
addr = "127.0.1.0:8080"
|
|
lastNet = "127.0.0"
|
|
)
|
|
var (
|
|
cache = db.OverlayCache()
|
|
nodeIDsMap = make(map[storj.NodeID]struct{})
|
|
)
|
|
for i := 0; i < numExitedNodes; i++ {
|
|
nodeID := generateNodeIDFromPostiveInt(t, i)
|
|
exitedNodeIDs = append(exitedNodeIDs, nodeID)
|
|
if _, ok := nodeIDsMap[nodeID]; ok {
|
|
fmt.Printf("this %v already exists\n", nodeID.Bytes())
|
|
}
|
|
nodeIDsMap[nodeID] = struct{}{}
|
|
|
|
info := overlay.NodeCheckInInfo{
|
|
NodeID: nodeID,
|
|
Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC},
|
|
LastIPPort: addr,
|
|
LastNet: lastNet,
|
|
Version: &pb.NodeVersion{Version: "v1.0.0"},
|
|
Capacity: &pb.NodeCapacity{},
|
|
IsUp: true,
|
|
}
|
|
err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{})
|
|
require.NoError(t, err)
|
|
|
|
exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) + 1) * int64(time.Minute)))
|
|
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeID,
|
|
ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute),
|
|
ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute),
|
|
ExitFinishedAt: exitFinishedAt,
|
|
ExitSuccess: true,
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
require.Equal(t, numExitedNodes, len(nodeIDsMap), "map")
|
|
return exitedNodeIDs
|
|
}
|
|
|
|
// TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch verifies that
|
|
// the CRDB batch logic for delete all the transfer queue items of exited nodes
|
|
// works as expected.
|
|
func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
const (
|
|
addr = "127.0.1.0:8080"
|
|
lastNet = "127.0.0"
|
|
)
|
|
var (
|
|
numNonExitedNodes = rand.Intn(20) + 1
|
|
numExitedNodes = rand.Intn(10) + 20
|
|
cache = db.OverlayCache()
|
|
)
|
|
|
|
for i := 0; i < numNonExitedNodes; i++ {
|
|
info := overlay.NodeCheckInInfo{
|
|
NodeID: generateNodeIDFromPostiveInt(t, i),
|
|
Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC},
|
|
LastIPPort: addr,
|
|
LastNet: lastNet,
|
|
Version: &pb.NodeVersion{Version: "v1.0.0"},
|
|
Capacity: &pb.NodeCapacity{},
|
|
IsUp: true,
|
|
}
|
|
err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{})
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
var (
|
|
currentTime = time.Now().UTC()
|
|
exitedNodeIDs = make([]storj.NodeID, 0, numNonExitedNodes)
|
|
nodeIDsMap = make(map[storj.NodeID]struct{})
|
|
)
|
|
for i := numNonExitedNodes; i < (numNonExitedNodes + numExitedNodes); i++ {
|
|
nodeID := generateNodeIDFromPostiveInt(t, i)
|
|
exitedNodeIDs = append(exitedNodeIDs, nodeID)
|
|
if _, ok := nodeIDsMap[nodeID]; ok {
|
|
fmt.Printf("this %v already exists\n", nodeID.Bytes())
|
|
}
|
|
nodeIDsMap[nodeID] = struct{}{}
|
|
|
|
info := overlay.NodeCheckInInfo{
|
|
NodeID: nodeID,
|
|
Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC},
|
|
LastIPPort: addr,
|
|
LastNet: lastNet,
|
|
Version: &pb.NodeVersion{Version: "v1.0.0"},
|
|
Capacity: &pb.NodeCapacity{},
|
|
IsUp: true,
|
|
}
|
|
err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{})
|
|
require.NoError(t, err)
|
|
|
|
exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) + 1) * int64(time.Minute)))
|
|
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
|
|
NodeID: nodeID,
|
|
ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute),
|
|
ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute),
|
|
ExitFinishedAt: exitFinishedAt,
|
|
ExitSuccess: true,
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
require.Equal(t, numExitedNodes, len(nodeIDsMap), "map")
|
|
|
|
gracefulExitDB := db.GracefulExit()
|
|
batchSize := 1000
|
|
|
|
queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...)
|
|
// Add some items to the transfer queue for the exited nodes.
|
|
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
|
|
require.NoError(t, err)
|
|
|
|
disableAsOfSystemTime := time.Second * 0
|
|
// Count exited nodes
|
|
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes")
|
|
|
|
// Delete items of the exited nodes
|
|
count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, len(queueItems), count, "invalid number of deleted items")
|
|
|
|
// Count exited nodes. At this time there shouldn't be any exited node with
|
|
// items in the queue
|
|
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes")
|
|
|
|
// Delete items of the exited nodes. At this time there shouldn't be any
|
|
count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize)
|
|
require.NoError(t, err)
|
|
require.Zero(t, count, "invalid number of deleted items")
|
|
})
|
|
}
|
|
|
|
// generateTransferQueueItems generates a random number of transfer queue items,
|
|
// between 10 and 120, for each passed node.
|
|
func generateTransferQueueItems(t *testing.T, itemsPerNode int, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) {
|
|
nodeIDs := make([]storj.NodeID, len(nodes))
|
|
for i, n := range nodes {
|
|
nodeIDs[i] = n.ID()
|
|
}
|
|
|
|
items := generateNTransferQueueItemsPerNode(t, itemsPerNode, nodeIDs...)
|
|
|
|
nodesItems := make(map[storj.NodeID]int64, len(nodes))
|
|
for _, item := range items {
|
|
nodesItems[item.NodeID]++
|
|
}
|
|
|
|
return items, nodesItems
|
|
}
|
|
|
|
// generateNTransferQueueItemsPerNode generates n queue items for each nodeID.
|
|
func generateNTransferQueueItemsPerNode(t *testing.T, n int, nodeIDs ...storj.NodeID) []gracefulexit.TransferQueueItem {
|
|
items := make([]gracefulexit.TransferQueueItem, 0)
|
|
for _, nodeID := range nodeIDs {
|
|
for i := 0; i < n; i++ {
|
|
items = append(items, gracefulexit.TransferQueueItem{
|
|
NodeID: nodeID,
|
|
StreamID: testrand.UUID(),
|
|
Position: metabase.SegmentPositionFromEncoded(rand.Uint64()),
|
|
PieceNum: rand.Int31(),
|
|
})
|
|
}
|
|
}
|
|
return items
|
|
}
|
|
|
|
// generateNodeIDFromPostiveInt generates a specific node ID for val; each val
|
|
// value produces a different node ID.
|
|
func generateNodeIDFromPostiveInt(t *testing.T, val int) storj.NodeID {
|
|
t.Helper()
|
|
|
|
if val < 0 {
|
|
t.Fatal("cannot generate a node from a negative integer")
|
|
}
|
|
|
|
nodeID := storj.NodeID{}
|
|
idx := 0
|
|
for {
|
|
m := val & 255
|
|
nodeID[idx] = byte(m)
|
|
|
|
q := val >> 8
|
|
if q == 0 {
|
|
break
|
|
}
|
|
if q < 256 {
|
|
nodeID[idx+1] = byte(q)
|
|
break
|
|
}
|
|
|
|
val = q
|
|
idx++
|
|
}
|
|
|
|
return nodeID
|
|
}
|