satellite/{repair,audit}: simplify reputation reporter

Also, make it an interface so that the upcoming write cache can be
dropped in to the same place.

Change-Id: I2c286743825e647c0cef5b6578245391851fa10c
This commit is contained in:
paul cannon 2022-04-11 11:47:14 -05:00
parent 648d6bf0a7
commit fd01c6cc25
7 changed files with 49 additions and 94 deletions

View File

@ -129,7 +129,7 @@ type Satellite struct {
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter *audit.Reporter
Reporter audit.Reporter
}
Reputation struct {

View File

@ -14,10 +14,10 @@ import (
"storj.io/storj/satellite/reputation"
)
// Reporter records audit reports in overlay and implements the reporter interface.
// reporter records audit reports in overlay and implements the Reporter interface.
//
// architecture: Service
type Reporter struct {
type reporter struct {
log *zap.Logger
reputations *reputation.Service
containment Containment
@ -25,6 +25,11 @@ type Reporter struct {
maxReverifyCount int32
}
// Reporter records audit reports in the overlay and database.
type Reporter interface {
RecordAudits(ctx context.Context, req Report) (_ Report, err error)
}
// Report contains audit result.
// It records whether an audit is able to be completed, the total number of
// pieces a given audit has conducted for, lists for nodes that
@ -40,8 +45,8 @@ type Report struct {
}
// NewReporter instantiates a reporter.
func NewReporter(log *zap.Logger, reputations *reputation.Service, containment Containment, maxRetries int, maxReverifyCount int32) *Reporter {
return &Reporter{
func NewReporter(log *zap.Logger, reputations *reputation.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter {
return &reporter{
log: log,
reputations: reputations,
containment: containment,
@ -53,7 +58,7 @@ func NewReporter(log *zap.Logger, reputations *reputation.Service, containment C
// RecordAudits saves audit results to overlay. When no error, it returns
// nil for both return values, otherwise it returns the report with the fields
// set to the values which have been saved and the error.
func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Report, err error) {
func (reporter *reporter) RecordAudits(ctx context.Context, req Report) (_ Report, err error) {
defer mon.Task()(&ctx)(&err)
successes := req.Successes
@ -81,36 +86,16 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Repor
errlist = errs.Group{}
if len(successes) > 0 {
successes, err = reporter.recordAuditSuccessStatus(ctx, successes, nodesReputation)
if err != nil {
errlist.Add(err)
}
}
if len(fails) > 0 {
fails, err = reporter.recordAuditFailStatus(ctx, fails, nodesReputation)
if err != nil {
errlist.Add(err)
}
}
if len(unknowns) > 0 {
unknowns, err = reporter.recordAuditUnknownStatus(ctx, unknowns, nodesReputation)
if err != nil {
errlist.Add(err)
}
}
if len(offlines) > 0 {
offlines, err = reporter.recordOfflineStatus(ctx, offlines, nodesReputation)
if err != nil {
errlist.Add(err)
}
}
if len(pendingAudits) > 0 {
pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits, nodesReputation)
if err != nil {
errlist.Add(err)
}
}
successes, err = reporter.recordAuditStatus(ctx, successes, nodesReputation, reputation.AuditSuccess)
errlist.Add(err)
fails, err = reporter.recordAuditStatus(ctx, fails, nodesReputation, reputation.AuditFailure)
errlist.Add(err)
unknowns, err = reporter.recordAuditStatus(ctx, unknowns, nodesReputation, reputation.AuditUnknown)
errlist.Add(err)
offlines, err = reporter.recordAuditStatus(ctx, offlines, nodesReputation, reputation.AuditOffline)
errlist.Add(err)
pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits, nodesReputation)
errlist.Add(err)
tries++
}
@ -128,70 +113,25 @@ func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Repor
return Report{}, nil
}
// TODO record* methods can be consolidated to reduce code duplication
// recordAuditFailStatus updates nodeIDs in overlay with isup=true, auditoutcome=fail.
func (reporter *Reporter) recordAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
func (reporter *reporter) recordAuditStatus(ctx context.Context, nodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus, auditOutcome reputation.AuditType) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
var errors errs.Group
for _, nodeID := range failedAuditNodeIDs {
err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditFailure)
if err != nil {
failed = append(failed, nodeID)
errors.Add(errs.Combine(Error.New("failed to record audit fail status in overlay for node %s", nodeID.String()), err))
}
if len(nodeIDs) == 0 {
return nil, nil
}
return failed, errors.Err()
}
// recordAuditUnknownStatus updates nodeIDs in overlay with isup=true, auditoutcome=unknown.
func (reporter *Reporter) recordAuditUnknownStatus(ctx context.Context, unknownAuditNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
var errors errs.Group
for _, nodeID := range unknownAuditNodeIDs {
err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditUnknown)
for _, nodeID := range nodeIDs {
err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], auditOutcome)
if err != nil {
failed = append(failed, nodeID)
errors.Add(errs.Combine(Error.New("failed to record audit unknown status in overlay for node %s", nodeID.String()), err))
}
}
return failed, errors.Err()
}
// recordOfflineStatus updates nodeIDs in overlay with isup=false, auditoutcome=offline.
func (reporter *Reporter) recordOfflineStatus(ctx context.Context, offlineNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
var errors errs.Group
for _, nodeID := range offlineNodeIDs {
err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditOffline)
if err != nil {
failed = append(failed, nodeID)
errors.Add(errs.Combine(Error.New("failed to record audit offline status in overlay for node %s", nodeID.String()), err))
}
}
return failed, errors.Err()
}
// recordAuditSuccessStatus updates nodeIDs in overlay with isup=true, auditoutcome=success.
func (reporter *Reporter) recordAuditSuccessStatus(ctx context.Context, successNodeIDs storj.NodeIDList, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
var errors errs.Group
for _, nodeID := range successNodeIDs {
err = reporter.reputations.ApplyAudit(ctx, nodeID, nodesReputation[nodeID], reputation.AuditSuccess)
if err != nil {
failed = append(failed, nodeID)
errors.Add(errs.Combine(Error.New("failed to record audit success status in overlay for node %s", nodeID.String()), err))
errors.Add(Error.New("failed to record audit status %s in overlay for node %s: %w", auditOutcome.String(), nodeID.String(), err))
}
}
return failed, errors.Err()
}
// recordPendingAudits updates the containment status of nodes with pending audits.
func (reporter *Reporter) recordPendingAudits(ctx context.Context, pendingAudits []*PendingAudit, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*PendingAudit, err error) {
func (reporter *reporter) recordPendingAudits(ctx context.Context, pendingAudits []*PendingAudit, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*PendingAudit, err error) {
defer mon.Task()(&ctx)(&err)
var errlist errs.Group

View File

@ -36,13 +36,13 @@ type Worker struct {
log *zap.Logger
queues *Queues
verifier *Verifier
reporter *Reporter
reporter Reporter
Loop *sync2.Cycle
limiter *sync2.Limiter
}
// NewWorker instantiates Worker.
func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error) {
func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Reporter, config Config) (*Worker, error) {
return &Worker{
log: log,

View File

@ -101,7 +101,7 @@ type Core struct {
Worker *audit.Worker
Chore *audit.Chore
Verifier *audit.Verifier
Reporter *audit.Reporter
Reporter audit.Reporter
}
ExpiredDeletion struct {

View File

@ -65,7 +65,7 @@ type SegmentRepairer struct {
overlay *overlay.Service
ec *ECRepairer
timeout time.Duration
reporter *audit.Reporter
reporter audit.Reporter
// multiplierOptimalThreshold is the value that multiplied by the optimal
// threshold results in the maximum limit of number of nodes to upload
@ -90,7 +90,7 @@ func NewSegmentRepairer(
metabase *metabase.DB,
orders *orders.Service,
overlay *overlay.Service,
reporter *audit.Reporter,
reporter audit.Reporter,
ecRepairer *ECRepairer,
repairOverrides checker.RepairOverrides,
timeout time.Duration, excessOptimalThreshold float64,

View File

@ -64,7 +64,7 @@ type Repairer struct {
}
Audit struct {
Reporter *audit.Reporter
Reporter audit.Reporter
}
EcRepairer *repairer.ECRepairer

View File

@ -4,6 +4,7 @@
package reputation
import (
"fmt"
"time"
"github.com/spacemonkeygo/monkit/v3"
@ -74,3 +75,17 @@ const (
// AuditOffline represents an audit where a node was offline.
AuditOffline
)
func (auditType AuditType) String() string {
switch auditType {
case AuditSuccess:
return "AuditSuccess"
case AuditFailure:
return "AuditFailure"
case AuditUnknown:
return "AuditUnknown"
case AuditOffline:
return "AuditOffline"
}
return fmt.Sprintf("<unregistered audittype %d>", auditType)
}