satellite/repair: update audit records during repair

Change-Id: I788b2096968f043601aba6502a2e4e784f1f02a0
Yaroslav Vorobiov 2021-08-03 16:21:27 +03:00 committed by Yingrong Zhao
parent 030ab669fa
commit 469ae72c19
8 changed files with 1697 additions and 168 deletions

@ -69,6 +69,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
db.Buckets(),
db.OverlayCache(),
db.Reputation(),
db.Containment(),
rollupsWriteCache,
version.Build,
&runCfg.Config,

@ -658,7 +658,7 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Reputation(), rollupsWriteCache, versionInfo, &config, nil)
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil)
}
type rollupsWriteCacheCloser struct {

satellite/audit/pieces.go (new file, 66 lines)

@ -0,0 +1,66 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package audit
import (
"context"
"github.com/zeebo/errs"
"storj.io/common/errs2"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/storj/satellite/metabase"
)
// PieceAudit is piece audit status.
type PieceAudit int
const (
// PieceAuditUnknown is unknown piece audit.
PieceAuditUnknown PieceAudit = iota
// PieceAuditFailure is failed piece audit.
PieceAuditFailure
// PieceAuditOffline is offline node piece audit.
PieceAuditOffline
// PieceAuditContained is online but unresponsive node piece audit.
PieceAuditContained
// PieceAuditSuccess is successful piece audit.
PieceAuditSuccess
)
// Pieces contains pieces structured by piece audit.
type Pieces struct {
Successful metabase.Pieces
Failed metabase.Pieces
Offline metabase.Pieces
Contained metabase.Pieces
Unknown metabase.Pieces
}
// PieceAuditFromErr returns piece audit based on error.
func PieceAuditFromErr(err error) PieceAudit {
if err == nil {
return PieceAuditSuccess
}
if rpc.Error.Has(err) {
switch {
case errs.Is(err, context.DeadlineExceeded), errs2.IsRPC(err, rpcstatus.Unknown):
return PieceAuditOffline
default:
// TODO: is this path even reachable?
return PieceAuditUnknown
}
}
switch {
case errs2.IsRPC(err, rpcstatus.NotFound):
return PieceAuditFailure
case errs2.IsRPC(err, rpcstatus.DeadlineExceeded):
return PieceAuditContained
default:
return PieceAuditUnknown
}
}
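
Illustrative sketch (not part of this commit): one way a caller could route a download result into the Pieces report using PieceAuditFromErr. The package paths, types, and field names come from the file above; the helper itself is hypothetical.

package example

import (
	"storj.io/storj/satellite/audit"
	"storj.io/storj/satellite/metabase"
)

// classifyPiece appends piece to the report bucket that matches the audit
// outcome derived from err (hypothetical helper, for illustration only).
func classifyPiece(report *audit.Pieces, piece metabase.Piece, err error) {
	switch audit.PieceAuditFromErr(err) {
	case audit.PieceAuditSuccess:
		report.Successful = append(report.Successful, piece)
	case audit.PieceAuditFailure:
		report.Failed = append(report.Failed, piece)
	case audit.PieceAuditOffline:
		report.Offline = append(report.Offline, piece)
	case audit.PieceAuditContained:
		report.Contained = append(report.Contained, piece)
	default:
		report.Unknown = append(report.Unknown, piece)
	}
}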

@ -0,0 +1,69 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package audit_test
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/storj/satellite/audit"
)
func TestPieceAuditFromErr(t *testing.T) {
cases := []struct {
Error error
Audit audit.PieceAudit
}{
{
Error: nil,
Audit: audit.PieceAuditSuccess,
},
{
Error: rpc.Error.Wrap(context.DeadlineExceeded),
Audit: audit.PieceAuditOffline,
},
{
Error: rpc.Error.New("unknown rpc error"),
Audit: audit.PieceAuditOffline,
},
{
Error: rpc.Error.Wrap(rpcstatus.Error(rpcstatus.InvalidArgument, "rpc wrapped rpcstatus invalid arg error")),
Audit: audit.PieceAuditOffline,
},
{
Error: rpc.Error.Wrap(rpcstatus.Error(rpcstatus.NotFound, "rpc wrapped rpcstatus not found error")),
// TODO: shouldn't this be a failure?
Audit: audit.PieceAuditOffline,
},
{
Error: rpcstatus.Error(rpcstatus.NotFound, "rpcstatus not found error"),
Audit: audit.PieceAuditFailure,
},
{
Error: context.DeadlineExceeded,
Audit: audit.PieceAuditContained,
},
{
Error: rpcstatus.Error(rpcstatus.DeadlineExceeded, "deadline exceeded rpcstatus error"),
Audit: audit.PieceAuditContained,
},
{
Error: errs.New("unknown error"),
Audit: audit.PieceAuditUnknown,
},
{
Error: rpcstatus.Error(rpcstatus.Unknown, "unknown rpcstatus error"),
Audit: audit.PieceAuditUnknown,
},
}
for _, c := range cases {
pieceAudit := audit.PieceAuditFromErr(c.Error)
require.Equal(t, c.Audit, pieceAudit, c.Error)
}
}
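
Assuming a standard checkout of the storj module, running "go test ./satellite/audit -run TestPieceAuditFromErr" from the repository root should exercise just this table-driven test.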

File diff suppressed because it is too large

@ -27,6 +27,8 @@ import (
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase"
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/piecestore"
)
@ -63,17 +65,17 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
// After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
// If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
// If piece hash verification fails, it will return all failed node IDs.
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedIPsAndPorts map[storj.NodeID]string, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedPieces []*pb.RemotePiece, err error) {
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedIPsAndPorts map[storj.NodeID]string, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ audit.Pieces, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != es.TotalCount() {
return nil, nil, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
return nil, audit.Pieces{}, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
}
nonNilLimits := nonNilCount(limits)
if nonNilLimits < es.RequiredCount() {
return nil, nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
return nil, audit.Pieces{}, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
}
pieceSize := eestream.CalcPieceSize(dataSize, es)
@ -81,6 +83,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
var successfulPieces, inProgress int
unusedLimits := nonNilLimits
pieceReaders := make(map[int]io.ReadCloser)
var pieces audit.Pieces
limiter := sync2.NewLimiter(es.RequiredCount())
cond := sync.NewCond(&sync.Mutex{})
@ -129,26 +132,54 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
}
pieceReadCloser, err := ec.downloadAndVerifyPiece(ctx, limit, address, privateKey, pieceSize)
// if piecestore dial with last ip:port failed, try again with node address
if triedLastIPPort && piecestore.Error.Has(err) {
pieceReadCloser, err = ec.downloadAndVerifyPiece(ctx, limit, limit.GetStorageNodeAddress().GetAddress(), privateKey, pieceSize)
}
cond.L.Lock()
inProgress--
piece := metabase.Piece{
Number: uint16(currentLimitIndex),
StorageNode: limit.GetLimit().StorageNodeId,
}
if err != nil {
// gather nodes where the calculated piece hash doesn't match the uplink signed piece hash
if ErrPieceHashVerifyFailed.Has(err) {
ec.log.Info("audit failed", zap.Stringer("node ID", limit.GetLimit().StorageNodeId),
zap.String("reason", err.Error()))
failedPieces = append(failedPieces, &pb.RemotePiece{
PieceNum: int32(currentLimitIndex),
NodeId: limit.GetLimit().StorageNodeId,
})
} else {
ec.log.Debug("Failed to download pieces for repair",
zap.Error(err))
pieces.Failed = append(pieces.Failed, piece)
return
}
pieceAudit := audit.PieceAuditFromErr(err)
switch pieceAudit {
case audit.PieceAuditFailure:
ec.log.Debug("Failed to download pieces for repair: piece not found (audit failed)",
zap.Stringer("Node ID", limit.GetLimit().StorageNodeId),
zap.Error(err))
pieces.Failed = append(pieces.Failed, piece)
case audit.PieceAuditOffline:
ec.log.Debug("Failed to download pieces for repair: dial timeout (offline)",
zap.Stringer("Node ID", limit.GetLimit().StorageNodeId),
zap.Error(err))
pieces.Offline = append(pieces.Offline, piece)
case audit.PieceAuditContained:
ec.log.Info("Failed to download pieces for repair: download timeout (contained)",
zap.Stringer("Node ID", limit.GetLimit().StorageNodeId),
zap.Error(err))
pieces.Contained = append(pieces.Contained, piece)
case audit.PieceAuditUnknown:
ec.log.Info("Failed to download pieces for repair: unknown transport error (skipped)",
zap.Stringer("Node ID", limit.GetLimit().StorageNodeId),
zap.Error(err))
pieces.Unknown = append(pieces.Unknown, piece)
}
mu.Lock()
errlist.Add(fmt.Errorf("node id: %s, error: %w", limit.GetLimit().StorageNodeId.String(), err))
mu.Unlock()
@ -156,8 +187,8 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
}
pieceReaders[currentLimitIndex] = pieceReadCloser
pieces.Successful = append(pieces.Successful, piece)
successfulPieces++
return
}
})
@ -167,7 +198,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
if successfulPieces < es.RequiredCount() {
mon.Meter("download_failed_not_enough_pieces_repair").Mark(1) //mon:locked
return nil, failedPieces, &irreparableError{
return nil, pieces, &irreparableError{
piecesAvailable: int32(successfulPieces),
piecesRequired: int32(es.RequiredCount()),
errlist: errlist,
@ -176,7 +207,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
fec, err := infectious.NewFEC(es.RequiredCount(), es.TotalCount())
if err != nil {
return nil, failedPieces, Error.Wrap(err)
return nil, pieces, Error.Wrap(err)
}
esScheme := eestream.NewUnsafeRSScheme(fec, es.ErasureShareSize())
@ -185,7 +216,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
ctx, cancel := context.WithCancel(ctx)
decodeReader := eestream.DecodeReaders2(ctx, cancel, pieceReaders, esScheme, expectedSize, 0, false)
return decodeReader, failedPieces, nil
return decodeReader, pieces, nil
}
// downloadAndVerifyPiece downloads a piece from a storagenode,

@ -14,15 +14,13 @@ import (
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/queue"
"storj.io/storj/satellite/reputation"
"storj.io/uplink/private/eestream"
)
@ -34,6 +32,12 @@ var (
orderLimitFailureError = errs.Class("order limits failure")
repairReconstructError = errs.Class("repair reconstruction failure")
repairPutError = errs.Class("repair could not store repaired pieces")
// segmentVerificationError is the errs class when the repaired segment cannot be verified during repair.
segmentVerificationError = errs.Class("segment verification failed")
// segmentDeletedError is the errs class when the repaired segment was deleted during repair.
segmentDeletedError = errs.Class("segment deleted during repair")
// segmentModifiedError is the errs class used when a segment has been changed in any way.
segmentModifiedError = errs.Class("segment has been modified")
)
// irreparableError identifies situations where a segment could not be repaired due to reasons
@ -56,9 +60,9 @@ type SegmentRepairer struct {
metabase *metabase.DB
orders *orders.Service
overlay *overlay.Service
reputation *reputation.Service
ec *ECRepairer
timeout time.Duration
reporter *audit.Reporter
// multiplierOptimalThreshold is the value that multiplied by the optimal
// threshold results in the maximum limit of number of nodes to upload
@ -68,7 +72,8 @@ type SegmentRepairer struct {
// repairOverrides is the set of values configured by the checker to override the repair threshold for various RS schemes.
repairOverrides checker.RepairOverridesMap
nowFn func() time.Time
nowFn func() time.Time
OnTestingCheckSegmentAlteredHook func()
}
// NewSegmentRepairer creates a new instance of SegmentRepairer.
@ -77,11 +82,14 @@ type SegmentRepairer struct {
// threshold to determine the maximum limit of nodes to upload repaired pieces,
// when negative, 0 is applied.
func NewSegmentRepairer(
log *zap.Logger, metabase *metabase.DB, orders *orders.Service,
overlay *overlay.Service, reputation *reputation.Service, dialer rpc.Dialer,
log *zap.Logger,
metabase *metabase.DB,
orders *orders.Service,
overlay *overlay.Service,
reporter *audit.Reporter,
ecRepairer *ECRepairer,
repairOverrides checker.RepairOverrides,
timeout time.Duration, excessOptimalThreshold float64,
repairOverrides checker.RepairOverrides, downloadTimeout time.Duration,
inMemoryRepair bool, satelliteSignee signing.Signee,
) *SegmentRepairer {
if excessOptimalThreshold < 0 {
@ -94,11 +102,11 @@ func NewSegmentRepairer(
metabase: metabase,
orders: orders,
overlay: overlay,
reputation: reputation,
ec: NewECRepairer(log.Named("ec repairer"), dialer, satelliteSignee, downloadTimeout, inMemoryRepair),
ec: ecRepairer,
timeout: timeout,
multiplierOptimalThreshold: 1 + excessOptimalThreshold,
repairOverrides: repairOverrides.GetMap(),
reporter: reporter,
nowFn: time.Now,
}
@ -273,33 +281,50 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
}
// Download the segment using just the healthy pieces
segmentReader, pbFailedPieces, err := repairer.ec.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
// Populate node IDs that failed piece hashes verification
var failedNodeIDs storj.NodeIDList
for _, piece := range pbFailedPieces {
failedNodeIDs = append(failedNodeIDs, piece.NodeId)
}
// TODO refactor repairer.ec.Get?
failedPieces := make(metabase.Pieces, len(pbFailedPieces))
for i, piece := range pbFailedPieces {
failedPieces[i] = metabase.Piece{
Number: uint16(piece.PieceNum),
StorageNode: piece.NodeId,
}
}
segmentReader, piecesReport, err := repairer.ec.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
// ensure we get values, even if only zero values, so that redash can have an alert based on this
mon.Meter("repair_too_many_nodes_failed").Mark(0) //mon:locked
stats.repairTooManyNodesFailed.Mark(0)
// update audit status for nodes that failed piece hash verification during downloading
failedNum, updateErr := repairer.updateAuditFailStatus(ctx, failedNodeIDs)
if updateErr != nil || failedNum > 0 {
// failed updates should not affect repair, therefore we will not return the error
repairer.log.Debug("failed to update audit fail status", zap.Int("Failed Update Number", failedNum), zap.Error(updateErr))
// Check if segment has been altered
checkSegmentError := repairer.checkIfSegmentAltered(ctx, segment)
if checkSegmentError != nil {
if segmentDeletedError.Has(checkSegmentError) {
// mon.Meter("segment_deleted_during_repair").Mark(1) //mon:locked
repairer.log.Debug("segment deleted during Repair")
return true, nil
}
if segmentModifiedError.Has(checkSegmentError) {
// mon.Meter("segment_modified_during_repair").Mark(1) //mon:locked
repairer.log.Debug("segment modified during Repair")
return true, nil
}
return false, segmentVerificationError.Wrap(checkSegmentError)
}
if len(piecesReport.Contained) > 0 {
repairer.log.Debug("unexpected contained pieces during repair", zap.Int("count", len(piecesReport.Contained)))
}
var report audit.Report
for _, piece := range piecesReport.Successful {
report.Successes = append(report.Successes, piece.StorageNode)
}
for _, piece := range piecesReport.Failed {
report.Fails = append(report.Fails, piece.StorageNode)
}
for _, piece := range piecesReport.Offline {
report.Offlines = append(report.Offlines, piece.StorageNode)
}
for _, piece := range piecesReport.Unknown {
report.Unknown = append(report.Unknown, piece.StorageNode)
}
_, reportErr := repairer.reporter.RecordAudits(ctx, report)
if reportErr != nil {
// failed updates should not affect repair, therefore we will not return the error
repairer.log.Debug("failed to record audit", zap.Error(reportErr))
}
if err != nil {
// If the context was closed during the Get phase, it will appear here as though
// we just failed to download enough pieces to reconstruct the segment. Check for
@ -398,7 +423,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
}
// add pieces that failed piece hash verification to the removal list
toRemove = append(toRemove, failedPieces...)
toRemove = append(toRemove, piecesReport.Failed...)
newPieces, err := segment.Pieces.Update(repairedPieces, toRemove)
if err != nil {
@ -443,6 +468,31 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return true, nil
}
// checkIfSegmentAltered checks if oldSegment has been altered since it was selected for repair.
func (repairer *SegmentRepairer) checkIfSegmentAltered(ctx context.Context, oldSegment metabase.Segment) (err error) {
defer mon.Task()(&ctx)(&err)
if repairer.OnTestingCheckSegmentAlteredHook != nil {
repairer.OnTestingCheckSegmentAlteredHook()
}
newSegment, err := repairer.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: oldSegment.StreamID,
Position: oldSegment.Position,
})
if err != nil {
if metabase.ErrSegmentNotFound.Has(err) {
return segmentDeletedError.New("StreamID: %q Position: %d", oldSegment.StreamID.String(), oldSegment.Position.Encode())
}
return err
}
if !oldSegment.Pieces.Equal(newSegment.Pieces) {
return segmentModifiedError.New("StreamID: %q Position: %d", oldSegment.StreamID.String(), oldSegment.Position.Encode())
}
return nil
}
func (repairer *SegmentRepairer) getStatsByRS(redundancy *pb.RedundancyScheme) *stats {
rsString := getRSString(repairer.loadRedundancy(redundancy))
return repairer.statsCollector.getStatsByRS(rsString)
@ -457,22 +507,6 @@ func (repairer *SegmentRepairer) loadRedundancy(redundancy *pb.RedundancyScheme)
return int(redundancy.MinReq), repair, int(redundancy.SuccessThreshold), int(redundancy.Total)
}
func (repairer *SegmentRepairer) updateAuditFailStatus(ctx context.Context, failedAuditNodeIDs storj.NodeIDList) (failedNum int, err error) {
var errGroup errs.Group
for _, nodeID := range failedAuditNodeIDs {
err := repairer.reputation.ApplyAudit(ctx, nodeID, reputation.AuditFailure)
if err != nil {
failedNum++
errGroup.Add(err)
continue
}
}
if failedNum > 0 {
return failedNum, errs.Combine(Error.New("failed to update some audit fail statuses in overlay"), errGroup.Err())
}
return 0, nil
}
// SetNow allows tests to have the server act as if the current time is whatever they want.
func (repairer *SegmentRepairer) SetNow(nowFn func() time.Time) {
repairer.nowFn = nowFn


@ -23,6 +23,7 @@ import (
"storj.io/private/version"
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/audit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
@ -61,6 +62,12 @@ type Repairer struct {
Service *orders.Service
Chore *orders.Chore
}
Audit struct {
Reporter *audit.Reporter
}
EcRepairer *repairer.ECRepairer
SegmentRepairer *repairer.SegmentRepairer
Repairer *repairer.Service
}
@ -68,10 +75,15 @@ type Repairer struct {
// NewRepairer creates a new repairer peer.
func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
metabaseDB *metabase.DB,
revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
bucketsDB metainfo.BucketsDB, overlayCache overlay.DB,
reputationdb reputation.DB, rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Repairer, error) {
revocationDB extensions.RevocationDB,
repairQueue queue.RepairQueue,
bucketsDB metainfo.BucketsDB,
overlayCache overlay.DB,
reputationdb reputation.DB,
containmentDB audit.Containment,
rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel,
) (*Repairer, error) {
peer := &Repairer{
Log: log,
Identity: full,
@ -176,20 +188,33 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
}
}
{ // setup audit
peer.Audit.Reporter = audit.NewReporter(
log.Named("reporter"),
peer.Reputation,
containmentDB,
config.Audit.MaxRetriesStatDB,
int32(config.Audit.MaxReverifyCount))
}
{ // setup repairer
peer.EcRepairer = repairer.NewECRepairer(
log.Named("ec-repair"),
peer.Dialer,
signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
config.Repairer.DownloadTimeout,
config.Repairer.InMemoryRepair)
peer.SegmentRepairer = repairer.NewSegmentRepairer(
log.Named("segment-repair"),
metabaseDB,
peer.Orders.Service,
peer.Overlay,
peer.Reputation,
peer.Dialer,
peer.Audit.Reporter,
peer.EcRepairer,
config.Checker.RepairOverrides,
config.Repairer.Timeout,
config.Repairer.MaxExcessRateOptimalThreshold,
config.Checker.RepairOverrides,
config.Repairer.DownloadTimeout,
config.Repairer.InMemoryRepair,
signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()),
)
peer.Repairer = repairer.NewService(log.Named("repairer"), repairQueue, &config.Repairer, peer.SegmentRepairer)