satellite/repair: move over audit.Pieces
This structure is entirely unused within the audit module, and is only used by repair code. Accordingly, this change moves the structure from audit code to repair code. Also, we take the opportunity here to rename the structure to something less generic. Refs: https://github.com/storj/storj/issues/4669 Change-Id: If85b37e08620cda1fde2afe98206293e02b5c36e
This commit is contained in:
parent
70eda67bc1
commit
7d0885bbaa
@ -11,7 +11,6 @@ import (
|
||||
"storj.io/common/errs2"
|
||||
"storj.io/common/rpc"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
)
|
||||
|
||||
// PieceAudit is piece audit status.
|
||||
@ -30,15 +29,6 @@ const (
|
||||
PieceAuditSuccess
|
||||
)
|
||||
|
||||
// Pieces contains pieces structured by piece audit.
|
||||
type Pieces struct {
|
||||
Successful metabase.Pieces
|
||||
Failed metabase.Pieces
|
||||
Offline metabase.Pieces
|
||||
Contained metabase.Pieces
|
||||
Unknown metabase.Pieces
|
||||
}
|
||||
|
||||
// PieceAuditFromErr returns piece audit based on error.
|
||||
func PieceAuditFromErr(err error) PieceAudit {
|
||||
if err == nil {
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/repair/checker"
|
||||
@ -1020,7 +1019,7 @@ func TestMissingPieceDataRepair(t *testing.T) {
|
||||
}
|
||||
|
||||
var successful metabase.Pieces
|
||||
satellite.Repairer.SegmentRepairer.OnTestingPiecesReportHook = func(pieces audit.Pieces) {
|
||||
satellite.Repairer.SegmentRepairer.OnTestingPiecesReportHook = func(pieces repairer.FetchResultReport) {
|
||||
successful = pieces.Successful
|
||||
}
|
||||
|
||||
@ -1040,9 +1039,9 @@ func TestMissingPieceDataRepair(t *testing.T) {
|
||||
}
|
||||
|
||||
// repair shouldn't update audit status
|
||||
for _, piece := range successful {
|
||||
successfulNodeReputation := nodesReputation[piece.StorageNode]
|
||||
successfulNodeReputationAfter := nodesReputationAfter[piece.StorageNode]
|
||||
for _, result := range successful {
|
||||
successfulNodeReputation := nodesReputation[result.StorageNode]
|
||||
successfulNodeReputationAfter := nodesReputationAfter[result.StorageNode]
|
||||
require.Equal(t, successfulNodeReputation.TotalAuditCount, successfulNodeReputationAfter.TotalAuditCount)
|
||||
require.Equal(t, successfulNodeReputation.AuditSuccessCount, successfulNodeReputationAfter.AuditSuccessCount)
|
||||
require.Equal(t, successfulNodeReputation.AuditReputationAlpha, successfulNodeReputationAfter.AuditReputationAlpha)
|
||||
@ -1250,8 +1249,8 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
|
||||
}
|
||||
|
||||
var successful metabase.Pieces
|
||||
satellite.Repairer.SegmentRepairer.OnTestingPiecesReportHook = func(pieces audit.Pieces) {
|
||||
successful = pieces.Successful
|
||||
satellite.Repairer.SegmentRepairer.OnTestingPiecesReportHook = func(report repairer.FetchResultReport) {
|
||||
successful = report.Successful
|
||||
}
|
||||
|
||||
satellite.Repair.Checker.Loop.Restart()
|
||||
@ -1270,9 +1269,9 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
|
||||
}
|
||||
|
||||
// repair shouldn't update audit status
|
||||
for _, piece := range successful {
|
||||
successfulNodeReputation := nodesReputation[piece.StorageNode]
|
||||
successfulNodeReputationAfter := nodesReputationAfter[piece.StorageNode]
|
||||
for _, result := range successful {
|
||||
successfulNodeReputation := nodesReputation[result.StorageNode]
|
||||
successfulNodeReputationAfter := nodesReputationAfter[result.StorageNode]
|
||||
require.Equal(t, successfulNodeReputation.TotalAuditCount, successfulNodeReputationAfter.TotalAuditCount)
|
||||
require.Equal(t, successfulNodeReputation.AuditSuccessCount, successfulNodeReputationAfter.AuditSuccessCount)
|
||||
require.Equal(t, successfulNodeReputation.AuditReputationAlpha, successfulNodeReputationAfter.AuditReputationAlpha)
|
||||
|
@ -72,17 +72,17 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
|
||||
// After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
|
||||
// If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
|
||||
// If piece hash verification fails, it will return all failed node IDs.
|
||||
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ audit.Pieces, err error) {
|
||||
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ FetchResultReport, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if len(limits) != es.TotalCount() {
|
||||
return nil, audit.Pieces{}, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
|
||||
return nil, FetchResultReport{}, Error.New("number of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
|
||||
}
|
||||
|
||||
nonNilLimits := nonNilCount(limits)
|
||||
|
||||
if nonNilLimits < es.RequiredCount() {
|
||||
return nil, audit.Pieces{}, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
|
||||
return nil, FetchResultReport{}, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
|
||||
}
|
||||
|
||||
pieceSize := eestream.CalcPieceSize(dataSize, es)
|
||||
@ -90,7 +90,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
|
||||
var successfulPieces, inProgress int
|
||||
unusedLimits := nonNilLimits
|
||||
pieceReaders := make(map[int]io.ReadCloser)
|
||||
var pieces audit.Pieces
|
||||
var pieces FetchResultReport
|
||||
|
||||
limiter := sync2.NewLimiter(es.RequiredCount())
|
||||
cond := sync.NewCond(&sync.Mutex{})
|
||||
|
@ -56,6 +56,16 @@ func (ie *irreparableError) Error() string {
|
||||
return fmt.Sprintf("%d available pieces < %d required", ie.piecesAvailable, ie.piecesRequired)
|
||||
}
|
||||
|
||||
// FetchResultReport contains a categorization of a set of pieces based on the results of
|
||||
// GET operations.
|
||||
type FetchResultReport struct {
|
||||
Successful metabase.Pieces
|
||||
Failed metabase.Pieces
|
||||
Offline metabase.Pieces
|
||||
Contained metabase.Pieces
|
||||
Unknown metabase.Pieces
|
||||
}
|
||||
|
||||
// SegmentRepairer for segments.
|
||||
type SegmentRepairer struct {
|
||||
log *zap.Logger
|
||||
@ -77,7 +87,7 @@ type SegmentRepairer struct {
|
||||
|
||||
nowFn func() time.Time
|
||||
OnTestingCheckSegmentAlteredHook func()
|
||||
OnTestingPiecesReportHook func(pieces audit.Pieces)
|
||||
OnTestingPiecesReportHook func(pieces FetchResultReport)
|
||||
}
|
||||
|
||||
// NewSegmentRepairer creates a new instance of SegmentRepairer.
|
||||
|
Loading…
Reference in New Issue
Block a user