satellite/overlay: add SetAllContainedNodes method to overlay.DB
We will be needing an infrequent chore to check which nodes are in the reverify queue and synchronize that set with the 'contained' field in the nodes db, since it is easily possible for them to get out of sync. (We can't require that the reverification queue table be in the same database as the nodes table, so maintaining consistency with SQL transactions is out. Plus, even if they were in the same database, using such SQL transactions to maintain consistency would be slow and unwieldy.) This commit adds a method to the overlay allowing the caller to set the contained status of all nodes in the nodes table at once. This is valid because our definition of "contained" now depends solely on whether a node appears at least once in the reverification queue. Only rows whose contained field does not match the expectation will be updated; the contained timestamp will not be updated for a node which is supposed to be contained and was already contained. Change-Id: I8cabe56ad897b6027e11aa5b17175295391aa3ac
This commit is contained in:
parent
12f884e802
commit
0e2fef977f
@ -78,6 +78,8 @@ type DB interface {
|
||||
UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time, config NodeSelectionConfig) (err error)
|
||||
// SetNodeContained updates the contained field for the node record.
|
||||
SetNodeContained(ctx context.Context, node storj.NodeID, contained bool) (err error)
|
||||
// SetAllContainedNodes updates the contained field for all nodes, as necessary.
|
||||
SetAllContainedNodes(ctx context.Context, containedNodes []storj.NodeID) (err error)
|
||||
|
||||
// AllPieceCounts returns a map of node IDs to piece counts from the db.
|
||||
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error)
|
||||
|
@ -1507,6 +1507,33 @@ func (cache *overlaycache) SetNodeContained(ctx context.Context, nodeID storj.No
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// SetAllContainedNodes updates the contained field for all nodes, as necessary.
|
||||
// containedNodes is expected to be a set of all nodes that should be contained.
|
||||
// All nodes which are in this set but do not already have a non-NULL contained
|
||||
// field will be updated to be contained as of the current time, and all nodes
|
||||
// which are not in this set but are contained in the table will be updated to
|
||||
// have a NULL contained field.
|
||||
func (cache *overlaycache) SetAllContainedNodes(ctx context.Context, containedNodes []storj.NodeID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
updateQuery := `
|
||||
WITH should_be AS (
|
||||
SELECT nodes.id, EXISTS (SELECT 1 FROM unnest($1::BYTEA[]) sb(i) WHERE sb.i = id) AS contained
|
||||
FROM nodes
|
||||
)
|
||||
UPDATE nodes n SET contained =
|
||||
CASE WHEN should_be.contained
|
||||
THEN current_timestamp
|
||||
ELSE NULL
|
||||
END
|
||||
FROM should_be
|
||||
WHERE n.id = should_be.id
|
||||
AND (n.contained IS NOT NULL) != should_be.contained
|
||||
`
|
||||
_, err = cache.db.DB.ExecContext(ctx, updateQuery, pgutil.NodeIDArray(containedNodes))
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrVetting is the error class for the following test methods.
|
||||
ErrVetting = errs.Class("vetting")
|
||||
|
@ -4,6 +4,7 @@
|
||||
package satellitedb_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -223,3 +224,60 @@ func TestUpdateCheckInDirectUpdate(t *testing.T) {
|
||||
require.True(t, updated)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSetAllContainedNodes(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
cache := db.OverlayCache()
|
||||
|
||||
node1 := testrand.NodeID()
|
||||
node2 := testrand.NodeID()
|
||||
node3 := testrand.NodeID()
|
||||
|
||||
// put nodes with these IDs in the db
|
||||
for _, n := range []storj.NodeID{node1, node2, node3} {
|
||||
checkInInfo := overlay.NodeCheckInInfo{
|
||||
IsUp: true,
|
||||
Address: &pb.NodeAddress{Address: "1.2.3.4"},
|
||||
Version: &pb.NodeVersion{Version: "v0.0.0"},
|
||||
NodeID: n,
|
||||
}
|
||||
err := cache.UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// none of them should be contained
|
||||
assertContained(ctx, t, cache, node1, false, node2, false, node3, false)
|
||||
|
||||
// Set node2 (only) to be contained
|
||||
err := cache.SetAllContainedNodes(ctx, []storj.NodeID{node2})
|
||||
require.NoError(t, err)
|
||||
assertContained(ctx, t, cache, node1, false, node2, true, node3, false)
|
||||
|
||||
// Set node1 and node3 (only) to be contained
|
||||
err = cache.SetAllContainedNodes(ctx, []storj.NodeID{node1, node3})
|
||||
require.NoError(t, err)
|
||||
assertContained(ctx, t, cache, node1, true, node2, false, node3, true)
|
||||
|
||||
// Set node1 (only) to be contained
|
||||
err = cache.SetAllContainedNodes(ctx, []storj.NodeID{node1})
|
||||
require.NoError(t, err)
|
||||
assertContained(ctx, t, cache, node1, true, node2, false, node3, false)
|
||||
|
||||
// Set no nodes to be contained
|
||||
err = cache.SetAllContainedNodes(ctx, []storj.NodeID{})
|
||||
require.NoError(t, err)
|
||||
assertContained(ctx, t, cache, node1, false, node2, false, node3, false)
|
||||
})
|
||||
}
|
||||
|
||||
func assertContained(ctx context.Context, t testing.TB, cache overlay.DB, args ...interface{}) {
|
||||
require.Equal(t, 0, len(args)%2, "must be given an even number of args")
|
||||
for n := 0; n < len(args); n += 2 {
|
||||
nodeID := args[n].(storj.NodeID)
|
||||
expectedContainment := args[n+1].(bool)
|
||||
nodeInDB, err := cache.Get(ctx, nodeID)
|
||||
require.NoError(t, err)
|
||||
require.Equalf(t, expectedContainment, nodeInDB.Contained,
|
||||
"Expected nodeID %v (args[%d]) contained = %v, but got %v",
|
||||
nodeID, n, expectedContainment, nodeInDB.Contained)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user