diff --git a/cmd/satellite/fetchpieces.go b/cmd/satellite/fetchpieces.go index 0976c5a55..9493b71fc 100644 --- a/cmd/satellite/fetchpieces.go +++ b/cmd/satellite/fetchpieces.go @@ -91,7 +91,7 @@ func cmdFetchPieces(cmd *cobra.Command, args []string) (err error) { db.OverlayCache(), db.NodeEvents(), db.Reputation(), - db.NewContainment(), + db.Containment(), rollupsWriteCache, version.Build, &runCfg.Config, diff --git a/cmd/satellite/repairer.go b/cmd/satellite/repairer.go index 05811e35c..3cfebd6a4 100644 --- a/cmd/satellite/repairer.go +++ b/cmd/satellite/repairer.go @@ -71,7 +71,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) { db.OverlayCache(), db.NodeEvents(), db.Reputation(), - db.NewContainment(), + db.Containment(), rollupsWriteCache, version.Build, &runCfg.Config, diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index c89973c24..4cba13f06 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -700,7 +700,7 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize) planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache}) - return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.NewContainment(), rollupsWriteCache, versionInfo, &config, nil) + return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil) } type rollupsWriteCacheCloser struct { diff --git a/satellite/audit/containment.go b/satellite/audit/containment.go index a4bcaf506..4d667c48b 100644 --- a/satellite/audit/containment.go +++ b/satellite/audit/containment.go @@ -22,16 +22,10 @@ var ( ErrContainDelete = errs.Class("unable to delete pending audit") ) -// NewContainment holds information about pending audits for contained nodes. -// -// It will exist side by side with Containment for a few commits in this -// commit chain, to allow the change in reverifications to be made over -// several smaller commits. -// -// Later in the commit chain, NewContainment will be renamed to Containment. +// Containment holds information about pending audits for contained nodes. // // architecture: Database -type NewContainment interface { +type Containment interface { Get(ctx context.Context, nodeID pb.NodeID) (*ReverificationJob, error) Insert(ctx context.Context, job *PieceLocator) error Delete(ctx context.Context, job *PieceLocator) (wasDeleted, nodeStillContained bool, err error) diff --git a/satellite/audit/containment2_test.go b/satellite/audit/containment_test.go similarity index 92% rename from satellite/audit/containment2_test.go rename to satellite/audit/containment_test.go index 2a2790b91..8feaf3dd5 100644 --- a/satellite/audit/containment2_test.go +++ b/satellite/audit/containment_test.go @@ -20,11 +20,11 @@ import ( "storj.io/storj/satellite/reputation" ) -func TestNewContainInsertAndGet(t *testing.T) { +func TestContainInsertAndGet(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 2, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - containment := planet.Satellites[0].DB.NewContainment() + containment := planet.Satellites[0].DB.Containment() input := &audit.PieceLocator{ StreamID: testrand.UUID(), @@ -49,11 +49,11 @@ func TestNewContainInsertAndGet(t *testing.T) { }) } -func TestNewContainIncrementPendingEntryExists(t *testing.T) { +func TestContainIncrementPendingEntryExists(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - containment := planet.Satellites[0].DB.NewContainment() + containment := planet.Satellites[0].DB.Containment() info1 := &audit.PieceLocator{ NodeID: planet.StorageNodes[0].ID(), @@ -86,11 +86,11 @@ func TestNewContainIncrementPendingEntryExists(t *testing.T) { }) } -func TestNewContainDelete(t *testing.T) { +func TestContainDelete(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - containment := planet.Satellites[0].DB.NewContainment() + containment := planet.Satellites[0].DB.Containment() // add two reverification jobs for the same node info1 := &audit.PieceLocator{ @@ -147,11 +147,11 @@ func TestNewContainDelete(t *testing.T) { // UpdateStats used to remove nodes from containment. It doesn't anymore. // This is a sanity check. -func TestNewContainUpdateStats(t *testing.T) { +func TestContainUpdateStats(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - containment := planet.Satellites[0].DB.NewContainment() + containment := planet.Satellites[0].DB.Containment() cache := planet.Satellites[0].DB.OverlayCache() info1 := &audit.PieceLocator{ diff --git a/satellite/audit/getshare_test.go b/satellite/audit/getshare_test.go index 32cbfb9be..6d75a8314 100644 --- a/satellite/audit/getshare_test.go +++ b/satellite/audit/getshare_test.go @@ -61,7 +61,7 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo sat.Metabase.DB, newDialer, sat.Overlay.Service, - sat.DB.NewContainment(), + sat.DB.Containment(), sat.Orders.Service, sat.Identity, sat.Config.Audit.MinBytesPerSecond, diff --git a/satellite/audit/reporter.go b/satellite/audit/reporter.go index a978694c2..d487ac778 100644 --- a/satellite/audit/reporter.go +++ b/satellite/audit/reporter.go @@ -19,11 +19,10 @@ import ( // // architecture: Service type reporter struct { - log *zap.Logger - reputations *reputation.Service - overlay *overlay.Service - // newContainment will be renamed to containment. - newContainment NewContainment + log *zap.Logger + reputations *reputation.Service + overlay *overlay.Service + containment Containment maxRetries int maxReverifyCount int32 } @@ -41,22 +40,21 @@ type Reporter interface { // succeeded, failed, were offline, have pending audits, or failed for unknown // reasons and their current reputation status. type Report struct { - Successes storj.NodeIDList - Fails storj.NodeIDList - Offlines storj.NodeIDList - // PieceAudits will be renamed to PendingAudits. - PieceAudits []*ReverificationJob + Successes storj.NodeIDList + Fails storj.NodeIDList + Offlines storj.NodeIDList + PendingAudits []*ReverificationJob Unknown storj.NodeIDList NodesReputation map[storj.NodeID]overlay.ReputationStatus } // NewReporter instantiates a reporter. -func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, newContainment NewContainment, maxRetries int, maxReverifyCount int32) Reporter { +func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter { return &reporter{ log: log, reputations: reputations, overlay: overlay, - newContainment: newContainment, + containment: containment, maxRetries: maxRetries, maxReverifyCount: maxReverifyCount, } @@ -72,7 +70,7 @@ func (reporter *reporter) RecordAudits(ctx context.Context, req Report) { fails := req.Fails unknowns := req.Unknown offlines := req.Offlines - pendingAudits := req.PieceAudits + pendingAudits := req.PendingAudits reporter.log.Debug("Reporting audits", zap.Int("successes", len(successes)), @@ -110,7 +108,7 @@ func (reporter *reporter) RecordAudits(ctx context.Context, req Report) { reportFailures(tries, "unknown", err, unknowns, nil) offlines, err = reporter.recordAuditStatus(ctx, offlines, nodesReputation, reputation.AuditOffline) reportFailures(tries, "offline", err, offlines, nil) - pendingAudits, err = reporter.recordPendingPieceAudits(ctx, pendingAudits, nodesReputation) + pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits, nodesReputation) reportFailures(tries, "pending", err, nil, pendingAudits) } } @@ -132,9 +130,8 @@ func (reporter *reporter) recordAuditStatus(ctx context.Context, nodeIDs storj.N return failed, errors.Err() } -// recordPendingPieceAudits updates the containment status of nodes with pending piece audits. -// This function is temporary and will be renamed to recordPendingAudits later in this commit chain. -func (reporter *reporter) recordPendingPieceAudits(ctx context.Context, pendingAudits []*ReverificationJob, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*ReverificationJob, err error) { +// recordPendingAudits updates the containment status of nodes with pending piece audits. +func (reporter *reporter) recordPendingAudits(ctx context.Context, pendingAudits []*ReverificationJob, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*ReverificationJob, err error) { defer mon.Task()(&ctx)(&err) var errlist errs.Group @@ -164,7 +161,7 @@ func (reporter *reporter) recordPendingPieceAudits(ctx context.Context, pendingA failed = append(failed, pendingAudit) continue } - _, stillContained, err := reporter.newContainment.Delete(ctx, &pendingAudit.Locator) + _, stillContained, err := reporter.containment.Delete(ctx, &pendingAudit.Locator) if err != nil { if !ErrContainedNotFound.Has(err) { errlist.Add(err) @@ -188,7 +185,7 @@ func (reporter *reporter) recordPendingPieceAudits(ctx context.Context, pendingA func (reporter *reporter) ReportReverificationNeeded(ctx context.Context, piece *PieceLocator) (err error) { defer mon.Task()(&ctx)(&err) - err = reporter.newContainment.Insert(ctx, piece) + err = reporter.containment.Insert(ctx, piece) if err != nil { return Error.New("failed to queue reverification audit for node: %w", err) } @@ -223,7 +220,7 @@ func (reporter *reporter) RecordReverificationResult(ctx context.Context, pendin // This will get re-added to the reverification queue, but that is idempotent // and fine. We do need to add it to PendingAudits in order to get the // maxReverifyCount check. - report.PieceAudits = append(report.PieceAudits, pendingJob) + report.PendingAudits = append(report.PendingAudits, pendingJob) case OutcomeUnknownError: report.Unknown = append(report.Unknown, pendingJob.Locator.NodeID) keepInQueue = false @@ -237,7 +234,7 @@ func (reporter *reporter) RecordReverificationResult(ctx context.Context, pendin // remove from reverifications queue if appropriate if !keepInQueue { - _, stillContained, err := reporter.newContainment.Delete(ctx, &pendingJob.Locator) + _, stillContained, err := reporter.containment.Delete(ctx, &pendingJob.Locator) if err != nil { if !ErrContainedNotFound.Has(err) { errList.Add(err) diff --git a/satellite/audit/reporter_test.go b/satellite/audit/reporter_test.go index 326005e7b..78ccf4c34 100644 --- a/satellite/audit/reporter_test.go +++ b/satellite/audit/reporter_test.go @@ -35,8 +35,8 @@ func TestReportPendingAudits(t *testing.T) { }, } - report := audit.Report{PieceAudits: []*audit.ReverificationJob{&pending}} - containment := satellite.DB.NewContainment() + report := audit.Report{PendingAudits: []*audit.ReverificationJob{&pending}} + containment := satellite.DB.Containment() audits.Reporter.RecordAudits(ctx, report) @@ -94,7 +94,7 @@ func TestRecordAuditsCorrectOutcome(t *testing.T) { Successes: []storj.NodeID{goodNode}, Fails: []storj.NodeID{dqNode}, Unknown: []storj.NodeID{suspendedNode}, - PieceAudits: []*audit.ReverificationJob{ + PendingAudits: []*audit.ReverificationJob{ { Locator: audit.PieceLocator{NodeID: pendingNode}, ReverifyCount: 0, @@ -206,11 +206,11 @@ func TestGracefullyExitedNotUpdated(t *testing.T) { }, } report = audit.Report{ - Successes: storj.NodeIDList{successNode.ID()}, - Fails: storj.NodeIDList{failedNode.ID()}, - Offlines: storj.NodeIDList{offlineNode.ID()}, - PieceAudits: []*audit.ReverificationJob{&pending}, - Unknown: storj.NodeIDList{unknownNode.ID()}, + Successes: storj.NodeIDList{successNode.ID()}, + Fails: storj.NodeIDList{failedNode.ID()}, + Offlines: storj.NodeIDList{offlineNode.ID()}, + PendingAudits: []*audit.ReverificationJob{&pending}, + Unknown: storj.NodeIDList{unknownNode.ID()}, } audits.Reporter.RecordAudits(ctx, report) diff --git a/satellite/audit/reverify_test.go b/satellite/audit/reverify_test.go index 006520460..ce3953443 100644 --- a/satellite/audit/reverify_test.go +++ b/satellite/audit/reverify_test.go @@ -62,7 +62,7 @@ func TestReverifySuccess(t *testing.T) { pieceIndex := testrand.Intn(len(segment.Pieces)) piece := segment.Pieces[pieceIndex] - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() pending := &audit.PieceLocator{ NodeID: piece.StorageNode, @@ -120,7 +120,7 @@ func TestReverifyFailMissingShare(t *testing.T) { }) require.NoError(t, err) - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() pieceIndex := testrand.Intn(len(segment.Pieces)) piece := segment.Pieces[pieceIndex] @@ -212,7 +212,7 @@ func TestReverifyOffline(t *testing.T) { require.NoError(t, err) // make sure that pending audit is not removed - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() _, err = containment.Get(ctx, pending.NodeID) require.NoError(t, err) }) @@ -273,7 +273,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) { satellite.Metabase.DB, dialer, satellite.Overlay.Service, - satellite.DB.NewContainment(), + satellite.DB.Containment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, @@ -304,7 +304,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) { require.NoError(t, err) // make sure that pending audit is not removed - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() _, err = containment.Get(ctx, pending.NodeID) require.NoError(t, err) }) @@ -371,7 +371,7 @@ func TestReverifyDeletedSegment(t *testing.T) { require.NoError(t, err) // expect that the node was removed from containment since the segment it was contained for has been deleted - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() _, err = containment.Get(ctx, piece.StorageNode) require.True(t, audit.ErrContainedNotFound.Has(err)) }) @@ -433,7 +433,7 @@ func TestReverifyModifiedSegment(t *testing.T) { PieceNum: int(piece.Number), } - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() err = audits.Reporter.ReportReverificationNeeded(ctx, pending) require.NoError(t, err) @@ -508,7 +508,7 @@ func TestReverifyReplacedSegment(t *testing.T) { PieceNum: int(piece.Number), } - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() err = audits.Reporter.ReportReverificationNeeded(ctx, pending) require.NoError(t, err) @@ -582,7 +582,7 @@ func TestReverifyExpired(t *testing.T) { // expect that the node was removed from containment since the segment it was // contained for has expired - _, err = satellite.DB.NewContainment().Get(ctx, piece.StorageNode) + _, err = satellite.DB.Containment().Get(ctx, piece.StorageNode) require.True(t, audit.ErrContainedNotFound.Has(err)) }) } @@ -631,7 +631,7 @@ func TestReverifySlowDownload(t *testing.T) { slowPiece := segment.Pieces[0] slowNode := slowPiece.StorageNode - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() pending := &audit.PieceLocator{ NodeID: slowNode, @@ -696,7 +696,7 @@ func TestReverifyUnknownError(t *testing.T) { badPiece := segment.Pieces[0] badNode := badPiece.StorageNode - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() pending := &audit.PieceLocator{ NodeID: badNode, @@ -768,7 +768,7 @@ func TestMaxReverifyCount(t *testing.T) { slowPiece := segment.Pieces[0] slowNode := slowPiece.StorageNode - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() pending := &audit.PieceLocator{ NodeID: slowNode, diff --git a/satellite/audit/verifier.go b/satellite/audit/verifier.go index be86207ab..994f87a9d 100644 --- a/satellite/audit/verifier.go +++ b/satellite/audit/verifier.go @@ -53,14 +53,13 @@ type Share struct { // // architecture: Worker type Verifier struct { - log *zap.Logger - metabase *metabase.DB - orders *orders.Service - auditor *identity.PeerIdentity - dialer rpc.Dialer - overlay *overlay.Service - // newContainment will be renamed to containment. - newContainment NewContainment + log *zap.Logger + metabase *metabase.DB + orders *orders.Service + auditor *identity.PeerIdentity + dialer rpc.Dialer + overlay *overlay.Service + containment Containment minBytesPerSecond memory.Size minDownloadTimeout time.Duration @@ -69,7 +68,7 @@ type Verifier struct { } // NewVerifier creates a Verifier. -func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, newContainment NewContainment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier { +func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier { return &Verifier{ log: log, metabase: metabase, @@ -77,7 +76,7 @@ func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, over auditor: id.PeerIdentity(), dialer: dialer, overlay: overlay, - newContainment: newContainment, + containment: containment, minBytesPerSecond: minBytesPerSecond, minDownloadTimeout: minDownloadTimeout, nowFn: time.Now, @@ -288,11 +287,11 @@ func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[ } return Report{ - Successes: successNodes, - Fails: failedNodes, - Offlines: offlineNodes, - PieceAudits: pendingAudits, - Unknown: unknownNodes, + Successes: successNodes, + Fails: failedNodes, + Offlines: offlineNodes, + PendingAudits: pendingAudits, + Unknown: unknownNodes, }, nil } @@ -359,7 +358,7 @@ func (verifier *Verifier) IdentifyContainedNodes(ctx context.Context, segment Se skipList = make(map[storj.NodeID]bool) for _, piece := range segmentInfo.Pieces { - _, err := verifier.newContainment.Get(ctx, piece.StorageNode) + _, err := verifier.containment.Get(ctx, piece.StorageNode) if err != nil { if ErrContainedNotFound.Has(err) { continue @@ -613,7 +612,7 @@ func recordStats(report Report, totalPieces int, verifyErr error) { numOffline := len(report.Offlines) numSuccessful := len(report.Successes) numFailed := len(report.Fails) - numContained := len(report.PieceAudits) + numContained := len(report.PendingAudits) numUnknown := len(report.Unknown) totalAudited := numSuccessful + numFailed + numOffline + numContained diff --git a/satellite/audit/verifier_test.go b/satellite/audit/verifier_test.go index b177e4f36..a043f760c 100644 --- a/satellite/audit/verifier_test.go +++ b/satellite/audit/verifier_test.go @@ -257,7 +257,7 @@ func TestDownloadSharesDialTimeout(t *testing.T) { satellite.Metabase.DB, dialer, satellite.Overlay.Service, - satellite.DB.NewContainment(), + satellite.DB.Containment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, @@ -332,7 +332,7 @@ func TestDownloadSharesDownloadTimeout(t *testing.T) { satellite.Metabase.DB, satellite.Dialer, satellite.Overlay.Service, - satellite.DB.NewContainment(), + satellite.DB.Containment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, @@ -390,7 +390,7 @@ func TestVerifierHappyPath(t *testing.T) { assert.Len(t, report.Successes, len(segment.Pieces)) assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, 0) - assert.Len(t, report.PieceAudits, 0) + assert.Len(t, report.PendingAudits, 0) }) } @@ -427,7 +427,7 @@ func TestVerifierExpired(t *testing.T) { assert.Len(t, report.Successes, 0) assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, 0) - assert.Len(t, report.PieceAudits, 0) + assert.Len(t, report.PendingAudits, 0) }) } @@ -470,7 +470,7 @@ func TestVerifierOfflineNode(t *testing.T) { assert.Len(t, report.Successes, len(segment.Pieces)-1) assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, 1) - assert.Len(t, report.PieceAudits, 0) + assert.Len(t, report.PendingAudits, 0) }) } @@ -515,7 +515,7 @@ func TestVerifierMissingPiece(t *testing.T) { assert.Len(t, report.Successes, origNumPieces-1) assert.Len(t, report.Fails, 1) assert.Len(t, report.Offlines, 0) - assert.Len(t, report.PieceAudits, 0) + assert.Len(t, report.PendingAudits, 0) }) } @@ -633,7 +633,7 @@ func TestVerifierDialTimeout(t *testing.T) { satellite.Metabase.DB, dialer, satellite.Overlay.Service, - satellite.DB.NewContainment(), + satellite.DB.Containment(), satellite.Orders.Service, satellite.Identity, minBytesPerSecond, @@ -645,7 +645,7 @@ func TestVerifierDialTimeout(t *testing.T) { assert.Len(t, report.Successes, 0) assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, len(segment.Pieces)) - assert.Len(t, report.PieceAudits, 0) + assert.Len(t, report.PendingAudits, 0) }) } @@ -680,7 +680,7 @@ func TestVerifierDeletedSegment(t *testing.T) { assert.Zero(t, report.Successes) assert.Zero(t, report.Fails) assert.Zero(t, report.Offlines) - assert.Zero(t, report.PieceAudits) + assert.Zero(t, report.PendingAudits) assert.Zero(t, report.Unknown) }) } @@ -731,7 +731,7 @@ func TestVerifierModifiedSegment(t *testing.T) { assert.Zero(t, report.Successes) assert.Zero(t, report.Fails) assert.Zero(t, report.Offlines) - assert.Zero(t, report.PieceAudits) + assert.Zero(t, report.PendingAudits) assert.Zero(t, report.Unknown) }) } @@ -769,7 +769,7 @@ func TestVerifierReplacedSegment(t *testing.T) { assert.Zero(t, report.Successes) assert.Zero(t, report.Fails) assert.Zero(t, report.Offlines) - assert.Zero(t, report.PieceAudits) + assert.Zero(t, report.PendingAudits) assert.Zero(t, report.Unknown) }) } @@ -816,7 +816,7 @@ func TestVerifierModifiedSegmentFailsOnce(t *testing.T) { require.Len(t, report.Fails, 1) assert.Equal(t, report.Fails[0], piece.StorageNode) assert.Len(t, report.Offlines, 0) - require.Len(t, report.PieceAudits, 0) + require.Len(t, report.PendingAudits, 0) }) } @@ -874,8 +874,8 @@ func TestVerifierSlowDownload(t *testing.T) { assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, 0) assert.Len(t, report.Unknown, 0) - require.Len(t, report.PieceAudits, 1) - assert.Equal(t, report.PieceAudits[0].Locator.NodeID, slowNode.ID()) + require.Len(t, report.PendingAudits, 1) + assert.Equal(t, report.PendingAudits[0].Locator.NodeID, slowNode.ID()) }) } @@ -925,7 +925,7 @@ func TestVerifierUnknownError(t *testing.T) { assert.Len(t, report.Successes, 3) assert.Len(t, report.Fails, 0) assert.Len(t, report.Offlines, 0) - assert.Len(t, report.PieceAudits, 0) + assert.Len(t, report.PendingAudits, 0) require.Len(t, report.Unknown, 1) assert.Equal(t, report.Unknown[0], badNode.ID()) }) @@ -1136,7 +1136,7 @@ func TestIdentifyContainedNodes(t *testing.T) { // mark a node as contained containedNode := segment.Pieces[0].StorageNode - containment := satellite.DB.NewContainment() + containment := satellite.DB.Containment() err = containment.Insert(ctx, &audit.PieceLocator{ StreamID: testrand.UUID(), NodeID: containedNode, diff --git a/satellite/core.go b/satellite/core.go index 6f68e63a4..edecc193e 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -415,7 +415,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Metainfo.Metabase, dialer, peer.Overlay.Service, - peer.DB.NewContainment(), + peer.DB.Containment(), peer.Orders.Service, peer.Identity, config.MinBytesPerSecond, @@ -429,7 +429,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"), peer.Reputation.Service, peer.Overlay.Service, - peer.DB.NewContainment(), + peer.DB.Containment(), config.MaxRetriesStatDB, int32(config.MaxReverifyCount), ) diff --git a/satellite/peer.go b/satellite/peer.go index ccfc285cd..3e3dab1bd 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -112,9 +112,8 @@ type DB interface { OIDC() oidc.DB // Orders returns database for orders Orders() orders.DB - // NewContainment is temporary and will be renamed to Containment later in the commit chain. // Containment returns database for containment - NewContainment() audit.NewContainment + Containment() audit.Containment // Buckets returns the database to interact with buckets Buckets() buckets.DB // GracefulExit returns database for graceful exit diff --git a/satellite/repairer.go b/satellite/repairer.go index 675c294de..7bdec9d7e 100644 --- a/satellite/repairer.go +++ b/satellite/repairer.go @@ -85,7 +85,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, overlayCache overlay.DB, nodeEvents nodeevents.DB, reputationdb reputation.DB, - newContainmentDB audit.NewContainment, + containmentDB audit.Containment, rollupsWriteCache *orders.RollupsWriteCache, versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel, ) (*Repairer, error) { @@ -219,7 +219,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity, log.Named("reporter"), peer.Reputation, peer.Overlay, - newContainmentDB, + containmentDB, config.Audit.MaxRetriesStatDB, int32(config.Audit.MaxReverifyCount)) } diff --git a/satellite/satellitedb/containment.go b/satellite/satellitedb/containment.go index fd994beda..4da32ba14 100644 --- a/satellite/satellitedb/containment.go +++ b/satellite/satellitedb/containment.go @@ -10,18 +10,17 @@ import ( "storj.io/storj/satellite/audit" ) -// newContainment will be renamed to containment. -type newContainment struct { +type containment struct { reverifyQueue audit.ReverifyQueue } -var _ audit.NewContainment = &newContainment{} +var _ audit.Containment = &containment{} // Get gets a pending reverification audit by node id. If there are // multiple pending reverification audits, an arbitrary one is returned. // If there are none, an error wrapped by audit.ErrContainedNotFound is // returned. -func (containment *newContainment) Get(ctx context.Context, id pb.NodeID) (_ *audit.ReverificationJob, err error) { +func (containment *containment) Get(ctx context.Context, id pb.NodeID) (_ *audit.ReverificationJob, err error) { defer mon.Task()(&ctx)(&err) if id.IsZero() { return nil, audit.ContainError.New("node ID empty") @@ -31,7 +30,7 @@ func (containment *newContainment) Get(ctx context.Context, id pb.NodeID) (_ *au } // Insert creates a new pending audit entry. -func (containment *newContainment) Insert(ctx context.Context, pendingJob *audit.PieceLocator) (err error) { +func (containment *containment) Insert(ctx context.Context, pendingJob *audit.PieceLocator) (err error) { defer mon.Task()(&ctx)(&err) return containment.reverifyQueue.Insert(ctx, pendingJob) @@ -41,7 +40,7 @@ func (containment *newContainment) Insert(ctx context.Context, pendingJob *audit // was successful or because the job is no longer necessary. The wasDeleted // return value indicates whether the indicated job was actually deleted (if // not, there was no such job in the queue). -func (containment *newContainment) Delete(ctx context.Context, pendingJob *audit.PieceLocator) (isDeleted, nodeStillContained bool, err error) { +func (containment *containment) Delete(ctx context.Context, pendingJob *audit.PieceLocator) (isDeleted, nodeStillContained bool, err error) { defer mon.Task()(&ctx)(&err) isDeleted, err = containment.reverifyQueue.Remove(ctx, pendingJob) diff --git a/satellite/satellitedb/database.go b/satellite/satellitedb/database.go index 15c10b6be..d62ced545 100644 --- a/satellite/satellitedb/database.go +++ b/satellite/satellitedb/database.go @@ -276,9 +276,10 @@ func (dbc *satelliteDBCollection) Orders() orders.DB { return &ordersDB{db: db} } -// NewContainment will be renamed to Containment later in the commit chain. -func (dbc *satelliteDBCollection) NewContainment() audit.NewContainment { - return &newContainment{reverifyQueue: dbc.ReverifyQueue()} +// Containment returns database for storing pending audit info. +// It does all of its work by way of the ReverifyQueue. +func (dbc *satelliteDBCollection) Containment() audit.Containment { + return &containment{reverifyQueue: dbc.ReverifyQueue()} } // GracefulExit returns database for graceful exit.