diff --git a/cmd/satellite/gracefulexit.go b/cmd/satellite/gracefulexit.go index f08438b47..a3156dd10 100644 --- a/cmd/satellite/gracefulexit.go +++ b/cmd/satellite/gracefulexit.go @@ -150,8 +150,9 @@ func verifyGracefulExitReceipt(ctx context.Context, identity *identity.FullIdent return writeVerificationMessage(true, completed.SatelliteId, completed.NodeId, completed.Completed) } -func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) { - db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"}) +func cleanupGEOrphanedData(ctx context.Context, before time.Time, config gracefulexit.Config) (err error) { + db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database, + satellitedb.Options{ApplicationName: "satellite-gracefulexit"}) if err != nil { return errs.New("error connecting to master database on satellite: %+v", err) } @@ -159,7 +160,7 @@ func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) { err = errs.Combine(err, db.Close()) }() - nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before) + nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before, config.AsOfSystemTimeInterval) if err != nil { return err } @@ -211,12 +212,12 @@ func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) { return nil } - queueTotal, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before) + queueTotal, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before, config.AsOfSystemTimeInterval, config.TransferQueueBatchSize) if err != nil { fmt.Println("Error, NO ITEMS have been deleted from transfer queue") return err } - progressTotal, err := db.GracefulExit().DeleteFinishedExitProgress(ctx, before) + progressTotal, err := db.GracefulExit().DeleteFinishedExitProgress(ctx, before, 
config.AsOfSystemTimeInterval) if err != nil { fmt.Printf("Error, %d stale entries were deleted from exit progress table. More stale entries might remain.\n", progressTotal) return err diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index be871904f..b13473067 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -754,8 +754,7 @@ func cmdConsistencyGECleanup(cmd *cobra.Command, args []string) error { if before.After(time.Now()) { return errs.New("before flag value cannot be newer than the current time.") } - - return cleanupGEOrphanedData(ctx, before.UTC()) + return cleanupGEOrphanedData(ctx, before.UTC(), runCfg.GracefulExit) } func cmdRestoreTrash(cmd *cobra.Command, args []string) error { diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index bb9af21de..6074642de 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -602,6 +602,9 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int RecvTimeout: time.Minute * 1, MaxOrderLimitSendCount: 3, NodeMinAgeInMonths: 0, + + AsOfSystemTimeInterval: 0, + TransferQueueBatchSize: 1000, }, Metrics: metrics.Config{}, } diff --git a/satellite/gracefulexit/chore_test.go b/satellite/gracefulexit/chore_test.go index 92988a525..75f155f3f 100644 --- a/satellite/gracefulexit/chore_test.go +++ b/satellite/gracefulexit/chore_test.go @@ -269,7 +269,8 @@ func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) { } transferQueueItems = append(transferQueueItems, item) } - err := db.Enqueue(ctx, transferQueueItems) + batchSize := 1000 + err := db.Enqueue(ctx, transferQueueItems, batchSize) require.NoError(b, err) } } diff --git a/satellite/gracefulexit/common.go b/satellite/gracefulexit/common.go index b299f8dc5..71043f0ec 100644 --- a/satellite/gracefulexit/common.go +++ b/satellite/gracefulexit/common.go @@ -38,4 +38,7 @@ type Config struct { RecvTimeout time.Duration `help:"the minimum duration for 
receiving a stream from a storage node before timing out" default:"2h"` MaxOrderLimitSendCount int `help:"maximum number of order limits a satellite sends to a node before marking piece transfer failed" default:"10"` NodeMinAgeInMonths int `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6"` + + AsOfSystemTimeInterval time.Duration `help:"interval for AS OF SYSTEM TIME clause (crdb specific) to read from db at a specific time in the past " default:"-10s"` + TransferQueueBatchSize int `help:"batch size (crdb specific) for deleting and adding items to the transfer queue" default:"1000"` } diff --git a/satellite/gracefulexit/db.go b/satellite/gracefulexit/db.go index 13e992da3..c13c28a80 100644 --- a/satellite/gracefulexit/db.go +++ b/satellite/gracefulexit/db.go @@ -46,7 +46,7 @@ type DB interface { GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error) // Enqueue batch inserts graceful exit transfer queue entries it does not exist. - Enqueue(ctx context.Context, items []TransferQueueItem) error + Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error // UpdateTransferQueueItem creates a graceful exit transfer queue entry. UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error // DeleteTransferQueueItem deletes a graceful exit transfer queue entry. @@ -58,12 +58,12 @@ type DB interface { // DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer // queue items whose nodes have finished the exit before the indicated time // returning the total number of deleted items. 
- DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time) (count int64, err error) + DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (count int64, err error) // DeleteFinishedExitProgress deletes exit progress entries for nodes that // finished exiting before the indicated time, returns number of deleted entries. - DeleteFinishedExitProgress(ctx context.Context, before time.Time) (count int64, err error) + DeleteFinishedExitProgress(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (count int64, err error) // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. - GetFinishedExitNodes(ctx context.Context, before time.Time) (finishedNodes []storj.NodeID, err error) + GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) // GetTransferQueueItem gets a graceful exit transfer queue entry. GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (*TransferQueueItem, error) // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. @@ -77,5 +77,5 @@ type DB interface { // CountFinishedTransferQueueItemsByNode return a map of the nodes which has // finished the exit before the indicated time but there are at least one item // left in the transfer queue. 
- CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (map[storj.NodeID]int64, error) + CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (map[storj.NodeID]int64, error) } diff --git a/satellite/gracefulexit/db_test.go b/satellite/gracefulexit/db_test.go index 9fcd5d4f1..c918746ae 100644 --- a/satellite/gracefulexit/db_test.go +++ b/satellite/gracefulexit/db_test.go @@ -100,7 +100,8 @@ func TestTransferQueueItem(t *testing.T) { // test basic create, update, get delete { - err := geDB.Enqueue(ctx, items) + batchSize := 1000 + err := geDB.Enqueue(ctx, items, batchSize) require.NoError(t, err) for _, tqi := range items { diff --git a/satellite/gracefulexit/gracefulexit_test.go b/satellite/gracefulexit/gracefulexit_test.go index 93e2e9dd7..092b4c6cf 100644 --- a/satellite/gracefulexit/gracefulexit_test.go +++ b/satellite/gracefulexit/gracefulexit_test.go @@ -4,6 +4,7 @@ package gracefulexit_test import ( + "fmt" "math/rand" "testing" "time" @@ -11,12 +12,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "storj.io/common/pb" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/overlay" + "storj.io/storj/satellite/satellitedb/satellitedbtest" ) func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) { @@ -46,15 +50,16 @@ func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) { timestamp = timestamp.Add(time.Hour * 24) } threeDays := currentTime.AddDate(0, 0, -days/2).Add(-time.Millisecond) - finishedNodes, err := geDB.GetFinishedExitNodes(ctx, threeDays) + disableAsOfSystemTime := time.Second * 0 + finishedNodes, err := geDB.GetFinishedExitNodes(ctx, threeDays, disableAsOfSystemTime) require.NoError(t, err) require.Len(t, finishedNodes, 3) - finishedNodes, 
err = geDB.GetFinishedExitNodes(ctx, currentTime) + finishedNodes, err = geDB.GetFinishedExitNodes(ctx, currentTime, disableAsOfSystemTime) require.NoError(t, err) require.Len(t, finishedNodes, 6) - count, err := geDB.DeleteFinishedExitProgress(ctx, threeDays) + count, err := geDB.DeleteFinishedExitProgress(ctx, threeDays, disableAsOfSystemTime) require.NoError(t, err) require.EqualValues(t, 3, count) @@ -71,6 +76,22 @@ func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) { }) } +func TestGracefulExit_HandleAsOfSystemTimeBadInput(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + gracefulexitDB := planet.Satellites[0].DB.GracefulExit() + now := time.Now().UTC() + // explicitly set as of system time to invalid time values and run queries to ensure queries don't break + badTime1 := -1 * time.Nanosecond + _, err := gracefulexitDB.CountFinishedTransferQueueItemsByNode(ctx, now, badTime1) + require.NoError(t, err) + badTime2 := 1 * time.Second + _, err = gracefulexitDB.DeleteFinishedExitProgress(ctx, now, badTime2) + require.NoError(t, err) + }) +} + func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 7, @@ -80,7 +101,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { currentTime = time.Now().UTC() ) - // mark some of the storagenodes as successful exit + // Mark some of the storagenodes as successful exit nodeSuccessful1 := planet.StorageNodes[1] _, err := cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ NodeID: nodeSuccessful1.ID(), @@ -111,14 +132,14 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { }) require.NoError(t, err) - // mark some of the storagenodes as failed exit + // Mark some of the storagenodes as failed exit nodeFailed1 := planet.StorageNodes[4] _, err = 
cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ NodeID: nodeFailed1.ID(), ExitInitiatedAt: currentTime.Add(-time.Hour), ExitLoopCompletedAt: currentTime.Add(-28 * time.Minute), ExitFinishedAt: currentTime.Add(-20 * time.Minute), - ExitSuccess: true, + ExitSuccess: false, }) require.NoError(t, err) @@ -128,7 +149,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { ExitInitiatedAt: currentTime.Add(-time.Hour), ExitLoopCompletedAt: currentTime.Add(-17 * time.Minute), ExitFinishedAt: currentTime.Add(-15 * time.Minute), - ExitSuccess: true, + ExitSuccess: false, }) require.NoError(t, err) @@ -142,17 +163,21 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { }) require.NoError(t, err) - // add some items to the transfer queue for the exited nodes - queueItems, nodesItems := generateTransferQueueItems(t, []*testplanet.StorageNode{ + queueItemsPerNode := 500 + // Add some items to the transfer queue for the exited nodes + queueItems, nodesItems := generateTransferQueueItems(t, queueItemsPerNode, []*testplanet.StorageNode{ nodeSuccessful1, nodeSuccessful2, nodeSuccessful3, nodeFailed1, nodeFailed2, }) gracefulExitDB := planet.Satellites[0].DB.GracefulExit() - err = gracefulExitDB.Enqueue(ctx, queueItems) + batchSize := 1000 + + err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize) require.NoError(t, err) - // count nodes exited before 15 minutes ago - nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute)) + asOfSystemTime := -1 * time.Microsecond + // Count nodes exited before 15 minutes ago + nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 3, "invalid number of nodes which have exited 15 minutes ago") @@ -160,8 +185,8 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { assert.EqualValues(t, nodesItems[id], n, 
"unexpected number of items") } - // count nodes exited before 4 minutes ago - nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute)) + // Count nodes exited before 4 minutes ago + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 5, "invalid number of nodes which have exited 4 minutes ago") @@ -169,16 +194,16 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { assert.EqualValues(t, nodesItems[id], n, "unexpected number of items") } - // delete items of nodes exited before 15 minutes ago - count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute)) + // Delete items of nodes exited before 15 minutes ago + count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime, batchSize) require.NoError(t, err) expectedNumDeletedItems := nodesItems[nodeSuccessful1.ID()] + nodesItems[nodeSuccessful2.ID()] + nodesItems[nodeFailed1.ID()] - require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items") + require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items") - // check that only a few nodes have exited are left with items - nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute)) + // Check that only a few nodes have exited are left with items + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 2, "invalid number of exited nodes with items") @@ -189,36 +214,283 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { assert.NotEqual(t, nodeFailed1.ID(), id, "node shouldn't have items") } - // delete items of there rest exited nodes - count, err = 
gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute)) + // Delete the rest of the nodes' items + count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute), asOfSystemTime, batchSize) require.NoError(t, err) expectedNumDeletedItems = nodesItems[nodeSuccessful3.ID()] + nodesItems[nodeFailed2.ID()] - require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items") + require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items") - // check that there aren't more exited nodes with items - nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute)) + // Check that there aren't more exited nodes with items + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 0, "invalid number of exited nodes with items") }) } -func generateTransferQueueItems(t *testing.T, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) { - getNodeID := func() storj.NodeID { - n := rand.Intn(len(nodes)) - return nodes[n].ID() +// TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batch +// ensures that deletion works as expected using different batch sizes. 
+func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + var testCases = []struct { + name string + batchSize int + transferQueueItemsPerNode int + numExitedNodes int + }{ + {"less than complete batch, odd batch", 333, 3, 30}, + {"less than complete batch, even batch", 8888, 222, 40}, + {"over complete batch, odd batch", 3000, 200, 25}, + {"over complete batch, even batch", 1000, 110, 10}, + {"exact batch, odd batch", 1125, 25, 45}, + {"exact batch, even batch", 7200, 1200, 6}, + } + for _, tt := range testCases { + tt := tt + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + var ( + gracefulExitDB = db.GracefulExit() + currentTime = time.Now().UTC() + batchSize = tt.batchSize + numItems = tt.transferQueueItemsPerNode + numExitedNodes = tt.numExitedNodes + ) + + exitedNodeIDs := generateExitedNodes(t, ctx, db, currentTime, numExitedNodes) + queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...) + + // Add some items to the transfer queue for the exited nodes. + err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize) + require.NoError(t, err) + + disableAsOfSystemTime := time.Second * 0 + // Count exited nodes + nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes + count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.EqualValues(t, len(queueItems), count, "invalid number of deleted items") + + // Count exited nodes. 
At this time there shouldn't be any exited node with + // items in the queue + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes. At this time there shouldn't be any + count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.Zero(t, count, "invalid number of deleted items") + }) + } +} + +func generateExitedNodes(t *testing.T, ctx *testcontext.Context, db satellite.DB, currentTime time.Time, numExitedNodes int) (exitedNodeIDs storj.NodeIDList) { + const ( + addr = "127.0.1.0:8080" + lastNet = "127.0.0" + ) + var ( + cache = db.OverlayCache() + nodeIDsMap = make(map[storj.NodeID]struct{}) + ) + for i := 0; i < numExitedNodes; i++ { + nodeID := generateNodeIDFromPostiveInt(t, i) + exitedNodeIDs = append(exitedNodeIDs, nodeID) + if _, ok := nodeIDsMap[nodeID]; ok { + fmt.Printf("this %v already exists\n", nodeID.Bytes()) + } + nodeIDsMap[nodeID] = struct{}{} + + info := overlay.NodeCheckInInfo{ + NodeID: nodeID, + Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC}, + LastIPPort: addr, + LastNet: lastNet, + Version: &pb.NodeVersion{Version: "v1.0.0"}, + Capacity: &pb.NodeCapacity{}, + IsUp: true, + } + err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + + exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) + 1)) * time.Minute) + _, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ + NodeID: nodeID, + ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute), + ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute), + ExitFinishedAt: exitFinishedAt, + ExitSuccess: true, + }) + require.NoError(t, err) + } + require.Equal(t, numExitedNodes, 
len(nodeIDsMap), "map") + return exitedNodeIDs +} + +// TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch verifies that +// the CRDB batch logic for delete all the transfer queue items of exited nodes +// works as expected. +func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + const ( + addr = "127.0.1.0:8080" + lastNet = "127.0.0" + ) + var ( + numNonExitedNodes = rand.Intn(20) + 1 + numExitedNodes = rand.Intn(10) + 20 + cache = db.OverlayCache() + ) + + for i := 0; i < numNonExitedNodes; i++ { + info := overlay.NodeCheckInInfo{ + NodeID: generateNodeIDFromPostiveInt(t, i), + Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC}, + LastIPPort: addr, + LastNet: lastNet, + Version: &pb.NodeVersion{Version: "v1.0.0"}, + Capacity: &pb.NodeCapacity{}, + IsUp: true, + } + err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + } + + var ( + currentTime = time.Now().UTC() + exitedNodeIDs = make([]storj.NodeID, 0, numNonExitedNodes) + nodeIDsMap = make(map[storj.NodeID]struct{}) + ) + for i := numNonExitedNodes; i < (numNonExitedNodes + numExitedNodes); i++ { + nodeID := generateNodeIDFromPostiveInt(t, i) + exitedNodeIDs = append(exitedNodeIDs, nodeID) + if _, ok := nodeIDsMap[nodeID]; ok { + fmt.Printf("this %v already exists\n", nodeID.Bytes()) + } + nodeIDsMap[nodeID] = struct{}{} + + info := overlay.NodeCheckInInfo{ + NodeID: nodeID, + Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC}, + LastIPPort: addr, + LastNet: lastNet, + Version: &pb.NodeVersion{Version: "v1.0.0"}, + Capacity: &pb.NodeCapacity{}, + IsUp: true, + } + err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + + exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) 
+ 1)) * time.Minute) + _, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ + NodeID: nodeID, + ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute), + ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute), + ExitFinishedAt: exitFinishedAt, + ExitSuccess: true, + }) + require.NoError(t, err) + } + + require.Equal(t, numExitedNodes, len(nodeIDsMap), "map") + + gracefulExitDB := db.GracefulExit() + batchSize := 1000 + + queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...) + // Add some items to the transfer queue for the exited nodes. + err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize) + require.NoError(t, err) + + disableAsOfSystemTime := time.Second * 0 + // Count exited nodes + nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes + count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.EqualValues(t, len(queueItems), count, "invalid number of deleted items") + + // Count exited nodes. At this time there shouldn't be any exited node with + // items in the queue + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes. At this time there shouldn't be any + count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.Zero(t, count, "invalid number of deleted items") + }) +} + +// generateTransferQueueItems generates a random number of transfer queue items, +// between 10 and 120, for each passed node. 
+func generateTransferQueueItems(t *testing.T, itemsPerNode int, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) { + nodeIDs := make([]storj.NodeID, len(nodes)) + for i, n := range nodes { + nodeIDs[i] = n.ID() } - var ( - items = make([]gracefulexit.TransferQueueItem, rand.Intn(100)+10) - nodesItems = make(map[storj.NodeID]int64, len(items)) - ) - for i, item := range items { - item.NodeID = getNodeID() - item.Key = metabase.SegmentKey{byte(i)} - item.PieceNum = int32(i + 1) - items[i] = item + items := generateNTransferQueueItemsPerNode(t, itemsPerNode, nodeIDs...) + + nodesItems := make(map[storj.NodeID]int64, len(nodes)) + for _, item := range items { nodesItems[item.NodeID]++ } return items, nodesItems } + +// generateNTransferQueueItemsPerNode generates n queue items for each nodeID. +func generateNTransferQueueItemsPerNode(t *testing.T, n int, nodeIDs ...storj.NodeID) []gracefulexit.TransferQueueItem { + items := make([]gracefulexit.TransferQueueItem, 0) + for _, nodeID := range nodeIDs { + for i := 0; i < n; i++ { + items = append(items, gracefulexit.TransferQueueItem{ + NodeID: nodeID, + Key: metabase.SegmentKey{byte(rand.Int31n(256))}, + PieceNum: rand.Int31(), + }) + } + } + return items +} + +// generateNodeIDFromPostiveInt generates a specific node ID for val; each val +// value produces a different node ID. 
+func generateNodeIDFromPostiveInt(t *testing.T, val int) storj.NodeID { + t.Helper() + + if val < 0 { + t.Fatal("cannot generate a node from a negative integer") + } + + nodeID := storj.NodeID{} + idx := 0 + for { + m := val & 255 + nodeID[idx] = byte(m) + + q := val >> 8 + if q == 0 { + break + } + if q < 256 { + nodeID[idx+1] = byte(q) + break + } + + val = q + idx++ + } + + return nodeID +} diff --git a/satellite/gracefulexit/pathcollector.go b/satellite/gracefulexit/pathcollector.go index b5ea811a5..ea8bcc13c 100644 --- a/satellite/gracefulexit/pathcollector.go +++ b/satellite/gracefulexit/pathcollector.go @@ -115,7 +115,7 @@ func (collector *PathCollector) InlineSegment(ctx context.Context, segment *meta func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) { if len(collector.buffer) >= limit { - err = collector.db.Enqueue(ctx, collector.buffer) + err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize) collector.buffer = collector.buffer[:0] return errs.Wrap(err) diff --git a/satellite/satellitedb/database.go b/satellite/satellitedb/database.go index 34e06ca9a..fe8137edc 100644 --- a/satellite/satellitedb/database.go +++ b/satellite/satellitedb/database.go @@ -159,9 +159,10 @@ func (dbc *satelliteDBCollection) getByName(name string) *satelliteDB { } // AsOfSystemTimeClause returns the "AS OF SYSTEM TIME" clause if the DB implementation -// is CockroachDB and the interval is less than 0. +// is CockroachDB and the interval is less than or equal to a negative microsecond +// (CRDB does not support intervals in the negative nanoseconds). 
func (db *satelliteDB) AsOfSystemTimeClause(interval time.Duration) (asOf string) { - if db.implementation == dbutil.Cockroach && interval < 0 { + if db.implementation == dbutil.Cockroach && interval <= -time.Microsecond { asOf = " AS OF SYSTEM TIME '" + interval.String() + "' " } diff --git a/satellite/satellitedb/gracefulexit.go b/satellite/satellitedb/gracefulexit.go index c95e9c66b..3c1823313 100644 --- a/satellite/satellitedb/gracefulexit.go +++ b/satellite/satellitedb/gracefulexit.go @@ -14,6 +14,7 @@ import ( "github.com/zeebo/errs" "storj.io/common/storj" + "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" "storj.io/private/tagsql" "storj.io/storj/satellite/gracefulexit" @@ -75,8 +76,8 @@ func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID) return progress, Error.Wrap(err) } -// Enqueue batch inserts graceful exit transfer queue entries it does not exist. -func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem) (err error) { +// Enqueue batch inserts graceful exit transfer queue entries if it does not exist. 
+func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) { defer mon.Task()(&ctx)(&err) sort.Slice(items, func(i, k int) bool { @@ -87,25 +88,38 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran return compare < 0 }) - var nodeIDs []storj.NodeID - var keys [][]byte - var pieceNums []int32 - var rootPieceIDs [][]byte - var durabilities []float64 - for _, item := range items { - nodeIDs = append(nodeIDs, item.NodeID) - keys = append(keys, item.Key) - pieceNums = append(pieceNums, item.PieceNum) - rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes()) - durabilities = append(durabilities, item.DurabilityRatio) - } + for i := 0; i < len(items); i += batchSize { + lowerBound := i + upperBound := lowerBound + batchSize - _, err = db.db.ExecContext(ctx, db.db.Rebind(` + if upperBound > len(items) { + upperBound = len(items) + } + + var nodeIDs []storj.NodeID + var keys [][]byte + var pieceNums []int32 + var rootPieceIDs [][]byte + var durabilities []float64 + for _, item := range items[lowerBound:upperBound] { + nodeIDs = append(nodeIDs, item.NodeID) + keys = append(keys, item.Key) + pieceNums = append(pieceNums, item.PieceNum) + rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes()) + durabilities = append(durabilities, item.DurabilityRatio) + } + + _, err = db.db.ExecContext(ctx, db.db.Rebind(` INSERT INTO graceful_exit_transfer_queue(node_id, path, piece_num, root_piece_id, durability_ratio, queued_at) SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]), unnest($4::bytea[]), unnest($5::float8[]), $6 ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC()) - return Error.Wrap(err) + if err != nil { + return Error.Wrap(err) + } + } + + return nil } // UpdateTransferQueueItem creates a graceful exit transfer 
queue entry. @@ -161,39 +175,119 @@ func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, // queue items whose nodes have finished the exit before the indicated time // returning the total number of deleted items. func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( - ctx context.Context, before time.Time) (_ int64, err error) { + ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (_ int64, err error) { defer mon.Task()(&ctx)(&err) - statement := db.db.Rebind( - `DELETE FROM graceful_exit_transfer_queue - WHERE node_id IN ( + switch db.db.implementation { + case dbutil.Postgres: + statement := ` + DELETE FROM graceful_exit_transfer_queue + WHERE node_id IN ( + SELECT id + FROM nodes + WHERE exit_finished_at IS NOT NULL + AND exit_finished_at < $1 + )` + res, err := db.db.ExecContext(ctx, statement, before) + if err != nil { + return 0, Error.Wrap(err) + } + + count, err := res.RowsAffected() + if err != nil { + return 0, Error.Wrap(err) + } + + return count, nil + + case dbutil.Cockroach: + asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) + + nodesQuery := ` SELECT id - FROM nodes + FROM nodes ` + asOf + ` WHERE exit_finished_at IS NOT NULL - AND exit_finished_at < ? 
- )`, + AND exit_finished_at < $1 + LIMIT $2 OFFSET $3 + ` + deleteStmt := ` + DELETE FROM graceful_exit_transfer_queue + WHERE node_id = $1 + LIMIT $2 + ` + var ( + deleteCount int64 + offset int + ) + for { + var nodeIDs storj.NodeIDList + deleteItems := func() (int64, error) { + // Select exited nodes + rows, err := db.db.QueryContext(ctx, nodesQuery, before, batchSize, offset) + if err != nil { + return deleteCount, Error.Wrap(err) + } + defer func() { err = errs.Combine(err, rows.Close()) }() + + count := 0 + for rows.Next() { + var id storj.NodeID + if err = rows.Scan(&id); err != nil { + return deleteCount, Error.Wrap(err) + } + nodeIDs = append(nodeIDs, id) + count++ + } + + if count == batchSize { + offset += count + } else { + offset = -1 // indicates that there aren't more nodes to query + } + + for _, id := range nodeIDs { + for { + res, err := db.db.ExecContext(ctx, deleteStmt, id.Bytes(), batchSize) + if err != nil { + return deleteCount, Error.Wrap(err) + } + count, err := res.RowsAffected() + if err != nil { + return deleteCount, Error.Wrap(err) + } + deleteCount += count + if count < int64(batchSize) { + break + } + } + } + return deleteCount, nil + } + deleteCount, err = deleteItems() + if err != nil { + return deleteCount, err + } + // when offset is negative means that we have get already all the nodes + // which have exited + if offset < 0 { + break + } + } + return deleteCount, nil + } + + return 0, Error.New("unsupported implementation: %s", + dbutil.SchemeForImplementation(db.db.implementation), ) - - res, err := db.db.ExecContext(ctx, statement, before) - if err != nil { - return 0, Error.Wrap(err) - } - - count, err := res.RowsAffected() - if err != nil { - return 0, Error.Wrap(err) - } - - return count, nil } // DeleteFinishedExitProgress deletes exit progress entries for nodes that // finished exiting before the indicated time, returns number of deleted entries. 
func (db *gracefulexitDB) DeleteFinishedExitProgress( - ctx context.Context, before time.Time) (_ int64, err error) { + ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ int64, err error) { defer mon.Task()(&ctx)(&err) - finishedNodes, err := db.GetFinishedExitNodes(ctx, before) + finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemTimeInterval) if err != nil { return 0, err } @@ -201,14 +295,14 @@ func (db *gracefulexitDB) DeleteFinishedExitProgress( } // GetFinishedExitNodes gets nodes that are marked as having finished graceful exit before a given time. -func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time) (finishedNodes []storj.NodeID, err error) { +func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) { defer mon.Task()(&ctx)(&err) - stmt := db.db.Rebind( - ` SELECT id FROM nodes - WHERE exit_finished_at IS NOT NULL - AND exit_finished_at < ?`, - ) - rows, err := db.db.Query(ctx, stmt, before.UTC()) + asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) + stmt := `SELECT id FROM nodes ` + asOf + ` + WHERE exit_finished_at IS NOT NULL + AND exit_finished_at < ? + ` + rows, err := db.db.Query(ctx, db.db.Rebind(stmt), before.UTC()) if err != nil { return nil, Error.Wrap(err) } @@ -360,18 +454,19 @@ func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, node // CountFinishedTransferQueueItemsByNode returns a map of the nodes which have // finished the exit before the indicated time but still have at least one item // left in the transfer queue. 
-func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (_ map[storj.NodeID]int64, err error) { +func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ map[storj.NodeID]int64, err error) { defer mon.Task()(&ctx)(&err) - statement := db.db.Rebind( - `SELECT n.id, count(getq.node_id) - FROM nodes as n LEFT JOIN graceful_exit_transfer_queue as getq - ON n.id = getq.node_id + asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) + + query := `SELECT n.id, count(getq.node_id) + FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq + ON n.id = getq.node_id ` + asOf + ` WHERE n.exit_finished_at IS NOT NULL AND n.exit_finished_at < ? - GROUP BY n.id - HAVING count(getq.node_id) > 0`, - ) + GROUP BY n.id` + + statement := db.db.Rebind(query) rows, err := db.db.QueryContext(ctx, statement, before) if err != nil { diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index fb41bfdea..cb5477fcb 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -244,6 +244,9 @@ contact.external-address: "" # if true, skip the first run of GC # garbage-collection.skip-first: true +# interval for AS OF SYSTEM TIME clause (crdb specific) to read from db at a specific time in the past +# graceful-exit.as-of-system-time-interval: -10s + # size of the buffer used to batch inserts into the transfer queue. 
# graceful-exit.chore-batch-size: 500 @@ -274,6 +277,9 @@ contact.external-address: "" # the minimum duration for receiving a stream from a storage node before timing out # graceful-exit.recv-timeout: 2h0m0s +# batch size (crdb specific) for deleting and adding items to the transfer queue +# graceful-exit.transfer-queue-batch-size: 1000 + # path to the certificate chain for this identity identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert