cmd/tools/segment-verify: small improvements

* better error handling when Exists method is not avaialble on SN
* more optimal processing of response from Exists method

Change-Id: I6d61c09473e9f5ab76a4601720e8bd520767f4c2
This commit is contained in:
Michal Niewrzal 2022-12-22 15:14:20 +01:00 committed by Storj Robot
parent 5d13a5e0ed
commit 4851b4e06d

View File

@ -87,7 +87,7 @@ func NewVerifier(log *zap.Logger, dialer rpc.Dialer, orders *orders.Service, con
func (service *NodeVerifier) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error) { func (service *NodeVerifier) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error) {
verifiedCount, err = service.VerifyWithExists(ctx, alias, target, targetVersion, segments) verifiedCount, err = service.VerifyWithExists(ctx, alias, target, targetVersion, segments)
// if Exists method is unimplemented or it is wrong node version fallback to download verification // if Exists method is unimplemented or it is wrong node version fallback to download verification
if !errs2.IsRPC(err, rpcstatus.Unimplemented) && !errWrongNodeVersion.Has(err) { if !methodUnimplemented(err) && !errWrongNodeVersion.Has(err) {
return verifiedCount, err return verifiedCount, err
} }
if err != nil { if err != nil {
@ -292,6 +292,10 @@ func (service *NodeVerifier) VerifyWithExists(ctx context.Context, alias metabas
return len(segments), nil return len(segments), nil
} }
func methodUnimplemented(err error) bool {
return errs2.IsRPC(err, rpcstatus.Unimplemented) || errs2.IsRPC(err, rpcstatus.Unknown)
}
// verifySegmentsWithExists calls the Exists endpoint on the specified target node for each segment. // verifySegmentsWithExists calls the Exists endpoint on the specified target node for each segment.
func (service *NodeVerifier) verifySegmentsWithExists(ctx context.Context, client pb.DRPCPiecestoreClient, alias metabase.NodeAlias, target storj.NodeURL, segments []*Segment) (err error) { func (service *NodeVerifier) verifySegmentsWithExists(ctx context.Context, client pb.DRPCPiecestoreClient, alias metabase.NodeAlias, target storj.NodeURL, segments []*Segment) (err error) {
pieceIds := make([]storj.PieceID, 0, len(segments)) pieceIds := make([]storj.PieceID, 0, len(segments))
@ -310,26 +314,30 @@ func (service *NodeVerifier) verifySegmentsWithExists(ctx context.Context, clien
return Error.Wrap(err) return Error.Wrap(err)
} }
for index := range pieceIds { if len(response.Missing) == 0 {
if missing(index, response.Missing) { for i := range segments {
segments[index].Status.MarkNotFound() segments[i].Status.MarkFound()
}
return nil
}
missing := make(map[uint32]struct{}, len(segments))
for _, m := range response.Missing {
missing[m] = struct{}{}
}
for i := range segments {
_, miss := missing[uint32(i)]
if miss {
segments[i].Status.MarkNotFound()
} else { } else {
segments[index].Status.MarkFound() segments[i].Status.MarkFound()
} }
} }
return nil return nil
} }
func missing(index int, missing []uint32) bool {
for _, m := range missing {
if uint32(index) == m {
return true
}
}
return false
}
// rateLimiter limits the rate of some type of event. It acts like a token // rateLimiter limits the rate of some type of event. It acts like a token
// bucket, allowing for bursting, as long as the _average_ interval between // bucket, allowing for bursting, as long as the _average_ interval between
// events over the lifetime of the rateLimiter is less than or equal to the // events over the lifetime of the rateLimiter is less than or equal to the