satellite/audit: add ContainmentSyncChore
We will be needing an infrequent chore to check which nodes are in the reverify queue and synchronize that set with the 'contained' field in the nodes db, since it is easily possible for them to get out of sync. (We can't require that the reverification queue table be in the same database as the nodes table, so maintaining consistency with SQL transactions is out. Plus, even if they were in the same database, using such SQL transactions to maintain consistency would be slow and unwieldy.) This commit adds the actual chore. Refs: https://github.com/storj/storj/issues/5431 Change-Id: Id78b40bf69fae1ac39010e3b553315db8a1472bd
This commit is contained in:
parent
3d3f9d133a
commit
d6f8be1ec6
@ -148,6 +148,7 @@ type Satellite struct {
|
|||||||
Verifier *audit.Verifier
|
Verifier *audit.Verifier
|
||||||
Reverifier *audit.Reverifier
|
Reverifier *audit.Reverifier
|
||||||
Reporter audit.Reporter
|
Reporter audit.Reporter
|
||||||
|
ContainmentSyncChore *audit.ContainmentSyncChore
|
||||||
}
|
}
|
||||||
|
|
||||||
Reputation struct {
|
Reputation struct {
|
||||||
@ -644,6 +645,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
|
|||||||
system.Audit.Verifier = auditorPeer.Audit.Verifier
|
system.Audit.Verifier = auditorPeer.Audit.Verifier
|
||||||
system.Audit.Reverifier = auditorPeer.Audit.Reverifier
|
system.Audit.Reverifier = auditorPeer.Audit.Reverifier
|
||||||
system.Audit.Reporter = auditorPeer.Audit.Reporter
|
system.Audit.Reporter = auditorPeer.Audit.Reporter
|
||||||
|
system.Audit.ContainmentSyncChore = peer.Audit.ContainmentSyncChore
|
||||||
|
|
||||||
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender
|
system.GarbageCollection.Sender = gcPeer.GarbageCollection.Sender
|
||||||
system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service
|
system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service
|
||||||
|
69
satellite/audit/reverify_chore.go
Normal file
69
satellite/audit/reverify_chore.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
// Copyright (C) 2023 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package audit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/common/sync2"
|
||||||
|
"storj.io/storj/satellite/overlay"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ContainmentSyncChore is a chore to update the set of contained nodes in the
|
||||||
|
// overlay cache. This is necessary because it is possible for the "contained"
|
||||||
|
// field in the nodes table to disagree with whether a node appears in the
|
||||||
|
// reverification queue. We make an effort to keep them in sync when making
|
||||||
|
// changes to the reverification queue, but this infrequent chore will clean up
|
||||||
|
// any inconsistencies that creep in (because we can't maintain perfect
|
||||||
|
// consistency while the reverification queue and the nodes table may be in
|
||||||
|
// separate databases). Fortunately, it is acceptable for a node's containment
|
||||||
|
// status to be out of date for some amount of time.
|
||||||
|
type ContainmentSyncChore struct {
|
||||||
|
log *zap.Logger
|
||||||
|
queue ReverifyQueue
|
||||||
|
overlay overlay.DB
|
||||||
|
|
||||||
|
Loop *sync2.Cycle
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewContainmentSyncChore creates a new ContainmentSyncChore.
|
||||||
|
func NewContainmentSyncChore(log *zap.Logger, queue ReverifyQueue, overlay overlay.DB, interval time.Duration) *ContainmentSyncChore {
|
||||||
|
return &ContainmentSyncChore{
|
||||||
|
log: log,
|
||||||
|
queue: queue,
|
||||||
|
overlay: overlay,
|
||||||
|
Loop: sync2.NewCycle(interval),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs the reverify chore.
|
||||||
|
func (rc *ContainmentSyncChore) Run(ctx context.Context) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
return rc.Loop.Run(ctx, rc.syncContainedStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncContainedStatus updates the contained status of all nodes in the overlay cache
|
||||||
|
// as necessary to match whether they currently appear in the reverification queue at
|
||||||
|
// least once.
|
||||||
|
func (rc *ContainmentSyncChore) syncContainedStatus(ctx context.Context) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
containedSet, err := rc.queue.GetAllContainedNodes(ctx)
|
||||||
|
if err != nil {
|
||||||
|
rc.log.Error("failed to get set of contained nodes from reverify queue", zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err = rc.overlay.SetAllContainedNodes(ctx, containedSet)
|
||||||
|
if err != nil {
|
||||||
|
rc.log.Error("failed to update the set of contained nodes in the overlay cache", zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rc.log.Info("updated containment status of all nodes as necessary",
|
||||||
|
zap.Int("num contained nodes", len(containedSet)))
|
||||||
|
return nil
|
||||||
|
}
|
114
satellite/audit/reverify_chore_test.go
Normal file
114
satellite/audit/reverify_chore_test.go
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
// Copyright (C) 2023 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package audit_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"storj.io/common/storj"
|
||||||
|
"storj.io/common/testcontext"
|
||||||
|
"storj.io/common/testrand"
|
||||||
|
"storj.io/storj/private/testplanet"
|
||||||
|
"storj.io/storj/satellite/audit"
|
||||||
|
"storj.io/storj/satellite/overlay"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestContainmentSyncChore(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1,
|
||||||
|
StorageNodeCount: 3,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
satellite := planet.Satellites[0]
|
||||||
|
reverifyQueue := satellite.Audit.ReverifyQueue
|
||||||
|
cache := satellite.Overlay.DB
|
||||||
|
syncChore := satellite.Audit.ContainmentSyncChore
|
||||||
|
syncChore.Loop.Pause()
|
||||||
|
|
||||||
|
node1 := planet.StorageNodes[0].ID()
|
||||||
|
node2 := planet.StorageNodes[1].ID()
|
||||||
|
node3 := planet.StorageNodes[2].ID()
|
||||||
|
|
||||||
|
// no nodes should be in the reverify queue
|
||||||
|
requireInReverifyQueue(ctx, t, reverifyQueue)
|
||||||
|
// and none should be marked contained in the overlay
|
||||||
|
requireContainedStatus(ctx, t, cache, node1, false, node2, false, node3, false)
|
||||||
|
|
||||||
|
// set node1 contained in the overlay, but node2 contained in the reverify queue
|
||||||
|
err := cache.SetNodeContained(ctx, node1, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
node2Piece := &audit.PieceLocator{StreamID: testrand.UUID(), NodeID: node2}
|
||||||
|
err = reverifyQueue.Insert(ctx, node2Piece)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requireInReverifyQueue(ctx, t, reverifyQueue, node2)
|
||||||
|
requireContainedStatus(ctx, t, cache, node1, true, node2, false, node3, false)
|
||||||
|
|
||||||
|
// run the chore to synchronize
|
||||||
|
syncChore.Loop.TriggerWait()
|
||||||
|
|
||||||
|
// there should only be node2 in both places now
|
||||||
|
requireInReverifyQueue(ctx, t, reverifyQueue, node2)
|
||||||
|
requireContainedStatus(ctx, t, cache, node1, false, node2, true, node3, false)
|
||||||
|
|
||||||
|
// now get node3 in the reverify queue as well
|
||||||
|
node3Piece := &audit.PieceLocator{StreamID: testrand.UUID(), NodeID: node3}
|
||||||
|
err = reverifyQueue.Insert(ctx, node3Piece)
|
||||||
|
require.NoError(t, err)
|
||||||
|
requireInReverifyQueue(ctx, t, reverifyQueue, node2, node3)
|
||||||
|
requireContainedStatus(ctx, t, cache, node1, false, node2, true, node3, false)
|
||||||
|
|
||||||
|
// run the chore to synchronize
|
||||||
|
syncChore.Loop.TriggerWait()
|
||||||
|
|
||||||
|
// nodes 2 and 3 should both be contained in both places now
|
||||||
|
requireInReverifyQueue(ctx, t, reverifyQueue, node2, node3)
|
||||||
|
requireContainedStatus(ctx, t, cache, node1, false, node2, true, node3, true)
|
||||||
|
|
||||||
|
// remove both node2 and node3 from reverify queue
|
||||||
|
wasDeleted, err := reverifyQueue.Remove(ctx, node2Piece)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, wasDeleted)
|
||||||
|
wasDeleted, err = reverifyQueue.Remove(ctx, node3Piece)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, wasDeleted)
|
||||||
|
requireInReverifyQueue(ctx, t, reverifyQueue)
|
||||||
|
requireContainedStatus(ctx, t, cache, node1, false, node2, true, node3, true)
|
||||||
|
|
||||||
|
// run the chore to synchronize
|
||||||
|
syncChore.Loop.TriggerWait()
|
||||||
|
|
||||||
|
// nothing should be contained in either place now
|
||||||
|
requireInReverifyQueue(ctx, t, reverifyQueue)
|
||||||
|
requireContainedStatus(ctx, t, cache, node1, false, node2, false, node3, false)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func requireInReverifyQueue(ctx context.Context, t testing.TB, reverifyQueue audit.ReverifyQueue, expectedNodes ...storj.NodeID) {
|
||||||
|
nodesInReverifyQueue, err := reverifyQueue.GetAllContainedNodes(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
sort.Slice(nodesInReverifyQueue, func(i, j int) bool {
|
||||||
|
return nodesInReverifyQueue[i].Compare(nodesInReverifyQueue[j]) < 0
|
||||||
|
})
|
||||||
|
sort.Slice(nodesInReverifyQueue, func(i, j int) bool {
|
||||||
|
return expectedNodes[i].Compare(expectedNodes[j]) < 0
|
||||||
|
})
|
||||||
|
require.Equal(t, expectedNodes, nodesInReverifyQueue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func requireContainedStatus(ctx context.Context, t testing.TB, cache overlay.DB, args ...interface{}) {
|
||||||
|
require.Equal(t, 0, len(args)%2, "must be given an even number of args")
|
||||||
|
for n := 0; n < len(args); n += 2 {
|
||||||
|
nodeID := args[n].(storj.NodeID)
|
||||||
|
expectedContainment := args[n+1].(bool)
|
||||||
|
nodeInDB, err := cache.Get(ctx, nodeID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equalf(t, expectedContainment, nodeInDB.Contained,
|
||||||
|
"Expected nodeID %v (args[%d]) contained = %v, but got %v",
|
||||||
|
nodeID, n, expectedContainment, nodeInDB.Contained)
|
||||||
|
}
|
||||||
|
}
|
@ -34,6 +34,8 @@ type Config struct {
|
|||||||
|
|
||||||
ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"`
|
ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"`
|
||||||
ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"`
|
ReverificationRetryInterval time.Duration `help:"how long a single reverification job can take before it may be taken over by another worker" releaseDefault:"6h" devDefault:"10m"`
|
||||||
|
|
||||||
|
ContainmentSyncChoreInterval time.Duration `help:"how often to run the containment-sync chore" releaseDefault:"2h" devDefault:"2m" testDefault:"$TESTINTERVAL"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Worker contains information for populating audit queue and processing audits.
|
// Worker contains information for populating audit queue and processing audits.
|
||||||
|
@ -117,7 +117,9 @@ type Core struct {
|
|||||||
|
|
||||||
Audit struct {
|
Audit struct {
|
||||||
VerifyQueue audit.VerifyQueue
|
VerifyQueue audit.VerifyQueue
|
||||||
|
ReverifyQueue audit.ReverifyQueue
|
||||||
Chore *audit.Chore
|
Chore *audit.Chore
|
||||||
|
ContainmentSyncChore *audit.ContainmentSyncChore
|
||||||
}
|
}
|
||||||
|
|
||||||
ExpiredDeletion struct {
|
ExpiredDeletion struct {
|
||||||
@ -401,6 +403,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
|||||||
config := config.Audit
|
config := config.Audit
|
||||||
|
|
||||||
peer.Audit.VerifyQueue = db.VerifyQueue()
|
peer.Audit.VerifyQueue = db.VerifyQueue()
|
||||||
|
peer.Audit.ReverifyQueue = db.ReverifyQueue()
|
||||||
|
|
||||||
if config.UseRangedLoop {
|
if config.UseRangedLoop {
|
||||||
peer.Log.Named("audit:chore").Info("using ranged loop")
|
peer.Log.Named("audit:chore").Info("using ranged loop")
|
||||||
@ -417,6 +420,18 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
|||||||
})
|
})
|
||||||
peer.Debug.Server.Panel.Add(
|
peer.Debug.Server.Panel.Add(
|
||||||
debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
|
debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
|
||||||
|
|
||||||
|
peer.Audit.ContainmentSyncChore = audit.NewContainmentSyncChore(peer.Log.Named("audit:containment-sync-chore"),
|
||||||
|
peer.Audit.ReverifyQueue,
|
||||||
|
peer.Overlay.DB,
|
||||||
|
config.ContainmentSyncChoreInterval,
|
||||||
|
)
|
||||||
|
peer.Services.Add(lifecycle.Item{
|
||||||
|
Name: "audit:containment-sync-chore",
|
||||||
|
Run: peer.Audit.ContainmentSyncChore.Run,
|
||||||
|
})
|
||||||
|
peer.Debug.Server.Panel.Add(
|
||||||
|
debug.Cycle("Audit Containment Sync Chore", peer.Audit.ContainmentSyncChore.Loop))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -37,6 +37,9 @@
|
|||||||
# how often to run the reservoir chore
|
# how often to run the reservoir chore
|
||||||
# audit.chore-interval: 24h0m0s
|
# audit.chore-interval: 24h0m0s
|
||||||
|
|
||||||
|
# how often to run the containment-sync chore
|
||||||
|
# audit.containment-sync-chore-interval: 2h0m0s
|
||||||
|
|
||||||
# max number of times to attempt updating a statdb batch
|
# max number of times to attempt updating a statdb batch
|
||||||
# audit.max-retries-stat-db: 3
|
# audit.max-retries-stat-db: 3
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user