satellite/satellitedb: Use CRDB AS OF SYSTEM & batch for GE

Use the CockroachDB 'AS OF SYSTEM TIME' clause for the Graceful Exit
(a.k.a. GE) queries that count and delete the GE queue items of nodes
which have already exited the network.
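For illustration, a minimal sketch of how such a clause is built,
mirroring the AsOfSystemTimeClause helper changed further down in this
diff (the isCockroach flag stands in for the real DB implementation
check):

    import "time"

    // asOfSystemTime is a sketch, not the production helper: the clause is
    // emitted only for CockroachDB and only when the interval is at least one
    // microsecond in the past, since CRDB rejects smaller negative intervals.
    func asOfSystemTime(isCockroach bool, interval time.Duration) string {
        if isCockroach && interval <= -time.Microsecond {
            return " AS OF SYSTEM TIME '" + interval.String() + "' "
        }
        return ""
    }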

When CRDB is used, split the subquery used for deleting all the
transfer queue items of nodes which have exited, and batch the queries,
because CRDB struggles to execute them as a single query, unlike
Postgres.
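The CRDB deletion path roughly follows the shape sketched below (a
simplified outline, not the exact production function from this diff;
db, before, asOf and batchSize stand in for the real arguments): page
through the exited nodes and drain each node's transfer queue in
LIMIT-sized chunks. Note that DELETE ... LIMIT is CockroachDB-specific.

    import (
        "context"
        "database/sql"
        "time"
    )

    func deleteFinishedItemsBatched(ctx context.Context, db *sql.DB, before time.Time, asOf string, batchSize int) (total int64, err error) {
        // selectNodes returns one page of exited-node IDs.
        selectNodes := func(offset int) ([][]byte, error) {
            rows, err := db.QueryContext(ctx,
                `SELECT id FROM nodes `+asOf+`
                 WHERE exit_finished_at IS NOT NULL AND exit_finished_at < $1
                 LIMIT $2 OFFSET $3`, before, batchSize, offset)
            if err != nil {
                return nil, err
            }
            defer func() { _ = rows.Close() }()

            var ids [][]byte
            for rows.Next() {
                var id []byte
                if err := rows.Scan(&id); err != nil {
                    return nil, err
                }
                ids = append(ids, id)
            }
            return ids, rows.Err()
        }

        for offset := 0; ; offset += batchSize {
            ids, err := selectNodes(offset)
            if err != nil {
                return total, err
            }
            for _, id := range ids {
                // Drain this node's queue in batchSize chunks.
                for {
                    res, err := db.ExecContext(ctx,
                        `DELETE FROM graceful_exit_transfer_queue WHERE node_id = $1 LIMIT $2`,
                        id, batchSize)
                    if err != nil {
                        return total, err
                    }
                    n, err := res.RowsAffected()
                    if err != nil {
                        return total, err
                    }
                    total += n
                    if n < int64(batchSize) {
                        break
                    }
                }
            }
            if len(ids) < batchSize {
                return total, nil // fewer than a full page of nodes: nothing left to scan
            }
        }
    }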

The new test added in this commit to verify the CRDB batch logic for
deleting all the transfer queue items of exited nodes revealed that the
Enqueue method also has to run in batches when CRDB is used; otherwise
CRDB returns the error "driver: bad connection" when a large number of
items is enqueued. This error did not happen with the current test
implementation; it showed up with an initial version that created a
large number of exited nodes and transfer queue items for those nodes.
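The batched Enqueue has roughly this shape (a simplified sketch;
insertChunk is a hypothetical stand-in for the unnest-based INSERT
shown in the diff below):

    import (
        "context"

        "storj.io/storj/satellite/gracefulexit"
    )

    // enqueueInBatches slices the items into batchSize chunks and runs one
    // INSERT per chunk instead of a single huge statement.
    func enqueueInBatches(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int,
        insertChunk func(context.Context, []gracefulexit.TransferQueueItem) error) error {
        for i := 0; i < len(items); i += batchSize {
            end := i + batchSize
            if end > len(items) {
                end = len(items)
            }
            if err := insertChunk(ctx, items[i:end]); err != nil {
                return err
            }
        }
        return nil
    }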

Change-Id: I6a099cdbc515a240596bc93141fea3182c2e50a9
Ivan Fraixedes 2021-02-10 19:09:49 +01:00 committed by Natalie Villasana
parent 6161436d8b
commit 7fb86617fc
12 changed files with 488 additions and 106 deletions

View File

@ -150,8 +150,9 @@ func verifyGracefulExitReceipt(ctx context.Context, identity *identity.FullIdent
return writeVerificationMessage(true, completed.SatelliteId, completed.NodeId, completed.Completed) return writeVerificationMessage(true, completed.SatelliteId, completed.NodeId, completed.Completed)
} }
func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) { func cleanupGEOrphanedData(ctx context.Context, before time.Time, config gracefulexit.Config) (err error) {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"}) db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database,
satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil { if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err) return errs.New("error connecting to master database on satellite: %+v", err)
} }
@ -159,7 +160,7 @@ func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) {
err = errs.Combine(err, db.Close()) err = errs.Combine(err, db.Close())
}() }()
nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before) nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before, config.AsOfSystemTimeInterval)
if err != nil { if err != nil {
return err return err
} }
@ -211,12 +212,12 @@ func cleanupGEOrphanedData(ctx context.Context, before time.Time) (err error) {
return nil return nil
} }
queueTotal, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before) queueTotal, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before, config.AsOfSystemTimeInterval, config.TransferQueueBatchSize)
if err != nil { if err != nil {
fmt.Println("Error, NO ITEMS have been deleted from transfer queue") fmt.Println("Error, NO ITEMS have been deleted from transfer queue")
return err return err
} }
progressTotal, err := db.GracefulExit().DeleteFinishedExitProgress(ctx, before) progressTotal, err := db.GracefulExit().DeleteFinishedExitProgress(ctx, before, config.AsOfSystemTimeInterval)
if err != nil { if err != nil {
fmt.Printf("Error, %d stale entries were deleted from exit progress table. More stale entries might remain.\n", progressTotal) fmt.Printf("Error, %d stale entries were deleted from exit progress table. More stale entries might remain.\n", progressTotal)
return err return err

View File

@ -754,8 +754,7 @@ func cmdConsistencyGECleanup(cmd *cobra.Command, args []string) error {
if before.After(time.Now()) { if before.After(time.Now()) {
return errs.New("before flag value cannot be newer than the current time.") return errs.New("before flag value cannot be newer than the current time.")
} }
return cleanupGEOrphanedData(ctx, before.UTC(), runCfg.GracefulExit)
return cleanupGEOrphanedData(ctx, before.UTC())
} }
func cmdRestoreTrash(cmd *cobra.Command, args []string) error { func cmdRestoreTrash(cmd *cobra.Command, args []string) error {

View File

@ -602,6 +602,9 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
RecvTimeout: time.Minute * 1, RecvTimeout: time.Minute * 1,
MaxOrderLimitSendCount: 3, MaxOrderLimitSendCount: 3,
NodeMinAgeInMonths: 0, NodeMinAgeInMonths: 0,
AsOfSystemTimeInterval: 0,
TransferQueueBatchSize: 1000,
}, },
Metrics: metrics.Config{}, Metrics: metrics.Config{},
} }

View File

@ -269,7 +269,8 @@ func batch(ctx context.Context, b *testing.B, db gracefulexit.DB, size int) {
} }
transferQueueItems = append(transferQueueItems, item) transferQueueItems = append(transferQueueItems, item)
} }
err := db.Enqueue(ctx, transferQueueItems) batchSize := 1000
err := db.Enqueue(ctx, transferQueueItems, batchSize)
require.NoError(b, err) require.NoError(b, err)
} }
} }

View File

@ -38,4 +38,7 @@ type Config struct {
RecvTimeout time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"2h"` RecvTimeout time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"2h"`
MaxOrderLimitSendCount int `help:"maximum number of order limits a satellite sends to a node before marking piece transfer failed" default:"10"` MaxOrderLimitSendCount int `help:"maximum number of order limits a satellite sends to a node before marking piece transfer failed" default:"10"`
NodeMinAgeInMonths int `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6"` NodeMinAgeInMonths int `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6"`
AsOfSystemTimeInterval time.Duration `help:"interval for AS OF SYSTEM TIME clause (crdb specific) to read from db at a specific time in the past " default:"-10s"`
TransferQueueBatchSize int `help:"batch size (crdb specific) for deleting and adding items to the transfer queue" default:"1000"`
} }

View File

@ -46,7 +46,7 @@ type DB interface {
GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error) GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error)
// Enqueue batch inserts graceful exit transfer queue entries it does not exist. // Enqueue batch inserts graceful exit transfer queue entries it does not exist.
Enqueue(ctx context.Context, items []TransferQueueItem) error Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error
// UpdateTransferQueueItem creates a graceful exit transfer queue entry. // UpdateTransferQueueItem creates a graceful exit transfer queue entry.
UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error
// DeleteTransferQueueItem deletes a graceful exit transfer queue entry. // DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
@ -58,12 +58,12 @@ type DB interface {
// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer // DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
// queue items whose nodes have finished the exit before the indicated time // queue items whose nodes have finished the exit before the indicated time
// returning the total number of deleted items. // returning the total number of deleted items.
DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time) (count int64, err error) DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (count int64, err error)
// DeleteFinishedExitProgress deletes exit progress entries for nodes that // DeleteFinishedExitProgress deletes exit progress entries for nodes that
// finished exiting before the indicated time, returns number of deleted entries. // finished exiting before the indicated time, returns number of deleted entries.
DeleteFinishedExitProgress(ctx context.Context, before time.Time) (count int64, err error) DeleteFinishedExitProgress(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (count int64, err error)
// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
GetFinishedExitNodes(ctx context.Context, before time.Time) (finishedNodes []storj.NodeID, err error) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error)
// GetTransferQueueItem gets a graceful exit transfer queue entry. // GetTransferQueueItem gets a graceful exit transfer queue entry.
GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (*TransferQueueItem, error) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, key metabase.SegmentKey, pieceNum int32) (*TransferQueueItem, error)
// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
@ -77,5 +77,5 @@ type DB interface {
// CountFinishedTransferQueueItemsByNode return a map of the nodes which has // CountFinishedTransferQueueItemsByNode return a map of the nodes which has
// finished the exit before the indicated time but there are at least one item // finished the exit before the indicated time but there are at least one item
// left in the transfer queue. // left in the transfer queue.
CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (map[storj.NodeID]int64, error) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (map[storj.NodeID]int64, error)
} }

View File

@ -100,7 +100,8 @@ func TestTransferQueueItem(t *testing.T) {
// test basic create, update, get delete // test basic create, update, get delete
{ {
err := geDB.Enqueue(ctx, items) batchSize := 1000
err := geDB.Enqueue(ctx, items, batchSize)
require.NoError(t, err) require.NoError(t, err)
for _, tqi := range items { for _, tqi := range items {

View File

@ -4,6 +4,7 @@
package gracefulexit_test package gracefulexit_test
import ( import (
"fmt"
"math/rand" "math/rand"
"testing" "testing"
"time" "time"
@ -11,12 +12,15 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/storj/private/testplanet" "storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
) )
func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) { func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) {
@ -46,15 +50,16 @@ func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) {
timestamp = timestamp.Add(time.Hour * 24) timestamp = timestamp.Add(time.Hour * 24)
} }
threeDays := currentTime.AddDate(0, 0, -days/2).Add(-time.Millisecond) threeDays := currentTime.AddDate(0, 0, -days/2).Add(-time.Millisecond)
finishedNodes, err := geDB.GetFinishedExitNodes(ctx, threeDays) disableAsOfSystemTime := time.Second * 0
finishedNodes, err := geDB.GetFinishedExitNodes(ctx, threeDays, disableAsOfSystemTime)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, finishedNodes, 3) require.Len(t, finishedNodes, 3)
finishedNodes, err = geDB.GetFinishedExitNodes(ctx, currentTime) finishedNodes, err = geDB.GetFinishedExitNodes(ctx, currentTime, disableAsOfSystemTime)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, finishedNodes, 6) require.Len(t, finishedNodes, 6)
count, err := geDB.DeleteFinishedExitProgress(ctx, threeDays) count, err := geDB.DeleteFinishedExitProgress(ctx, threeDays, disableAsOfSystemTime)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 3, count) require.EqualValues(t, 3, count)
@ -71,6 +76,22 @@ func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) {
}) })
} }
func TestGracefulExit_HandleAsOfSystemTimeBadInput(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
gracefulexitDB := planet.Satellites[0].DB.GracefulExit()
now := time.Now().UTC()
// explicitly set as of system time to invalid time values and run queries to ensure queries don't break
badTime1 := -1 * time.Nanosecond
_, err := gracefulexitDB.CountFinishedTransferQueueItemsByNode(ctx, now, badTime1)
require.NoError(t, err)
badTime2 := 1 * time.Second
_, err = gracefulexitDB.DeleteFinishedExitProgress(ctx, now, badTime2)
require.NoError(t, err)
})
}
func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) { func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
testplanet.Run(t, testplanet.Config{ testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 7, SatelliteCount: 1, StorageNodeCount: 7,
@ -80,7 +101,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
currentTime = time.Now().UTC() currentTime = time.Now().UTC()
) )
// mark some of the storagenodes as successful exit // Mark some of the storagenodes as successful exit
nodeSuccessful1 := planet.StorageNodes[1] nodeSuccessful1 := planet.StorageNodes[1]
_, err := cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ _, err := cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeSuccessful1.ID(), NodeID: nodeSuccessful1.ID(),
@ -111,14 +132,14 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
// mark some of the storagenodes as failed exit // Mark some of the storagenodes as failed exit
nodeFailed1 := planet.StorageNodes[4] nodeFailed1 := planet.StorageNodes[4]
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{ _, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeFailed1.ID(), NodeID: nodeFailed1.ID(),
ExitInitiatedAt: currentTime.Add(-time.Hour), ExitInitiatedAt: currentTime.Add(-time.Hour),
ExitLoopCompletedAt: currentTime.Add(-28 * time.Minute), ExitLoopCompletedAt: currentTime.Add(-28 * time.Minute),
ExitFinishedAt: currentTime.Add(-20 * time.Minute), ExitFinishedAt: currentTime.Add(-20 * time.Minute),
ExitSuccess: true, ExitSuccess: false,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -128,7 +149,7 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
ExitInitiatedAt: currentTime.Add(-time.Hour), ExitInitiatedAt: currentTime.Add(-time.Hour),
ExitLoopCompletedAt: currentTime.Add(-17 * time.Minute), ExitLoopCompletedAt: currentTime.Add(-17 * time.Minute),
ExitFinishedAt: currentTime.Add(-15 * time.Minute), ExitFinishedAt: currentTime.Add(-15 * time.Minute),
ExitSuccess: true, ExitSuccess: false,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -142,17 +163,21 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
// add some items to the transfer queue for the exited nodes queueItemsPerNode := 500
queueItems, nodesItems := generateTransferQueueItems(t, []*testplanet.StorageNode{ // Add some items to the transfer queue for the exited nodes
queueItems, nodesItems := generateTransferQueueItems(t, queueItemsPerNode, []*testplanet.StorageNode{
nodeSuccessful1, nodeSuccessful2, nodeSuccessful3, nodeFailed1, nodeFailed2, nodeSuccessful1, nodeSuccessful2, nodeSuccessful3, nodeFailed1, nodeFailed2,
}) })
gracefulExitDB := planet.Satellites[0].DB.GracefulExit() gracefulExitDB := planet.Satellites[0].DB.GracefulExit()
err = gracefulExitDB.Enqueue(ctx, queueItems) batchSize := 1000
err = gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
require.NoError(t, err) require.NoError(t, err)
// count nodes exited before 15 minutes ago asOfSystemTime := -1 * time.Microsecond
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute)) // Count nodes exited before 15 minutes ago
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, nodes, 3, "invalid number of nodes which have exited 15 minutes ago") require.Len(t, nodes, 3, "invalid number of nodes which have exited 15 minutes ago")
@ -160,8 +185,8 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items") assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
} }
// count nodes exited before 4 minutes ago // Count nodes exited before 4 minutes ago
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute)) nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(-4*time.Minute), asOfSystemTime)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, nodes, 5, "invalid number of nodes which have exited 4 minutes ago") require.Len(t, nodes, 5, "invalid number of nodes which have exited 4 minutes ago")
@ -169,16 +194,16 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
assert.EqualValues(t, nodesItems[id], n, "unexpected number of items") assert.EqualValues(t, nodesItems[id], n, "unexpected number of items")
} }
// delete items of nodes exited before 15 minutes ago // Delete items of nodes exited before 15 minutes ago
count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute)) count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), asOfSystemTime, batchSize)
require.NoError(t, err) require.NoError(t, err)
expectedNumDeletedItems := nodesItems[nodeSuccessful1.ID()] + expectedNumDeletedItems := nodesItems[nodeSuccessful1.ID()] +
nodesItems[nodeSuccessful2.ID()] + nodesItems[nodeSuccessful2.ID()] +
nodesItems[nodeFailed1.ID()] nodesItems[nodeFailed1.ID()]
require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items") require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items")
// check that only a few nodes have exited are left with items // Check that only a few nodes have exited are left with items
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute)) nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, nodes, 2, "invalid number of exited nodes with items") require.Len(t, nodes, 2, "invalid number of exited nodes with items")
@ -189,36 +214,283 @@ func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
assert.NotEqual(t, nodeFailed1.ID(), id, "node shouldn't have items") assert.NotEqual(t, nodeFailed1.ID(), id, "node shouldn't have items")
} }
// delete items of there rest exited nodes // Delete the rest of the nodes' items
count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute)) count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(time.Minute), asOfSystemTime, batchSize)
require.NoError(t, err) require.NoError(t, err)
expectedNumDeletedItems = nodesItems[nodeSuccessful3.ID()] + nodesItems[nodeFailed2.ID()] expectedNumDeletedItems = nodesItems[nodeSuccessful3.ID()] + nodesItems[nodeFailed2.ID()]
require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of delet items") require.EqualValues(t, expectedNumDeletedItems, count, "invalid number of deleted items")
// check that there aren't more exited nodes with items // Check that there aren't more exited nodes with items
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute)) nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime.Add(time.Minute), asOfSystemTime)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, nodes, 0, "invalid number of exited nodes with items") require.Len(t, nodes, 0, "invalid number of exited nodes with items")
}) })
} }
func generateTransferQueueItems(t *testing.T, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) { // TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batch
getNodeID := func() storj.NodeID { // ensures that deletion works as expected using different batch sizes.
n := rand.Intn(len(nodes)) func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(t *testing.T) {
return nodes[n].ID() rand.Seed(time.Now().UnixNano())
var testCases = []struct {
name string
batchSize int
transferQueueItemsPerNode int
numExitedNodes int
}{
{"less than complete batch, odd batch", 333, 3, 30},
{"less than complete batch, even batch", 8888, 222, 40},
{"over complete batch, odd batch", 3000, 200, 25},
{"over complete batch, even batch", 1000, 110, 10},
{"exact batch, odd batch", 1125, 25, 45},
{"exact batch, even batch", 7200, 1200, 6},
}
for _, tt := range testCases {
tt := tt
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
var (
gracefulExitDB = db.GracefulExit()
currentTime = time.Now().UTC()
batchSize = tt.batchSize
numItems = tt.transferQueueItemsPerNode
numExitedNodes = tt.numExitedNodes
)
exitedNodeIDs := generateExitedNodes(t, ctx, db, currentTime, numExitedNodes)
queueItems := generateNTransferQueueItemsPerNode(t, numItems, exitedNodeIDs...)
// Add some items to the transfer queue for the exited nodes.
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
require.NoError(t, err)
disableAsOfSystemTime := time.Second * 0
// Count exited nodes
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
require.NoError(t, err)
require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes")
// Delete items of the exited nodes
count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize)
require.NoError(t, err)
require.EqualValues(t, len(queueItems), count, "invalid number of deleted items")
// Count exited nodes. At this time there shouldn't be any exited node with
// items in the queue
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
require.NoError(t, err)
require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes")
// Delete items of the exited nodes. At this time there shouldn't be any
count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize)
require.NoError(t, err)
require.Zero(t, count, "invalid number of deleted items")
})
}
}
func generateExitedNodes(t *testing.T, ctx *testcontext.Context, db satellite.DB, currentTime time.Time, numExitedNodes int) (exitedNodeIDs storj.NodeIDList) {
const (
addr = "127.0.1.0:8080"
lastNet = "127.0.0"
)
var (
cache = db.OverlayCache()
nodeIDsMap = make(map[storj.NodeID]struct{})
)
for i := 0; i < numExitedNodes; i++ {
nodeID := generateNodeIDFromPostiveInt(t, i)
exitedNodeIDs = append(exitedNodeIDs, nodeID)
if _, ok := nodeIDsMap[nodeID]; ok {
fmt.Printf("this %v already exists\n", nodeID.Bytes())
}
nodeIDsMap[nodeID] = struct{}{}
info := overlay.NodeCheckInInfo{
NodeID: nodeID,
Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC},
LastIPPort: addr,
LastNet: lastNet,
Version: &pb.NodeVersion{Version: "v1.0.0"},
Capacity: &pb.NodeCapacity{},
IsUp: true,
}
err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) + 1)) * time.Minute)
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute),
ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute),
ExitFinishedAt: exitFinishedAt,
ExitSuccess: true,
})
require.NoError(t, err)
}
require.Equal(t, numExitedNodes, len(nodeIDsMap), "map")
return exitedNodeIDs
}
// TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch verifies that
// the CRDB batch logic for delete all the transfer queue items of exited nodes
// works as expected.
func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) {
rand.Seed(time.Now().UnixNano())
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
const (
addr = "127.0.1.0:8080"
lastNet = "127.0.0"
)
var (
numNonExitedNodes = rand.Intn(20) + 1
numExitedNodes = rand.Intn(10) + 20
cache = db.OverlayCache()
)
for i := 0; i < numNonExitedNodes; i++ {
info := overlay.NodeCheckInInfo{
NodeID: generateNodeIDFromPostiveInt(t, i),
Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC},
LastIPPort: addr,
LastNet: lastNet,
Version: &pb.NodeVersion{Version: "v1.0.0"},
Capacity: &pb.NodeCapacity{},
IsUp: true,
}
err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
}
var (
currentTime = time.Now().UTC()
exitedNodeIDs = make([]storj.NodeID, 0, numNonExitedNodes)
nodeIDsMap = make(map[storj.NodeID]struct{})
)
for i := numNonExitedNodes; i < (numNonExitedNodes + numExitedNodes); i++ {
nodeID := generateNodeIDFromPostiveInt(t, i)
exitedNodeIDs = append(exitedNodeIDs, nodeID)
if _, ok := nodeIDsMap[nodeID]; ok {
fmt.Printf("this %v already exists\n", nodeID.Bytes())
}
nodeIDsMap[nodeID] = struct{}{}
info := overlay.NodeCheckInInfo{
NodeID: nodeID,
Address: &pb.NodeAddress{Address: addr, Transport: pb.NodeTransport_TCP_TLS_GRPC},
LastIPPort: addr,
LastNet: lastNet,
Version: &pb.NodeVersion{Version: "v1.0.0"},
Capacity: &pb.NodeCapacity{},
IsUp: true,
}
err := cache.UpdateCheckIn(ctx, info, time.Now(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
exitFinishedAt := currentTime.Add(time.Duration(-(rand.Int63n(15) + 1)) * time.Minute)
_, err = cache.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitInitiatedAt: exitFinishedAt.Add(-30 * time.Minute),
ExitLoopCompletedAt: exitFinishedAt.Add(-20 * time.Minute),
ExitFinishedAt: exitFinishedAt,
ExitSuccess: true,
})
require.NoError(t, err)
}
require.Equal(t, numExitedNodes, len(nodeIDsMap), "map")
gracefulExitDB := db.GracefulExit()
batchSize := 1000
queueItems := generateNTransferQueueItemsPerNode(t, 25, exitedNodeIDs...)
// Add some items to the transfer queue for the exited nodes.
err := gracefulExitDB.Enqueue(ctx, queueItems, batchSize)
require.NoError(t, err)
disableAsOfSystemTime := time.Second * 0
// Count exited nodes
nodes, err := gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
require.NoError(t, err)
require.EqualValues(t, numExitedNodes, len(nodes), "invalid number of exited nodes")
// Delete items of the exited nodes
count, err := gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime, disableAsOfSystemTime, batchSize)
require.NoError(t, err)
require.EqualValues(t, len(queueItems), count, "invalid number of deleted items")
// Count exited nodes. At this time there shouldn't be any exited node with
// items in the queue
nodes, err = gracefulExitDB.CountFinishedTransferQueueItemsByNode(ctx, currentTime, disableAsOfSystemTime)
require.NoError(t, err)
require.EqualValues(t, 0, len(nodes), "invalid number of exited nodes")
// Delete items of the exited nodes. At this time there shouldn't be any
count, err = gracefulExitDB.DeleteAllFinishedTransferQueueItems(ctx, currentTime.Add(-15*time.Minute), disableAsOfSystemTime, batchSize)
require.NoError(t, err)
require.Zero(t, count, "invalid number of deleted items")
})
}
// generateTransferQueueItems generates a random number of transfer queue items,
// between 10 and 120, for each passed node.
func generateTransferQueueItems(t *testing.T, itemsPerNode int, nodes []*testplanet.StorageNode) ([]gracefulexit.TransferQueueItem, map[storj.NodeID]int64) {
nodeIDs := make([]storj.NodeID, len(nodes))
for i, n := range nodes {
nodeIDs[i] = n.ID()
} }
var ( items := generateNTransferQueueItemsPerNode(t, itemsPerNode, nodeIDs...)
items = make([]gracefulexit.TransferQueueItem, rand.Intn(100)+10)
nodesItems = make(map[storj.NodeID]int64, len(items)) nodesItems := make(map[storj.NodeID]int64, len(nodes))
) for _, item := range items {
for i, item := range items {
item.NodeID = getNodeID()
item.Key = metabase.SegmentKey{byte(i)}
item.PieceNum = int32(i + 1)
items[i] = item
nodesItems[item.NodeID]++ nodesItems[item.NodeID]++
} }
return items, nodesItems return items, nodesItems
} }
// generateNTransferQueueItemsPerNode generates n queue items for each nodeID.
func generateNTransferQueueItemsPerNode(t *testing.T, n int, nodeIDs ...storj.NodeID) []gracefulexit.TransferQueueItem {
items := make([]gracefulexit.TransferQueueItem, 0)
for _, nodeID := range nodeIDs {
for i := 0; i < n; i++ {
items = append(items, gracefulexit.TransferQueueItem{
NodeID: nodeID,
Key: metabase.SegmentKey{byte(rand.Int31n(256))},
PieceNum: rand.Int31(),
})
}
}
return items
}
// generateNodeIDFromPostiveInt generates a specific node ID for val; each val
// value produces a different node ID.
func generateNodeIDFromPostiveInt(t *testing.T, val int) storj.NodeID {
t.Helper()
if val < 0 {
t.Fatal("cannot generate a node from a negative integer")
}
nodeID := storj.NodeID{}
idx := 0
for {
m := val & 255
nodeID[idx] = byte(m)
q := val >> 8
if q == 0 {
break
}
if q < 256 {
nodeID[idx+1] = byte(q)
break
}
val = q
idx++
}
return nodeID
}

View File

@ -115,7 +115,7 @@ func (collector *PathCollector) InlineSegment(ctx context.Context, segment *meta
func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) { func (collector *PathCollector) flush(ctx context.Context, limit int) (err error) {
if len(collector.buffer) >= limit { if len(collector.buffer) >= limit {
err = collector.db.Enqueue(ctx, collector.buffer) err = collector.db.Enqueue(ctx, collector.buffer, collector.batchSize)
collector.buffer = collector.buffer[:0] collector.buffer = collector.buffer[:0]
return errs.Wrap(err) return errs.Wrap(err)

View File

@ -159,9 +159,10 @@ func (dbc *satelliteDBCollection) getByName(name string) *satelliteDB {
} }
// AsOfSystemTimeClause returns the "AS OF SYSTEM TIME" clause if the DB implementation // AsOfSystemTimeClause returns the "AS OF SYSTEM TIME" clause if the DB implementation
// is CockroachDB and the interval is less than 0. // is CockroachDB and the interval is less than or equal to a negative microsecond
// (CRDB does not support intervals in the negative nanoseconds).
func (db *satelliteDB) AsOfSystemTimeClause(interval time.Duration) (asOf string) { func (db *satelliteDB) AsOfSystemTimeClause(interval time.Duration) (asOf string) {
if db.implementation == dbutil.Cockroach && interval < 0 { if db.implementation == dbutil.Cockroach && interval <= -time.Microsecond {
asOf = " AS OF SYSTEM TIME '" + interval.String() + "' " asOf = " AS OF SYSTEM TIME '" + interval.String() + "' "
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/zeebo/errs" "github.com/zeebo/errs"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/private/dbutil"
"storj.io/private/dbutil/pgutil" "storj.io/private/dbutil/pgutil"
"storj.io/private/tagsql" "storj.io/private/tagsql"
"storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/gracefulexit"
@ -75,8 +76,8 @@ func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID)
return progress, Error.Wrap(err) return progress, Error.Wrap(err)
} }
// Enqueue batch inserts graceful exit transfer queue entries it does not exist. // Enqueue batch inserts graceful exit transfer queue entries if it does not exist.
func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem) (err error) { func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
sort.Slice(items, func(i, k int) bool { sort.Slice(items, func(i, k int) bool {
@ -87,25 +88,38 @@ func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.Tran
return compare < 0 return compare < 0
}) })
var nodeIDs []storj.NodeID for i := 0; i < len(items); i += batchSize {
var keys [][]byte lowerBound := i
var pieceNums []int32 upperBound := lowerBound + batchSize
var rootPieceIDs [][]byte
var durabilities []float64
for _, item := range items {
nodeIDs = append(nodeIDs, item.NodeID)
keys = append(keys, item.Key)
pieceNums = append(pieceNums, item.PieceNum)
rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes())
durabilities = append(durabilities, item.DurabilityRatio)
}
_, err = db.db.ExecContext(ctx, db.db.Rebind(` if upperBound > len(items) {
upperBound = len(items)
}
var nodeIDs []storj.NodeID
var keys [][]byte
var pieceNums []int32
var rootPieceIDs [][]byte
var durabilities []float64
for _, item := range items[lowerBound:upperBound] {
nodeIDs = append(nodeIDs, item.NodeID)
keys = append(keys, item.Key)
pieceNums = append(pieceNums, item.PieceNum)
rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes())
durabilities = append(durabilities, item.DurabilityRatio)
}
_, err = db.db.ExecContext(ctx, db.db.Rebind(`
INSERT INTO graceful_exit_transfer_queue(node_id, path, piece_num, root_piece_id, durability_ratio, queued_at) INSERT INTO graceful_exit_transfer_queue(node_id, path, piece_num, root_piece_id, durability_ratio, queued_at)
SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]), unnest($4::bytea[]), unnest($5::float8[]), $6 SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int4[]), unnest($4::bytea[]), unnest($5::float8[]), $6
ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC()) ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(keys), pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), time.Now().UTC())
return Error.Wrap(err) if err != nil {
return Error.Wrap(err)
}
}
return nil
} }
// UpdateTransferQueueItem creates a graceful exit transfer queue entry. // UpdateTransferQueueItem creates a graceful exit transfer queue entry.
@ -161,39 +175,119 @@ func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context,
// queue items whose nodes have finished the exit before the indicated time // queue items whose nodes have finished the exit before the indicated time
// returning the total number of deleted items. // returning the total number of deleted items.
func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
ctx context.Context, before time.Time) (_ int64, err error) { ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (_ int64, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
statement := db.db.Rebind( switch db.db.implementation {
`DELETE FROM graceful_exit_transfer_queue case dbutil.Postgres:
WHERE node_id IN ( statement := `
DELETE FROM graceful_exit_transfer_queue
WHERE node_id IN (
SELECT id
FROM nodes
WHERE exit_finished_at IS NOT NULL
AND exit_finished_at < $1
)`
res, err := db.db.ExecContext(ctx, statement, before)
if err != nil {
return 0, Error.Wrap(err)
}
count, err := res.RowsAffected()
if err != nil {
return 0, Error.Wrap(err)
}
return count, nil
case dbutil.Cockroach:
asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval)
nodesQuery := `
SELECT id SELECT id
FROM nodes FROM nodes ` + asOf + `
WHERE exit_finished_at IS NOT NULL WHERE exit_finished_at IS NOT NULL
AND exit_finished_at < ? AND exit_finished_at < $1
)`, LIMIT $2 OFFSET $3
`
deleteStmt := `
DELETE FROM graceful_exit_transfer_queue
WHERE node_id = $1
LIMIT $2
`
var (
deleteCount int64
offset int
)
for {
var nodeIDs storj.NodeIDList
deleteItems := func() (int64, error) {
// Select exited nodes
rows, err := db.db.QueryContext(ctx, nodesQuery, before, batchSize, offset)
if err != nil {
return deleteCount, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
count := 0
for rows.Next() {
var id storj.NodeID
if err = rows.Scan(&id); err != nil {
return deleteCount, Error.Wrap(err)
}
nodeIDs = append(nodeIDs, id)
count++
}
if count == batchSize {
offset += count
} else {
offset = -1 // indicates that there aren't more nodes to query
}
for _, id := range nodeIDs {
for {
res, err := db.db.ExecContext(ctx, deleteStmt, id.Bytes(), batchSize)
if err != nil {
return deleteCount, Error.Wrap(err)
}
count, err := res.RowsAffected()
if err != nil {
return deleteCount, Error.Wrap(err)
}
deleteCount += count
if count < int64(batchSize) {
break
}
}
}
return deleteCount, nil
}
deleteCount, err = deleteItems()
if err != nil {
return deleteCount, err
}
// when offset is negative means that we have get already all the nodes
// which have exited
if offset < 0 {
break
}
}
return deleteCount, nil
}
return 0, Error.New("unsupported implementation: %s",
dbutil.SchemeForImplementation(db.db.implementation),
) )
res, err := db.db.ExecContext(ctx, statement, before)
if err != nil {
return 0, Error.Wrap(err)
}
count, err := res.RowsAffected()
if err != nil {
return 0, Error.Wrap(err)
}
return count, nil
} }
// DeleteFinishedExitProgress deletes exit progress entries for nodes that // DeleteFinishedExitProgress deletes exit progress entries for nodes that
// finished exiting before the indicated time, returns number of deleted entries. // finished exiting before the indicated time, returns number of deleted entries.
func (db *gracefulexitDB) DeleteFinishedExitProgress( func (db *gracefulexitDB) DeleteFinishedExitProgress(
ctx context.Context, before time.Time) (_ int64, err error) { ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ int64, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
finishedNodes, err := db.GetFinishedExitNodes(ctx, before) finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemTimeInterval)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -201,14 +295,14 @@ func (db *gracefulexitDB) DeleteFinishedExitProgress(
} }
// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time) (finishedNodes []storj.NodeID, err error) { func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
stmt := db.db.Rebind( asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval)
` SELECT id FROM nodes stmt := `SELECT id FROM nodes ` + asOf + `
WHERE exit_finished_at IS NOT NULL WHERE exit_finished_at IS NOT NULL
AND exit_finished_at < ?`, AND exit_finished_at < ?
) `
rows, err := db.db.Query(ctx, stmt, before.UTC()) rows, err := db.db.Query(ctx, db.db.Rebind(stmt), before.UTC())
if err != nil { if err != nil {
return nil, Error.Wrap(err) return nil, Error.Wrap(err)
} }
@ -360,18 +454,19 @@ func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, node
// CountFinishedTransferQueueItemsByNode return a map of the nodes which has // CountFinishedTransferQueueItemsByNode return a map of the nodes which has
// finished the exit before the indicated time but there are at least one item // finished the exit before the indicated time but there are at least one item
// left in the transfer queue. // left in the transfer queue.
func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time) (_ map[storj.NodeID]int64, err error) { func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ map[storj.NodeID]int64, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
statement := db.db.Rebind( asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval)
`SELECT n.id, count(getq.node_id)
FROM nodes as n LEFT JOIN graceful_exit_transfer_queue as getq query := `SELECT n.id, count(getq.node_id)
ON n.id = getq.node_id FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq
ON n.id = getq.node_id ` + asOf + `
WHERE n.exit_finished_at IS NOT NULL WHERE n.exit_finished_at IS NOT NULL
AND n.exit_finished_at < ? AND n.exit_finished_at < ?
GROUP BY n.id GROUP BY n.id`
HAVING count(getq.node_id) > 0`,
) statement := db.db.Rebind(query)
rows, err := db.db.QueryContext(ctx, statement, before) rows, err := db.db.QueryContext(ctx, statement, before)
if err != nil { if err != nil {

View File

@ -244,6 +244,9 @@ contact.external-address: ""
# if true, skip the first run of GC # if true, skip the first run of GC
# garbage-collection.skip-first: true # garbage-collection.skip-first: true
# interval for AS OF SYSTEM TIME clause (crdb specific) to read from db at a specific time in the past
# graceful-exit.as-of-system-time-interval: -10s
# size of the buffer used to batch inserts into the transfer queue. # size of the buffer used to batch inserts into the transfer queue.
# graceful-exit.chore-batch-size: 500 # graceful-exit.chore-batch-size: 500
@ -274,6 +277,9 @@ contact.external-address: ""
# the minimum duration for receiving a stream from a storage node before timing out # the minimum duration for receiving a stream from a storage node before timing out
# graceful-exit.recv-timeout: 2h0m0s # graceful-exit.recv-timeout: 2h0m0s
# batch size (crdb specific) for deleting and adding items to the transfer queue
# graceful-exit.transfer-queue-batch-size: 1000
# path to the certificate chain for this identity # path to the certificate chain for this identity
identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert