From 7fb86617fc267c63c9a936992ba1a0770060c8a7 Mon Sep 17 00:00:00 2001 From: Ivan Fraixedes Date: Wed, 10 Feb 2021 19:09:49 +0100 Subject: [PATCH] satellite/satellitedb: Use CRDB AS OF SYSTEM & batch for GE Use the 'AS OF SYSTEM TIME' Cockroach DB clause for the Graceful Exit (a.k.a GE) queries that count the delete the GE queue items of nodes which have already exited the network. Split the subquery used for deleting all the transfer queue items of nodes which has exited when CRDB is used and batch the queries because CRDB struggles when executing in a single query unlike Postgres. The new test which has been added to this commit to verify the CRDB batch logic for deleting all the transfer queue items of the exited nodes has raised that the Enqueue method has to run in baches when CRDB is used otherwise CRDB has return the error "driver: bad connection" when a big a amount of items are passed to be enqueued. This error didn't happen with the current test implementation it was with an initial one that it was creating a big amount of exited nodes and transfer queue items for those nodes. Change-Id: I6a099cdbc515a240596bc93141fea3182c2e50a9 --- cmd/satellite/gracefulexit.go | 11 +- cmd/satellite/main.go | 3 +- private/testplanet/satellite.go | 3 + satellite/gracefulexit/chore_test.go | 3 +- satellite/gracefulexit/common.go | 3 + satellite/gracefulexit/db.go | 10 +- satellite/gracefulexit/db_test.go | 3 +- satellite/gracefulexit/gracefulexit_test.go | 346 +++++++++++++++++--- satellite/gracefulexit/pathcollector.go | 2 +- satellite/satellitedb/database.go | 5 +- satellite/satellitedb/gracefulexit.go | 199 ++++++++--- scripts/testdata/satellite-config.yaml.lock | 6 + 12 files changed, 488 insertions(+), 106 deletions(-) diff --git a/cmd/satellite/gracefulexit.go b/cmd/satellite/gracefulexit.go index f08438b47..a3156dd10 100644 --- a/cmd/satellite/gracefulexit.go +++ b/cmd/satellite/gracefulexit.go @@ -150,8 +150,9 @@ func verifyGracefulExitReceipt(ctx context.Context, identity *identity.FullIdent return writeVerificationMessage(true, completed.SatelliteId, completed.NodeId, completed.Completed) } -func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) { - db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"}) +func cleanupGEOrphanedData(ctx context.Context, before time.Time, config gracefulexit.Config) (err error) { + db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database, + satellitedb.Options{ApplicationName: "satellite-gracefulexit"}) if err != nil { return errs.New("error connecting to master database on satellite: %+v", err) } @@ -159,7 +160,7 @@ func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) { err = errs.Combine(err, db.Close()) }() - nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before) + nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before, config.AsOfSystemTimeInterval) if err != nil { return err } @@ -211,12 +212,12 @@ func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) { return nil } - queueTotal, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before) + queueTotal, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before, config.AsOfSystemTimeInterval, config.TransferQueueBatchSize) if err != nil { fmt.Println("Error, NO ITEMS have been deleted from transfer queue") return err } - progressTotal, err := db.GracefulExit().DeleteFinishedExitProgress(ctx, before) + progressTotal, err := db.GracefulExit().DeleteFinishedExitProgress(ctx, before, config.AsOfSystemTimeInterval) if err != nil { fmt.Printf("Error, %d stale entries were deleted from exit progress table. More stale entries might remain.\n", progressTotal) return err diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index be871904f..b13473067 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -754,8 +754,7 @@ func cmdConsistencyGECleanup(cmd *cobra.Command, args []string) error { if before.After(time.Now()) { return errs.New("before flag value cannot be newer than the current time.") } - - return cleanupGEOrphanedData(ctx, before.UTC()) + return cleanupGEOrphanedData(ctx, before.UTC(), runCfg.GracefulExit) } func cmdRestoreTrash(cmd *cobra.Command, args []string) error { diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index bb9af21de..6074642de 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -602,6 +602,9 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int RecvTimeout: time.Minute * 1, MaxOrderLimitSendCount: 3, NodeMinAgeInMonths: 0, + + AsOfSystemTimeInterval: 0, + TransferQueueBatchSize: 1000, }, Metrics: metrics.Config{}, } diff --git a/satellite/gracefulexit/chore_test.go b/satellite/gracefulexit/chore_test.go index 92988a525..75f155f3f 100644 --- a/satellite/gracefulexit/chore_test.go +++ b/satellite/gracefulexit/chore_test.go @@ -269,7 +269,8 @@ func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) { } transferQueueItems = append(transferQueueItems, item) } - err := db.Enqueue(ctx, transferQueueItems) + batchSize := 1000 + err := db.Enqueue(ctx, transferQueueItems, batchSize) require.NoError(b, err) } } diff --git a/satellite/gracefulexit/common.go b/satellite/gracefulexit/common.go index b299f8dc5..71043f0ec 100644 --- a/satellite/gracefulexit/common.go +++ b/satellite/gracefulexit/common.go @@ -38,4 +38,7 @@ type Config struct { RecvTimeout time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"2h"` MaxOrderLimitSendCount int `help:"maximum number of order limits a satellite sends to a node before marking piece transfer failed" default:"10"` NodeMinAgeInMonths int `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6"` + + AsOfSystemTimeInterval time.Duration `help:"interval for AS OF SYSTEM TIME clause (crdb specific) to read from db at a specific time in the past " default:"-10s"` + TransferQueueBatchSize int `help:"batch size (crdb specific) for deleting and adding items to the transfer queue" default:"1000"` } diff --git a/satellite/gracefulexit/db.go b/satellite/gracefulexit/db.go index 13e992da3..c13c28a80 100644 --- a/satellite/gracefulexit/db.go +++ b/satellite/gracefulexit/db.go @@ -46,7 +46,7 @@ type DB interface { GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error) // Enqueue batch inserts graceful exit transfer queue entries it does not exist. - Enqueue(ctx context.Context, items []TransferQueueItem) error + Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error // UpdateTransferQueueItem creates a graceful exit transfer queue entry. UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error // DeleteTransferQueueItem deletes a graceful exit transfer queue entry. @@ -58,12 +58,12 @@ type DB interface { // DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer // queue items whose nodes have finished the exit before the indicated time // returning the total number of deleted items. - DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time) (count int64, err error) + DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (count int64, err error) // DeleteFinishedExitProgress deletes exit progress entries for nodes that // finished exiting before the indicated time, returns number of deleted entries. - DeleteFinishedExitProgress(ctx context.Context, before time.Time) (count int64, err error) + DeleteFinishedExitProgress(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (count int64, err error) // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. - GetFinishedExitNodes(ctx context.Context, before time.Time) (finishedNodes []storj.NodeID, err error) + GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) // GetTransferQueueItem gets a graceful exit transfer queue entry. GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (*TransferQueueItem, error) // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. @@ -77,5 +77,5 @@ type DB interface { // CountFinishedTransferQueueItemsByNode return a map of the nodes which has // finished the exit before the indicated time but there are at least one item // left in the transfer queue. - CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (map[storj.NodeID]int64, error) + CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (map[storj.NodeID]int64, error) } diff --git a/satellite/gracefulexit/db_test.go b/satellite/gracefulexit/db_test.go index 9fcd5d4f1..c918746ae 100644 --- a/satellite/gracefulexit/db_test.go +++ b/satellite/gracefulexit/db_test.go @@ -100,7 +100,8 @@ func TestTransferQueueItem(t *testing.T) { // test basic create, update, get delete { - err := geDB.Enqueue(ctx, items) + batchSize := 1000 + err := geDB.Enqueue(ctx, items, batchSize) require.NoError(t, err) for _, tqi := range items { diff --git a/satellite/gracefulexit/gracefulexit_test.go b/satellite/gracefulexit/gracefulexit_test.go index 93e2e9dd7..092b4c6cf 100644 --- a/satellite/gracefulexit/gracefulexit_test.go +++ b/satellite/gracefulexit/gracefulexit_test.go @@ -4,6 +4,7 @@ package gracefulexit_test import ( + "fmt" "math/rand" "testing" "time" @@ -11,12 +12,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "storj.io/common/pb" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/overlay" + "storj.io/storj/satellite/satellitedb/satellitedbtest" ) func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) { @@ -46,15 +50,16 @@ func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) { timestamp = timestamp.Add(time.Hour * 24) } threeDays := currentTime.AddDate(0, 0, -days/2).Add(-time.Millisecond) - finishedNodes, err := geDB.GetFinishedExitNodes(ctx, threeDays) + disableAsOfSystemTime := time.Second * 0 + finishedNodes, err := geDB.GetFinishedExitNodes(ctx, threeDays, disableAsOfSystemTime) require.NoError(t, err) require.Len(t, finishedNodes, 3) - finishedNodes, err = geDB.GetFinishedExitNodes(ctx, currentTime) + finishedNodes, err = geDB.GetFinishedExitNodes(ctx, currentTime, disableAsOfSystemTime) require.NoError(t, err) require.Len(t, finishedNodes, 6) - count, err := geDB.DeleteFinishedExitProgress(ctx, threeDays) + count, err := geDB.DeleteFinishedExitProgress(ctx, threeDays, disableAsOfSystemTime) require.NoError(t, err) require.EqualValues(t, 3, count) @@ -71,6 +76,22 @@ func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) { }) } +func TestGracefulExit_HandleAsOfSystemTimeBadInput(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + gracefulexitDB := planet.Satellites[0].DB.GracefulExit() + now := time.Now().UTC() + // explicitly set as of system time to invalid time values and run queries to ensure queries don't break + badTime1 := -1 * time.Nanosecond + _, err := gracefulexitDB.CountFinishedTransferQueueItemsByNode(ctx, now, badTime1) + require.NoError(t, err) + badTime2 := 1 * time.Second + _, err = gracefulexitDB.DeleteFinishedExitProgress(ctx, now, badTime2) + require.NoError(t, err) + }) +} + func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 7, @@ -80,7 +101,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { currentTime = time.Now().UTC() ) - // mark some of the storagenodes as successful exit + // Mark some of the storagenodes as successful exit nodeSuccessful1 := planet.StorageNodes[1] _, err := cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ NodeID: nodeSuccessful1.ID(), @@ -111,14 +132,14 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { }) require.NoError(t, err) - // mark some of the storagenodes as failed exit + // Mark some of the storagenodes as failed exit nodeFailed1 := planet.StorageNodes[4] _, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ NodeID: nodeFailed1.ID(), ExitInitiatedAt: currentTime.Add(-time.Hour), ExitLoopCompletedAt: currentTime.Add(-28 * time.Minute), ExitFinishedAt: currentTime.Add(-20 * time.Minute), - ExitSuccess: true, + ExitSuccess: false, }) require.NoError(t, err) @@ -128,7 +149,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { ExitInitiatedAt: currentTime.Add(-time.Hour), ExitLoopCompletedAt: currentTime.Add(-17 * time.Minute), ExitFinishedAt: currentTime.Add(-15 * time.Minute), - ExitSuccess: true, + ExitSuccess: false, }) require.NoError(t, err) @@ -142,17 +163,21 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { }) require.NoError(t, err) - // add some items to the transfer queue for the exited nodes - queueItems, nodesItems := generateTransferQueueItems(t, []*testplanet.StorageNode{ + queueItemsPerNode := 500 + // Add some items to the transfer queue for the exited nodes + queueItems, nodesItems := generateTransferQueueItems(t, queueItemsPerNode, []*testplanet.StorageNode{ nodeSuccessful1, nodeSuccessful2, nodeSuccessful3, nodeFailed1, nodeFailed2, }) gracefulExitDB := planet.Satellites[0].DB.GracefulExit() - err = gracefulExitDB.Enqueue(ctx, queueItems) + batchSize := 1000 + + err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize) require.NoError(t, err) - // count nodes exited before 15 minutes ago - nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute)) + asOfSystemTime := -1 * time.Microsecond + // Count nodes exited before 15 minutes ago + nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 3, "invalid number of nodes which have exited 15 minutes ago") @@ -160,8 +185,8 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { assert.EqualValues(t, nodesItems[id], n, "unexpected number of items") } - // count nodes exited before 4 minutes ago - nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute)) + // Count nodes exited before 4 minutes ago + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 5, "invalid number of nodes which have exited 4 minutes ago") @@ -169,16 +194,16 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { assert.EqualValues(t, nodesItems[id], n, "unexpected number of items") } - // delete items of nodes exited before 15 minutes ago - count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute)) + // Delete items of nodes exited before 15 minutes ago + count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime, batchSize) require.NoError(t, err) expectedNumDeletedItems := nodesItems[nodeSuccessful1.ID()] + nodesItems[nodeSuccessful2.ID()] + nodesItems[nodeFailed1.ID()] - require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items") + require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items") - // check that only a few nodes have exited are left with items - nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute)) + // Check that only a few nodes have exited are left with items + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 2, "invalid number of exited nodes with items") @@ -189,36 +214,283 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { assert.NotEqual(t, nodeFailed1.ID(), id, "node shouldn't have items") } - // delete items of there rest exited nodes - count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute)) + // Delete the rest of the nodes' items + count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute), asOfSystemTime, batchSize) require.NoError(t, err) expectedNumDeletedItems = nodesItems[nodeSuccessful3.ID()] + nodesItems[nodeFailed2.ID()] - require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items") + require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items") - // check that there aren't more exited nodes with items - nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute)) + // Check that there aren't more exited nodes with items + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime) require.NoError(t, err) require.Len(t, nodes, 0, "invalid number of exited nodes with items") }) } -func generateTransferQueueItems(t *testing.T, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) { - getNodeID := func() storj.NodeID { - n := rand.Intn(len(nodes)) - return nodes[n].ID() +// TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batch +// ensures that deletion works as expected using different batch sizes. +func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + var testCases = []struct { + name string + batchSize int + transferQueueItemsPerNode int + numExitedNodes int + }{ + {"less than complete batch, odd batch", 333, 3, 30}, + {"less than complete batch, even batch", 8888, 222, 40}, + {"over complete batch, odd batch", 3000, 200, 25}, + {"over complete batch, even batch", 1000, 110, 10}, + {"exact batch, odd batch", 1125, 25, 45}, + {"exact batch, even batch", 7200, 1200, 6}, + } + for _, tt := range testCases { + tt := tt + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + var ( + gracefulExitDB = db.GracefulExit() + currentTime = time.Now().UTC() + batchSize = tt.batchSize + numItems = tt.transferQueueItemsPerNode + numExitedNodes = tt.numExitedNodes + ) + + exitedNodeIDs := generateExitedNodes(t, ctx, db, currentTime, numExitedNodes) + queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...) + + // Add some items to the transfer queue for the exited nodes. + err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize) + require.NoError(t, err) + + disableAsOfSystemTime := time.Second * 0 + // Count exited nodes + nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes + count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.EqualValues(t, len(queueItems), count, "invalid number of deleted items") + + // Count exited nodes. At this time there shouldn't be any exited node with + // items in the queue + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes. At this time there shouldn't be any + count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.Zero(t, count, "invalid number of deleted items") + }) + } +} + +func generateExitedNodes(t *testing.T, ctx *testcontext.Context, db satellite.DB, currentTime time.Time, numExitedNodes int) (exitedNodeIDs storj.NodeIDList) { + const ( + addr = "127.0.1.0:8080" + lastNet = "127.0.0" + ) + var ( + cache = db.OverlayCache() + nodeIDsMap = make(map[storj.NodeID]struct{}) + ) + for i := 0; i < numExitedNodes; i++ { + nodeID := generateNodeIDFromPostiveInt(t, i) + exitedNodeIDs = append(exitedNodeIDs, nodeID) + if _, ok := nodeIDsMap[nodeID]; ok { + fmt.Printf("this %v already exists\n", nodeID.Bytes()) + } + nodeIDsMap[nodeID] = struct{}{} + + info := overlay.NodeCheckInInfo{ + NodeID: nodeID, + Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC}, + LastIPPort: addr, + LastNet: lastNet, + Version: &pb.NodeVersion{Version: "v1.0.0"}, + Capacity: &pb.NodeCapacity{}, + IsUp: true, + } + err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + + exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) + 1)) * time.Minute) + _, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ + NodeID: nodeID, + ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute), + ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute), + ExitFinishedAt: exitFinishedAt, + ExitSuccess: true, + }) + require.NoError(t, err) + } + require.Equal(t, numExitedNodes, len(nodeIDsMap), "map") + return exitedNodeIDs +} + +// TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch verifies that +// the CRDB batch logic for delete all the transfer queue items of exited nodes +// works as expected. +func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + const ( + addr = "127.0.1.0:8080" + lastNet = "127.0.0" + ) + var ( + numNonExitedNodes = rand.Intn(20) + 1 + numExitedNodes = rand.Intn(10) + 20 + cache = db.OverlayCache() + ) + + for i := 0; i < numNonExitedNodes; i++ { + info := overlay.NodeCheckInInfo{ + NodeID: generateNodeIDFromPostiveInt(t, i), + Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC}, + LastIPPort: addr, + LastNet: lastNet, + Version: &pb.NodeVersion{Version: "v1.0.0"}, + Capacity: &pb.NodeCapacity{}, + IsUp: true, + } + err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + } + + var ( + currentTime = time.Now().UTC() + exitedNodeIDs = make([]storj.NodeID, 0, numNonExitedNodes) + nodeIDsMap = make(map[storj.NodeID]struct{}) + ) + for i := numNonExitedNodes; i < (numNonExitedNodes + numExitedNodes); i++ { + nodeID := generateNodeIDFromPostiveInt(t, i) + exitedNodeIDs = append(exitedNodeIDs, nodeID) + if _, ok := nodeIDsMap[nodeID]; ok { + fmt.Printf("this %v already exists\n", nodeID.Bytes()) + } + nodeIDsMap[nodeID] = struct{}{} + + info := overlay.NodeCheckInInfo{ + NodeID: nodeID, + Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC}, + LastIPPort: addr, + LastNet: lastNet, + Version: &pb.NodeVersion{Version: "v1.0.0"}, + Capacity: &pb.NodeCapacity{}, + IsUp: true, + } + err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{}) + require.NoError(t, err) + + exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) + 1)) * time.Minute) + _, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ + NodeID: nodeID, + ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute), + ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute), + ExitFinishedAt: exitFinishedAt, + ExitSuccess: true, + }) + require.NoError(t, err) + } + + require.Equal(t, numExitedNodes, len(nodeIDsMap), "map") + + gracefulExitDB := db.GracefulExit() + batchSize := 1000 + + queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...) + // Add some items to the transfer queue for the exited nodes. + err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize) + require.NoError(t, err) + + disableAsOfSystemTime := time.Second * 0 + // Count exited nodes + nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes + count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.EqualValues(t, len(queueItems), count, "invalid number of deleted items") + + // Count exited nodes. At this time there shouldn't be any exited node with + // items in the queue + nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime) + require.NoError(t, err) + require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes") + + // Delete items of the exited nodes. At this time there shouldn't be any + count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize) + require.NoError(t, err) + require.Zero(t, count, "invalid number of deleted items") + }) +} + +// generateTransferQueueItems generates a random number of transfer queue items, +// between 10 and 120, for each passed node. +func generateTransferQueueItems(t *testing.T, itemsPerNode int, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) { + nodeIDs := make([]storj.NodeID, len(nodes)) + for i, n := range nodes { + nodeIDs[i] = n.ID() } - var ( - items = make([]gracefulexit.TransferQueueItem, rand.Intn(100)+10) - nodesItems = make(map[storj.NodeID]int64, len(items)) - ) - for i, item := range items { - item.NodeID = getNodeID() - item.Key = metabase.SegmentKey{byte(i)} - item.PieceNum = int32(i + 1) - items[i] = item + items := generateNTransferQueueItemsPerNode(t, itemsPerNode, nodeIDs...) + + nodesItems := make(map[storj.NodeID]int64, len(nodes)) + for _, item := range items { nodesItems[item.NodeID]++ } return items, nodesItems } + +// generateNTransferQueueItemsPerNode generates n queue items for each nodeID. +func generateNTransferQueueItemsPerNode(t *testing.T, n int, nodeIDs ...storj.NodeID) []gracefulexit.TransferQueueItem { + items := make([]gracefulexit.TransferQueueItem, 0) + for _, nodeID := range nodeIDs { + for i := 0; i < n; i++ { + items = append(items, gracefulexit.TransferQueueItem{ + NodeID: nodeID, + Key: metabase.SegmentKey{byte(rand.Int31n(256))}, + PieceNum: rand.Int31(), + }) + } + } + return items +} + +// generateNodeIDFromPostiveInt generates a specific node ID for val; each val +// value produces a different node ID. +func generateNodeIDFromPostiveInt(t *testing.T, val int) storj.NodeID { + t.Helper() + + if val < 0 { + t.Fatal("cannot generate a node from a negative integer") + } + + nodeID := storj.NodeID{} + idx := 0 + for { + m := val & 255 + nodeID[idx] = byte(m) + + q := val >> 8 + if q == 0 { + break + } + if q < 256 { + nodeID[idx+1] = byte(q) + break + } + + val = q + idx++ + } + + return nodeID +} diff --git a/satellite/gracefulexit/pathcollector.go b/satellite/gracefulexit/pathcollector.go index b5ea811a5..ea8bcc13c 100644 --- a/satellite/gracefulexit/pathcollector.go +++ b/satellite/gracefulexit/pathcollector.go @@ -115,7 +115,7 @@ func (collector *PathCollector) InlineSegment(ctx context.Context, segment *meta func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) { if len(collector.buffer) >= limit { - err = collector.db.Enqueue(ctx, collector.buffer) + err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize) collector.buffer = collector.buffer[:0] return errs.Wrap(err) diff --git a/satellite/satellitedb/database.go b/satellite/satellitedb/database.go index 34e06ca9a..fe8137edc 100644 --- a/satellite/satellitedb/database.go +++ b/satellite/satellitedb/database.go @@ -159,9 +159,10 @@ func (dbc *satelliteDBCollection) getByName(name string) *satelliteDB { } // AsOfSystemTimeClause returns the "AS OF SYSTEM TIME" clause if the DB implementation -// is CockroachDB and the interval is less than 0. +// is CockroachDB and the interval is less than or equal to a negative microsecond +// (CRDB does not support intervals in the negative nanoseconds). func (db *satelliteDB) AsOfSystemTimeClause(interval time.Duration) (asOf string) { - if db.implementation == dbutil.Cockroach && interval < 0 { + if db.implementation == dbutil.Cockroach && interval <= -time.Microsecond { asOf = " AS OF SYSTEM TIME '" + interval.String() + "' " } diff --git a/satellite/satellitedb/gracefulexit.go b/satellite/satellitedb/gracefulexit.go index c95e9c66b..3c1823313 100644 --- a/satellite/satellitedb/gracefulexit.go +++ b/satellite/satellitedb/gracefulexit.go @@ -14,6 +14,7 @@ import ( "github.com/zeebo/errs" "storj.io/common/storj" + "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" "storj.io/private/tagsql" "storj.io/storj/satellite/gracefulexit" @@ -75,8 +76,8 @@ func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID) return progress, Error.Wrap(err) } -// Enqueue batch inserts graceful exit transfer queue entries it does not exist. -func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem) (err error) { +// Enqueue batch inserts graceful exit transfer queue entries if it does not exist. +func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) { defer mon.Task()(&ctx)(&err) sort.Slice(items, func(i, k int) bool { @@ -87,25 +88,38 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran return compare < 0 }) - var nodeIDs []storj.NodeID - var keys [][]byte - var pieceNums []int32 - var rootPieceIDs [][]byte - var durabilities []float64 - for _, item := range items { - nodeIDs = append(nodeIDs, item.NodeID) - keys = append(keys, item.Key) - pieceNums = append(pieceNums, item.PieceNum) - rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes()) - durabilities = append(durabilities, item.DurabilityRatio) - } + for i := 0; i < len(items); i += batchSize { + lowerBound := i + upperBound := lowerBound + batchSize - _, err = db.db.ExecContext(ctx, db.db.Rebind(` + if upperBound > len(items) { + upperBound = len(items) + } + + var nodeIDs []storj.NodeID + var keys [][]byte + var pieceNums []int32 + var rootPieceIDs [][]byte + var durabilities []float64 + for _, item := range items[lowerBound:upperBound] { + nodeIDs = append(nodeIDs, item.NodeID) + keys = append(keys, item.Key) + pieceNums = append(pieceNums, item.PieceNum) + rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes()) + durabilities = append(durabilities, item.DurabilityRatio) + } + + _, err = db.db.ExecContext(ctx, db.db.Rebind(` INSERT INTO graceful_exit_transfer_queue(node_id, path, piece_num, root_piece_id, durability_ratio, queued_at) SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]), unnest($4::bytea[]), unnest($5::float8[]), $6 ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC()) - return Error.Wrap(err) + if err != nil { + return Error.Wrap(err) + } + } + + return nil } // UpdateTransferQueueItem creates a graceful exit transfer queue entry. @@ -161,39 +175,119 @@ func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, // queue items whose nodes have finished the exit before the indicated time // returning the total number of deleted items. func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( - ctx context.Context, before time.Time) (_ int64, err error) { + ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (_ int64, err error) { defer mon.Task()(&ctx)(&err) - statement := db.db.Rebind( - `DELETE FROM graceful_exit_transfer_queue - WHERE node_id IN ( + switch db.db.implementation { + case dbutil.Postgres: + statement := ` + DELETE FROM graceful_exit_transfer_queue + WHERE node_id IN ( + SELECT id + FROM nodes + WHERE exit_finished_at IS NOT NULL + AND exit_finished_at < $1 + )` + res, err := db.db.ExecContext(ctx, statement, before) + if err != nil { + return 0, Error.Wrap(err) + } + + count, err := res.RowsAffected() + if err != nil { + return 0, Error.Wrap(err) + } + + return count, nil + + case dbutil.Cockroach: + asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) + + nodesQuery := ` SELECT id - FROM nodes + FROM nodes ` + asOf + ` WHERE exit_finished_at IS NOT NULL - AND exit_finished_at < ? - )`, + AND exit_finished_at < $1 + LIMIT $2 OFFSET $3 + ` + deleteStmt := ` + DELETE FROM graceful_exit_transfer_queue + WHERE node_id = $1 + LIMIT $2 + ` + var ( + deleteCount int64 + offset int + ) + for { + var nodeIDs storj.NodeIDList + deleteItems := func() (int64, error) { + // Select exited nodes + rows, err := db.db.QueryContext(ctx, nodesQuery, before, batchSize, offset) + if err != nil { + return deleteCount, Error.Wrap(err) + } + defer func() { err = errs.Combine(err, rows.Close()) }() + + count := 0 + for rows.Next() { + var id storj.NodeID + if err = rows.Scan(&id); err != nil { + return deleteCount, Error.Wrap(err) + } + nodeIDs = append(nodeIDs, id) + count++ + } + + if count == batchSize { + offset += count + } else { + offset = -1 // indicates that there aren't more nodes to query + } + + for _, id := range nodeIDs { + for { + res, err := db.db.ExecContext(ctx, deleteStmt, id.Bytes(), batchSize) + if err != nil { + return deleteCount, Error.Wrap(err) + } + count, err := res.RowsAffected() + if err != nil { + return deleteCount, Error.Wrap(err) + } + deleteCount += count + if count < int64(batchSize) { + break + } + } + } + return deleteCount, nil + } + deleteCount, err = deleteItems() + if err != nil { + return deleteCount, err + } + // when offset is negative means that we have get already all the nodes + // which have exited + if offset < 0 { + break + } + } + return deleteCount, nil + } + + return 0, Error.New("unsupported implementation: %s", + dbutil.SchemeForImplementation(db.db.implementation), ) - - res, err := db.db.ExecContext(ctx, statement, before) - if err != nil { - return 0, Error.Wrap(err) - } - - count, err := res.RowsAffected() - if err != nil { - return 0, Error.Wrap(err) - } - - return count, nil } // DeleteFinishedExitProgress deletes exit progress entries for nodes that // finished exiting before the indicated time, returns number of deleted entries. func (db *gracefulexitDB) DeleteFinishedExitProgress( - ctx context.Context, before time.Time) (_ int64, err error) { + ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ int64, err error) { defer mon.Task()(&ctx)(&err) - finishedNodes, err := db.GetFinishedExitNodes(ctx, before) + finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemTimeInterval) if err != nil { return 0, err } @@ -201,14 +295,14 @@ func (db *gracefulexitDB) DeleteFinishedExitProgress( } // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. -func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time) (finishedNodes []storj.NodeID, err error) { +func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) { defer mon.Task()(&ctx)(&err) - stmt := db.db.Rebind( - ` SELECT id FROM nodes - WHERE exit_finished_at IS NOT NULL - AND exit_finished_at < ?`, - ) - rows, err := db.db.Query(ctx, stmt, before.UTC()) + asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) + stmt := `SELECT id FROM nodes ` + asOf + ` + WHERE exit_finished_at IS NOT NULL + AND exit_finished_at < ? + ` + rows, err := db.db.Query(ctx, db.db.Rebind(stmt), before.UTC()) if err != nil { return nil, Error.Wrap(err) } @@ -360,18 +454,19 @@ func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, node // CountFinishedTransferQueueItemsByNode return a map of the nodes which has // finished the exit before the indicated time but there are at least one item // left in the transfer queue. -func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (_ map[storj.NodeID]int64, err error) { +func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ map[storj.NodeID]int64, err error) { defer mon.Task()(&ctx)(&err) - statement := db.db.Rebind( - `SELECT n.id, count(getq.node_id) - FROM nodes as n LEFT JOIN graceful_exit_transfer_queue as getq - ON n.id = getq.node_id + asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) + + query := `SELECT n.id, count(getq.node_id) + FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq + ON n.id = getq.node_id ` + asOf + ` WHERE n.exit_finished_at IS NOT NULL AND n.exit_finished_at < ? - GROUP BY n.id - HAVING count(getq.node_id) > 0`, - ) + GROUP BY n.id` + + statement := db.db.Rebind(query) rows, err := db.db.QueryContext(ctx, statement, before) if err != nil { diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index fb41bfdea..cb5477fcb 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -244,6 +244,9 @@ contact.external-address: "" # if true, skip the first run of GC # garbage-collection.skip-first: true +# interval for AS OF SYSTEM TIME clause (crdb specific) to read from db at a specific time in the past +# graceful-exit.as-of-system-time-interval: -10s + # size of the buffer used to batch inserts into the transfer queue. # graceful-exit.chore-batch-size: 500 @@ -274,6 +277,9 @@ contact.external-address: "" # the minimum duration for receiving a stream from a storage node before timing out # graceful-exit.recv-timeout: 2h0m0s +# batch size (crdb specific) for deleting and adding items to the transfer queue +# graceful-exit.transfer-queue-batch-size: 1000 + # path to the certificate chain for this identity identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert