cmd/satellite: Add command for GE data cleanup

Add a command to the satellite for cleaning up the Graceful Exit (a.k.a
GE) transfer queue items of nodes that have exited.

The commit adds to the GE satellite DB a couple of new methods, and its
corresponding test, for performing the operations of the new command.

Change-Id: I29a572a59689d63b24990ac13c52e76d65aaa917
This commit is contained in:
Ivan Fraixedes 2021-01-14 16:57:04 +01:00 committed by Ivan Fraixedes
parent 1cf3d89a56
commit 076804eac9
5 changed files with 376 additions and 17 deletions

View File

@ -11,6 +11,7 @@ import (
"io"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
@ -27,7 +28,7 @@ import (
// generateGracefulExitCSV creates a report with graceful exit data for exiting or exited nodes in a given period.
func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Time, end time.Time, output io.Writer) error {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), gracefulExitCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
db, err := satellitedb.Open(ctx, zap.L().Named("db"), reportsGracefulExitCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
@ -149,6 +150,77 @@ func verifyGracefulExitReceipt(ctx context.Context, identity *identity.FullIdent
return writeVerificationMessage(true, completed.SatelliteId, completed.NodeId, completed.Completed)
}
func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before)
if err != nil {
return err
}
if len(nodesItems) == 0 {
fmt.Printf("There isn't any item left in the DB for nodes exited before %s\n", before.Format("2006-01-02"))
return nil
}
{ // print the nodesItems
fmt.Println(" Node ID | Num. Items ")
fmt.Println("----------------------------------------------------------------------------------------")
var totalItems int64
for id, n := range nodesItems {
sid := id.String()
// 61 is the char positions between the beginning of the line and the next
// column separator, and 24 the char positions for the second column
// length. Measuring the length of the first column value (node ID), we
// calculate how many positions to shift to start printing in the next
// column and then we tell the Printf to align the value to the right by
// 24 positions which is where the column ends and where last column char
// value should end.
fmt.Printf(fmt.Sprintf(" %%s %%%dd\n", 24+61-len(sid)), sid, n)
totalItems += n
}
fmt.Println("----------------------------------------------------------------------------------------")
fmt.Printf(" Total | %22d \n\n", totalItems)
}
_, err = fmt.Printf("Confirm that you want to delete the above items from the DB? (confirm with 'yes') ")
if err != nil {
return err
}
var confirm string
n, err := fmt.Scanln(&confirm)
if err != nil {
if n != 0 {
return err
}
// fmt.Scanln cannot handle empty input
confirm = "n"
}
if strings.ToLower(confirm) != "yes" {
fmt.Println("Aborted, NO ITEMS have been deleted")
return nil
}
total, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before)
if err != nil {
fmt.Println("Error, NO ITEMS have been deleted")
return err
}
fmt.Printf("%d number of items have been deleted from\n", total)
return nil
}
func checkIDs(satelliteID storj.NodeID, providedSNID storj.NodeID, receiptSatelliteID storj.NodeID, receiptSNID storj.NodeID) error {
if satelliteID != receiptSatelliteID {
return errs.New("satellite ID (%v) does not match receipt satellite ID (%v).", satelliteID, receiptSatelliteID)

View File

@ -137,19 +137,19 @@ var (
Args: cobra.MinimumNArgs(3),
RunE: cmdValueAttribution,
}
gracefulExitCmd = &cobra.Command{
reportsGracefulExitCmd = &cobra.Command{
Use: "graceful-exit [start] [end]",
Short: "Generate a graceful exit report",
Long: "Generate a node usage report for a given period to use for payments. Format dates using YYYY-MM-DD. The end date is exclusive.",
Args: cobra.MinimumNArgs(2),
RunE: cmdGracefulExit,
RunE: cmdReportsGracefulExit,
}
verifyGracefulExitReceiptCmd = &cobra.Command{
reportsVerifyGEReceiptCmd = &cobra.Command{
Use: "verify-exit-receipt [storage node ID] [receipt]",
Short: "Verify a graceful exit receipt",
Long: "Verify a graceful exit receipt is valid.",
Args: cobra.MinimumNArgs(2),
RunE: cmdVerifyGracefulExitReceipt,
RunE: reportsVerifyGEReceipt,
}
compensationCmd = &cobra.Command{
Use: "compensation",
@ -220,6 +220,17 @@ var (
Long: "Ensures that we have a stripe customer for every satellite user.",
RunE: cmdStripeCustomer,
}
consistencyCmd = &cobra.Command{
Use: "consistency",
Short: "Readdress DB consistency issues",
Long: "Readdress DB consistency issues and perform data cleanups for improving the DB performance.",
}
consistencyGECleanupCmd = &cobra.Command{
Use: "ge-cleanup-orphaned-data",
Short: "Cleanup Graceful Exit orphaned data",
Long: "Cleanup Graceful Exit data which is lingering in the transfer queue DB table on nodes which has finished the exit.",
RunE: cmdConsistencyGECleanup,
}
runCfg Satellite
setupCfg Satellite
@ -248,13 +259,18 @@ var (
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
Output string `help:"destination of report output" default:""`
}
gracefulExitCfg struct {
reportsGracefulExitCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
Output string `help:"destination of report output" default:""`
Completed bool `help:"whether to output (initiated and completed) or (initiated and not completed)" default:"false"`
}
verifyGracefulExitReceiptCfg struct {
reportsVerifyGracefulExitReceiptCfg struct {
}
consistencyGECleanupCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
Before string `help:"select only exited nodes before this UTC date formatted like YYYY-MM. Date cannot be newer than the current time (required)"`
}
confDir string
identityDir string
)
@ -276,10 +292,11 @@ func init() {
rootCmd.AddCommand(reportsCmd)
rootCmd.AddCommand(compensationCmd)
rootCmd.AddCommand(billingCmd)
rootCmd.AddCommand(consistencyCmd)
reportsCmd.AddCommand(nodeUsageCmd)
reportsCmd.AddCommand(partnerAttributionCmd)
reportsCmd.AddCommand(gracefulExitCmd)
reportsCmd.AddCommand(verifyGracefulExitReceiptCmd)
reportsCmd.AddCommand(reportsGracefulExitCmd)
reportsCmd.AddCommand(reportsVerifyGEReceiptCmd)
compensationCmd.AddCommand(generateInvoicesCmd)
compensationCmd.AddCommand(recordPeriodCmd)
compensationCmd.AddCommand(recordOneOffPaymentsCmd)
@ -289,6 +306,7 @@ func init() {
billingCmd.AddCommand(createCustomerInvoicesCmd)
billingCmd.AddCommand(finalizeCustomerInvoicesCmd)
billingCmd.AddCommand(stripeCustomerCmd)
consistencyCmd.AddCommand(consistencyGECleanupCmd)
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runMigrationCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
@ -301,8 +319,8 @@ func init() {
process.Bind(generateInvoicesCmd, &generateInvoicesCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(recordPeriodCmd, &recordPeriodCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(recordOneOffPaymentsCmd, &recordOneOffPaymentsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(gracefulExitCmd, &gracefulExitCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(verifyGracefulExitReceiptCmd, &verifyGracefulExitReceiptCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(reportsGracefulExitCmd, &reportsGracefulExitCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(reportsVerifyGEReceiptCmd, &reportsVerifyGracefulExitReceiptCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(partnerAttributionCmd, &partnerAttribtionCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(prepareCustomerInvoiceRecordsCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(createCustomerInvoiceItemsCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
@ -310,6 +328,11 @@ func init() {
process.Bind(createCustomerInvoicesCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(finalizeCustomerInvoicesCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(stripeCustomerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(consistencyGECleanupCmd, &consistencyGECleanupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
if err := consistencyGECleanupCmd.MarkFlagRequired("before"); err != nil {
panic(err)
}
}
func cmdRun(cmd *cobra.Command, args []string) (err error) {
@ -490,7 +513,7 @@ func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
return w.Flush()
}
func cmdVerifyGracefulExitReceipt(cmd *cobra.Command, args []string) (err error) {
func reportsVerifyGEReceipt(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
identity, err := runCfg.Identity.Load()
@ -507,7 +530,7 @@ func cmdVerifyGracefulExitReceipt(cmd *cobra.Command, args []string) (err error)
return verifyGracefulExitReceipt(ctx, identity, nodeID, args[1])
}
func cmdGracefulExit(cmd *cobra.Command, args []string) (err error) {
func cmdReportsGracefulExit(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd)
start, end, err := reports.ParseRange(args[0], args[1])
@ -516,12 +539,12 @@ func cmdGracefulExit(cmd *cobra.Command, args []string) (err error) {
}
// send output to stdout
if gracefulExitCfg.Output == "" {
return generateGracefulExitCSV(ctx, gracefulExitCfg.Completed, start, end, os.Stdout)
if reportsGracefulExitCfg.Output == "" {
return generateGracefulExitCSV(ctx, reportsGracefulExitCfg.Completed, start, end, os.Stdout)
}
// send output to file
file, err := os.Create(gracefulExitCfg.Output)
file, err := os.Create(reportsGracefulExitCfg.Output)
if err != nil {
return err
}
@ -530,7 +553,7 @@ func cmdGracefulExit(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, file.Close())
}()
return generateGracefulExitCSV(ctx, gracefulExitCfg.Completed, start, end, file)
return generateGracefulExitCSV(ctx, reportsGracefulExitCfg.Completed, start, end, file)
}
func cmdNodeUsage(cmd *cobra.Command, args []string) (err error) {
@ -706,6 +729,21 @@ func cmdStripeCustomer(cmd *cobra.Command, args []string) (err error) {
return generateStripeCustomers(ctx)
}
func cmdConsistencyGECleanup(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
before, err := time.Parse("2006-01-02", consistencyGECleanupCfg.Before)
if err != nil {
return errs.New("before flag value isn't of the expected format. %+v", err)
}
if before.After(time.Now()) {
return errs.New("before flag value cannot be newer than the current time.")
}
return cleanupGEOrphanedData(ctx, before.UTC())
}
func main() {
process.ExecCustomDebug(rootCmd)
}

View File

@ -55,6 +55,10 @@ type DB interface {
DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
// DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries.
DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
// queue items whose nodes have finished the exit before the indicated time
// returning the total number of deleted items.
DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time) (count int64, err error)
// GetTransferQueueItem gets a graceful exit transfer queue entry.
GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (*TransferQueueItem, error)
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
@ -65,4 +69,8 @@ type DB interface {
GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error)
// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) error
// CountFinishedTransferQueueItemsByNode return a map of the nodes which has
// finished the exit before the indicated time but there are at least one item
// left in the transfer queue.
CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (map[storj.NodeID]int64, error)
}

View File

@ -153,6 +153,36 @@ func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context,
return Error.Wrap(err)
}
// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
// queue items whose nodes have finished the exit before the indicated time
// returning the total number of deleted items.
func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
ctx context.Context, before time.Time) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
statement := db.db.Rebind(
`DELETE FROM graceful_exit_transfer_queue
WHERE node_id IN (
SELECT id
FROM nodes
WHERE exit_finished_at IS NOT NULL
AND exit_finished_at < ?
)`,
)
res, err := db.db.ExecContext(ctx, statement, before)
if err != nil {
return 0, Error.Wrap(err)
}
count, err := res.RowsAffected()
if err != nil {
return 0, Error.Wrap(err)
}
return count, nil
}
// GetTransferQueueItem gets a graceful exit transfer queue entry.
func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) {
defer mon.Task()(&ctx)(&err)
@ -255,6 +285,45 @@ func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, node
return Error.Wrap(err)
}
// CountFinishedTransferQueueItemsByNode return a map of the nodes which has
// finished the exit before the indicated time but there are at least one item
// left in the transfer queue.
func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (_ map[storj.NodeID]int64, err error) {
defer mon.Task()(&ctx)(&err)
statement := db.db.Rebind(
`SELECT n.id, count(getq.node_id)
FROM nodes as n LEFT JOIN graceful_exit_transfer_queue as getq
ON n.id = getq.node_id
WHERE n.exit_finished_at IS NOT NULL
AND n.exit_finished_at < ?
GROUP BY n.id
HAVING count(getq.node_id) > 0`,
)
rows, err := db.db.QueryContext(ctx, statement, before)
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, Error.Wrap(rows.Close())) }()
nodesItemsCount := make(map[storj.NodeID]int64)
for rows.Next() {
var (
nodeID storj.NodeID
n int64
)
err := rows.Scan(&nodeID, &n)
if err != nil {
return nil, Error.Wrap(err)
}
nodesItemsCount[nodeID] = n
}
return nodesItemsCount, Error.Wrap(rows.Err())
}
func scanRows(rows tagsql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) {
for rows.Next() {
transferQueueItem := &gracefulexit.TransferQueueItem{}

View File

@ -0,0 +1,172 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb_test
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/overlay"
)
func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 7,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
var (
cache = planet.Satellites[0].DB.OverlayCache()
currentTime = time.Now()
)
// mark some of the storagenodes as successful exit
nodeSuccessful1 := planet.StorageNodes[1]
_, err := cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeSuccessful1.ID(),
ExitInitiatedAt: currentTime.Add(-time.Hour),
ExitLoopCompletedAt: currentTime.Add(-30 * time.Minute),
ExitFinishedAt: currentTime.Add(-25 * time.Minute),
ExitSuccess: true,
})
require.NoError(t, err)
nodeSuccessful2 := planet.StorageNodes[2]
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeSuccessful2.ID(),
ExitInitiatedAt: currentTime.Add(-time.Hour),
ExitLoopCompletedAt: currentTime.Add(-17 * time.Minute),
ExitFinishedAt: currentTime.Add(-16 * time.Minute),
ExitSuccess: true,
})
require.NoError(t, err)
nodeSuccessful3 := planet.StorageNodes[3]
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeSuccessful3.ID(),
ExitInitiatedAt: currentTime.Add(-time.Hour),
ExitLoopCompletedAt: currentTime.Add(-9 * time.Minute),
ExitFinishedAt: currentTime.Add(-5 * time.Minute),
ExitSuccess: true,
})
require.NoError(t, err)
// mark some of the storagenodes as failed exit
nodeFailed1 := planet.StorageNodes[4]
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeFailed1.ID(),
ExitInitiatedAt: currentTime.Add(-time.Hour),
ExitLoopCompletedAt: currentTime.Add(-28 * time.Minute),
ExitFinishedAt: currentTime.Add(-20 * time.Minute),
ExitSuccess: true,
})
require.NoError(t, err)
nodeFailed2 := planet.StorageNodes[5]
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeFailed2.ID(),
ExitInitiatedAt: time.Now().Add(-time.Hour),
ExitLoopCompletedAt: time.Now().Add(-17 * time.Minute),
ExitFinishedAt: time.Now().Add(-15 * time.Minute),
ExitSuccess: true,
})
require.NoError(t, err)
nodeWithoutItems := planet.StorageNodes[6]
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeWithoutItems.ID(),
ExitInitiatedAt: time.Now().Add(-time.Hour),
ExitLoopCompletedAt: time.Now().Add(-35 * time.Minute),
ExitFinishedAt: time.Now().Add(-32 * time.Minute),
ExitSuccess: false,
})
require.NoError(t, err)
// add some items to the transfer queue for the exited nodes
queueItems, nodesItems := generateTransferQueueItems(t, []*testplanet.StorageNode{
nodeSuccessful1, nodeSuccessful2, nodeSuccessful3, nodeFailed1, nodeFailed2,
})
gracefulExitDB := planet.Satellites[0].DB.GracefulExit()
err = gracefulExitDB.Enqueue(ctx, queueItems)
require.NoError(t, err)
// count nodes exited before 15 minutes ago
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute))
require.NoError(t, err)
require.Len(t, nodes, 3, "invalid number of nodes which have exited 15 minutes ago")
for id, n := range nodes {
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
}
// count nodes exited before 4 minutes ago
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute))
require.NoError(t, err)
require.Len(t, nodes, 5, "invalid number of nodes which have exited 4 minutes ago")
for id, n := range nodes {
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
}
// delete items of nodes exited before 15 minutes ago
count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute))
require.NoError(t, err)
expectedNumDeletedItems := nodesItems[nodeSuccessful1.ID()] +
nodesItems[nodeSuccessful2.ID()] +
nodesItems[nodeFailed1.ID()]
require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items")
// check that only a few nodes have exited are left with items
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute))
require.NoError(t, err)
require.Len(t, nodes, 2, "invalid number of exited nodes with items")
for id, n := range nodes {
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
assert.NotEqual(t, nodeSuccessful1.ID(), id, "node shouldn't have items")
assert.NotEqual(t, nodeSuccessful2.ID(), id, "node shouldn't have items")
assert.NotEqual(t, nodeFailed1.ID(), id, "node shouldn't have items")
}
// delete items of there rest exited nodes
count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute))
require.NoError(t, err)
expectedNumDeletedItems = nodesItems[nodeSuccessful3.ID()] + nodesItems[nodeFailed2.ID()]
require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items")
// check that there aren't more exited nodes with items
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute))
require.NoError(t, err)
require.Len(t, nodes, 0, "invalid number of exited nodes with items")
})
}
func generateTransferQueueItems(t *testing.T, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) {
getNodeID := func() storj.NodeID {
n := rand.Intn(len(nodes))
return nodes[n].ID()
}
var (
items = make([]gracefulexit.TransferQueueItem, rand.Intn(100)+10)
nodesItems = make(map[storj.NodeID]int64, len(items))
)
for i, item := range items {
item.NodeID = getNodeID()
item.Key = metabase.SegmentKey{byte(rand.Int31n(256))}
item.PieceNum = rand.Int31()
items[i] = item
nodesItems[item.NodeID]++
}
return items, nodesItems
}