diff --git a/cmd/satellite/fetchpieces.go b/cmd/satellite/fetchpieces.go index 9493b71fc..01255f831 100644 --- a/cmd/satellite/fetchpieces.go +++ b/cmd/satellite/fetchpieces.go @@ -92,6 +92,7 @@ func cmdFetchPieces(cmd *cobra.Command, args []string) (err error) { db.NodeEvents(), db.Reputation(), db.Containment(), + db.NewContainment(), rollupsWriteCache, version.Build, &runCfg.Config, diff --git a/cmd/satellite/repairer.go b/cmd/satellite/repairer.go index 3cfebd6a4..948c66af3 100644 --- a/cmd/satellite/repairer.go +++ b/cmd/satellite/repairer.go @@ -72,6 +72,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) { db.NodeEvents(), db.Reputation(), db.Containment(), + db.NewContainment(), rollupsWriteCache, version.Build, &runCfg.Config, diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index 1907aba4b..dd54bb1a8 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -689,7 +689,7 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize) planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache}) - return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil) + return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), db.NewContainment(), rollupsWriteCache, versionInfo, &config, nil) } type rollupsWriteCacheCloser struct { diff --git a/satellite/audit/containment.go b/satellite/audit/containment.go index dff5f6706..71373654d 100644 --- a/satellite/audit/containment.go +++ b/satellite/audit/containment.go @@ -45,3 +45,16 @@ type Containment interface { IncrementPending(ctx context.Context, pendingAudit *PendingAudit) error Delete(ctx context.Context, nodeID pb.NodeID) (bool, error) } + +// NewContainment holds information about pending audits for contained nodes. +// +// It will exist side by side with Containment for a few commits in this +// commit chain, to allow the change in reverifications to be made over +// several smaller commits. +// +// Later in the commit chain, NewContainment will replace Containment. +type NewContainment interface { + Get(ctx context.Context, nodeID pb.NodeID) (*ReverificationJob, error) + Insert(ctx context.Context, job *PieceLocator) error + Delete(ctx context.Context, job *PieceLocator) (wasDeleted, nodeStillContained bool, err error) +} diff --git a/satellite/audit/containment2_test.go b/satellite/audit/containment2_test.go new file mode 100644 index 000000000..e66ccbf03 --- /dev/null +++ b/satellite/audit/containment2_test.go @@ -0,0 +1,179 @@ +// Copyright (C) 2022 Storj Labs, Inc. +// See LICENSE for copying information. 
+ +package audit_test + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/private/testplanet" + "storj.io/storj/satellite/audit" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/overlay" + "storj.io/storj/satellite/reputation" +) + +func TestNewContainInsertAndGet(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 2, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + containment := planet.Satellites[0].DB.NewContainment() + + input := &audit.PieceLocator{ + StreamID: testrand.UUID(), + Position: metabase.SegmentPositionFromEncoded(uint64(rand.Int63())), + NodeID: planet.StorageNodes[0].ID(), + PieceNum: 0, + } + + err := containment.Insert(ctx, input) + require.NoError(t, err) + + output, err := containment.Get(ctx, input.NodeID) + require.NoError(t, err) + + assert.Equal(t, input.NodeID, output.Locator.NodeID) + assert.Equal(t, input.StreamID, output.Locator.StreamID) + assert.Equal(t, input.Position, output.Locator.Position) + assert.Equal(t, input.PieceNum, output.Locator.PieceNum) + assert.EqualValues(t, 0, output.ReverifyCount) + + nodeID1 := planet.StorageNodes[1].ID() + _, err = containment.Get(ctx, nodeID1) + require.Error(t, err, audit.ErrContainedNotFound.New("%v", nodeID1)) + assert.Truef(t, audit.ErrContainedNotFound.Has(err), "expected ErrContainedNotFound but got %+v", err) + }) +} + +func TestNewContainIncrementPendingEntryExists(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + containment := planet.Satellites[0].DB.NewContainment() + + info1 := &audit.PieceLocator{ + NodeID: planet.StorageNodes[0].ID(), + } + + err := containment.Insert(ctx, info1) + require.NoError(t, err) + + // expect reverify count for an entry to be 0 after first IncrementPending call + pending, err := containment.Get(ctx, info1.NodeID) + require.NoError(t, err) + assert.EqualValues(t, 0, pending.ReverifyCount) + + // expect reverify count to be 0 still after second IncrementPending call + err = containment.Insert(ctx, info1) + require.NoError(t, err) + pending, err = containment.Get(ctx, info1.NodeID) + require.NoError(t, err) + assert.EqualValues(t, 0, pending.ReverifyCount) + + // after the job is selected for work, its ReverifyCount should be increased to 1 + job, err := planet.Satellites[0].DB.ReverifyQueue().GetNextJob(ctx, 10*time.Minute) + require.NoError(t, err) + require.Equal(t, pending.Locator, job.Locator) + assert.EqualValues(t, 1, job.ReverifyCount) + + pending, err = containment.Get(ctx, info1.NodeID) + require.NoError(t, err) + assert.EqualValues(t, 1, pending.ReverifyCount) + }) +} + +func TestNewContainDelete(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + containment := planet.Satellites[0].DB.NewContainment() + + // add two reverification jobs for the same node + info1 := &audit.PieceLocator{ + NodeID: planet.StorageNodes[0].ID(), + StreamID: testrand.UUID(), + } + info2 := &audit.PieceLocator{ + NodeID: planet.StorageNodes[0].ID(), + StreamID: testrand.UUID(), + } + + err := containment.Insert(ctx, info1) + require.NoError(t, err) + err = containment.Insert(ctx, info2) + 
require.NoError(t, err) + + // 'get' will choose one of them (we don't really care which) + got, err := containment.Get(ctx, info1.NodeID) + require.NoError(t, err) + if got.Locator != *info1 { + require.Equal(t, *info2, got.Locator) + } + + // delete one of the pending reverifications + wasDeleted, stillInContainment, err := containment.Delete(ctx, info2) + require.NoError(t, err) + require.True(t, wasDeleted) + require.True(t, stillInContainment) + + // 'get' now is sure to select info1 + got, err = containment.Get(ctx, info1.NodeID) + require.NoError(t, err) + require.Equal(t, *info1, got.Locator) + require.EqualValues(t, 0, got.ReverifyCount) + + // delete the other pending reverification + wasDeleted, stillInContainment, err = containment.Delete(ctx, info1) + require.NoError(t, err) + require.True(t, wasDeleted) + require.False(t, stillInContainment) + + // try to get a pending reverification that isn't in the queue + _, err = containment.Get(ctx, info1.NodeID) + require.Error(t, err, audit.ErrContainedNotFound.New("%v", info1.NodeID)) + require.True(t, audit.ErrContainedNotFound.Has(err)) + + // and try to delete that pending reverification that isn't in the queue + wasDeleted, _, err = containment.Delete(ctx, info1) + require.NoError(t, err) + assert.False(t, wasDeleted) + }) +} + +// UpdateStats used to remove nodes from containment. It doesn't anymore. +// This is a sanity check. +func TestNewContainUpdateStats(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + containment := planet.Satellites[0].DB.NewContainment() + cache := planet.Satellites[0].DB.OverlayCache() + + info1 := &audit.PieceLocator{ + NodeID: planet.StorageNodes[0].ID(), + } + + err := containment.Insert(ctx, info1) + require.NoError(t, err) + + // update node stats + err = planet.Satellites[0].Reputation.Service.ApplyAudit(ctx, info1.NodeID, overlay.ReputationStatus{}, reputation.AuditSuccess) + require.NoError(t, err) + + // check contained flag set to false + node, err := cache.Get(ctx, info1.NodeID) + require.NoError(t, err) + assert.False(t, node.Contained) + + // get pending audit + _, err = containment.Get(ctx, info1.NodeID) + require.NoError(t, err) + }) +} diff --git a/satellite/audit/getshare_test.go b/satellite/audit/getshare_test.go index 6d75a8314..bec3db1be 100644 --- a/satellite/audit/getshare_test.go +++ b/satellite/audit/getshare_test.go @@ -62,6 +62,7 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo newDialer, sat.Overlay.Service, sat.DB.Containment(), + sat.DB.NewContainment(), sat.Orders.Service, sat.Identity, sat.Config.Audit.MinBytesPerSecond, diff --git a/satellite/audit/reporter.go b/satellite/audit/reporter.go index 223cf1236..4e0917051 100644 --- a/satellite/audit/reporter.go +++ b/satellite/audit/reporter.go @@ -18,9 +18,11 @@ import ( // // architecture: Service type reporter struct { - log *zap.Logger - reputations *reputation.Service - containment Containment + log *zap.Logger + reputations *reputation.Service + containment Containment + // newContainment is temporary, and will replace containment + newContainment NewContainment maxRetries int maxReverifyCount int32 } @@ -45,11 +47,12 @@ type Report struct { } // NewReporter instantiates a reporter. 
-func NewReporter(log *zap.Logger, reputations *reputation.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter { +func NewReporter(log *zap.Logger, reputations *reputation.Service, containment Containment, newContainment NewContainment, maxRetries int, maxReverifyCount int32) Reporter { return &reporter{ log: log, reputations: reputations, containment: containment, + newContainment: newContainment, maxRetries: maxRetries, maxReverifyCount: maxReverifyCount, } diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index 0b9590f90..be59f823c 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -398,6 +398,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) { dialer, satellite.Overlay.Service, satellite.DB.Containment(), + satellite.DB.NewContainment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go index ede972e91..c7b31ef63 100644 --- a/satellite/audit/verifier.go +++ b/satellite/audit/verifier.go @@ -61,6 +61,7 @@ type Verifier struct { dialer rpc.Dialer overlay *overlay.Service containment Containment + newContainment NewContainment minBytesPerSecond memory.Size minDownloadTimeout time.Duration @@ -69,7 +70,7 @@ type Verifier struct { } // NewVerifier creates a Verifier. -func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier { +func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, newContainment NewContainment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier { return &Verifier{ log: log, metabase: metabase, @@ -78,6 +79,7 @@ func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, over dialer: dialer, overlay: overlay, containment: containment, + newContainment: newContainment, minBytesPerSecond: minBytesPerSecond, minDownloadTimeout: minDownloadTimeout, nowFn: time.Now, diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index 91c74e9ab..6a6f5ae44 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -258,6 +258,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) { dialer, satellite.Overlay.Service, satellite.DB.Containment(), + satellite.DB.NewContainment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, @@ -333,6 +334,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { satellite.Dialer, satellite.Overlay.Service, satellite.DB.Containment(), + satellite.DB.NewContainment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, @@ -634,6 +636,7 @@ func TestVerifierDialTimeout(t *testing.T) { dialer, satellite.Overlay.Service, satellite.DB.Containment(), + satellite.DB.NewContainment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, diff --git a/satellite/core.go b/satellite/core.go index cc52311c2..04a5bcd5d 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -414,6 +414,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, dialer, peer.Overlay.Service, peer.DB.Containment(), + peer.DB.NewContainment(), peer.Orders.Service, peer.Identity, config.MinBytesPerSecond, @@ -427,6 +428,7 @@ func New(log *zap.Logger, 
full *identity.FullIdentity, db DB, peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"), peer.Reputation.Service, peer.DB.Containment(), + peer.DB.NewContainment(), config.MaxRetriesStatDB, int32(config.MaxReverifyCount), ) diff --git a/satellite/peer.go b/satellite/peer.go index c89134cc0..5b5faa17b 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -114,6 +114,8 @@ type DB interface { Orders() orders.DB // Containment returns database for containment Containment() audit.Containment + // NewContainment is temporary and will replace Containment later in the commit chain. + NewContainment() audit.NewContainment // Buckets returns the database to interact with buckets Buckets() buckets.DB // GracefulExit returns database for graceful exit @@ -122,7 +124,6 @@ type DB interface { StripeCoinPayments() stripecoinpayments.DB // Billing returns storjscan transactions database. Billing() billing.TransactionsDB - // SNOPayouts returns database for payouts. // Wallets returns storjscan wallets database. Wallets() storjscan.WalletsDB // SNOPayouts returns database for payouts. diff --git a/satellite/repairer.go b/satellite/repairer.go index 615dfc434..1ad8eec92 100644 --- a/satellite/repairer.go +++ b/satellite/repairer.go @@ -86,6 +86,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, nodeEvents nodeevents.DB, reputationdb reputation.DB, containmentDB audit.Containment, + newContainmentDB audit.NewContainment, rollupsWriteCache *orders.RollupsWriteCache, versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel, ) (*Repairer, error) { @@ -219,6 +220,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, log.Named("reporter"), peer.Reputation, containmentDB, + newContainmentDB, config.Audit.MaxRetriesStatDB, int32(config.Audit.MaxReverifyCount)) } diff --git a/satellite/satellitedb/containment.go b/satellite/satellitedb/containment.go index 1590657be..bc7ebb96a 100644 --- a/satellite/satellitedb/containment.go +++ b/satellite/satellitedb/containment.go @@ -23,6 +23,10 @@ type containment struct { db *satelliteDB } +type newContainment struct { + reverifyQueue audit.ReverifyQueue +} + // Get gets the pending audit by node id. func (containment *containment) Get(ctx context.Context, id pb.NodeID) (_ *audit.PendingAudit, err error) { defer mon.Task()(&ctx)(&err) @@ -136,3 +140,44 @@ func convertDBPending(ctx context.Context, info *dbx.SegmentPendingAudits) (_ *a } return pending, nil } + +// Get gets a pending reverification audit by node id. If there are +// multiple pending reverification audits, an arbitrary one is returned. +// If there are none, an error wrapped by audit.ErrContainedNotFound is +// returned. +func (containment *newContainment) Get(ctx context.Context, id pb.NodeID) (_ *audit.ReverificationJob, err error) { + defer mon.Task()(&ctx)(&err) + if id.IsZero() { + return nil, audit.ContainError.New("node ID empty") + } + + return containment.reverifyQueue.GetByNodeID(ctx, id) +} + +// Insert creates a new pending audit entry. +func (containment *newContainment) Insert(ctx context.Context, pendingJob *audit.PieceLocator) (err error) { + defer mon.Task()(&ctx)(&err) + + return containment.reverifyQueue.Insert(ctx, pendingJob) +} + +// Delete removes a job from the reverification queue, whether because the job +// was successful or because the job is no longer necessary. The wasDeleted +// return value indicates whether the indicated job was actually deleted (if +// not, there was no such job in the queue). 
+func (containment *newContainment) Delete(ctx context.Context, pendingJob *audit.PieceLocator) (wasDeleted, nodeStillContained bool, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	wasDeleted, err = containment.reverifyQueue.Remove(ctx, pendingJob)
+	if err != nil {
+		return false, false, audit.ContainError.Wrap(err)
+	}
+
+	nodeStillContained = true
+	_, err = containment.reverifyQueue.GetByNodeID(ctx, pendingJob.NodeID)
+	if audit.ErrContainedNotFound.Has(err) {
+		nodeStillContained = false
+		err = nil
+	}
+	return wasDeleted, nodeStillContained, audit.ContainError.Wrap(err)
+}
diff --git a/satellite/satellitedb/database.go b/satellite/satellitedb/database.go
index 122ea2afe..f14273d9c 100644
--- a/satellite/satellitedb/database.go
+++ b/satellite/satellitedb/database.go
@@ -280,6 +280,11 @@ func (dbc *satelliteDBCollection) Containment() audit.Containment {
 	return &containment{db: dbc.getByName("containment")}
 }
 
+// NewContainment is temporary and will replace Containment later in the commit chain.
+func (dbc *satelliteDBCollection) NewContainment() audit.NewContainment {
+	return &newContainment{reverifyQueue: dbc.ReverifyQueue()}
+}
+
 // GracefulExit returns database for graceful exit.
 func (dbc *satelliteDBCollection) GracefulExit() gracefulexit.DB {
 	return &gracefulexitDB{db: dbc.getByName("gracefulexit")}
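
For readers following the commit chain, here is a minimal sketch (not part of the diff) of how a caller such as the audit reporter might consume the NewContainment API once reverification of a piece succeeds. The overlayService interface, its SetNodeContained method, and the onReverifySuccess helper are hypothetical names introduced only for illustration; the actual wiring is left to later commits in the chain.

package auditexample // illustration only; not part of the commit

import (
	"context"

	"storj.io/common/pb"
	"storj.io/storj/satellite/audit"
)

// overlayService is a hypothetical stand-in for the part of the overlay
// service that can flip a node's contained flag.
type overlayService interface {
	SetNodeContained(ctx context.Context, node pb.NodeID, contained bool) error
}

// onReverifySuccess removes a finished reverification job and, when the node
// has no pending reverifications left, releases it from containment.
func onReverifySuccess(ctx context.Context, containment audit.NewContainment, overlay overlayService, piece *audit.PieceLocator) error {
	wasDeleted, stillContained, err := containment.Delete(ctx, piece)
	if err != nil {
		return err
	}
	if wasDeleted && !stillContained {
		// The last pending job for this node is gone; clear the contained flag.
		return overlay.SetNodeContained(ctx, piece.NodeID, false)
	}
	return nil
}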