improve repair logs (#1999)
This commit is contained in:
parent
42562429f5
commit
d2c95c1d62
@ -30,7 +30,7 @@ var mon = monkit.Package()
|
|||||||
// Client defines an interface for storing erasure coded data to piece store nodes
|
// Client defines an interface for storing erasure coded data to piece store nodes
|
||||||
type Client interface {
|
type Client interface {
|
||||||
Put(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
|
Put(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
|
||||||
Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
|
Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
|
||||||
Get(ctx context.Context, limits []*pb.AddressedOrderLimit, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
|
Get(ctx context.Context, limits []*pb.AddressedOrderLimit, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
|
||||||
Delete(ctx context.Context, limits []*pb.AddressedOrderLimit) error
|
Delete(ctx context.Context, limits []*pb.AddressedOrderLimit) error
|
||||||
}
|
}
|
||||||
@ -172,7 +172,7 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
|
|||||||
return successfulNodes, successfulHashes, nil
|
return successfulNodes, successfulHashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
|
func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
if len(limits) != rs.TotalCount() {
|
if len(limits) != rs.TotalCount() {
|
||||||
@ -213,12 +213,12 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
|
|||||||
// how many nodes must be repaired to reach the success threshold: o - (n - r)
|
// how many nodes must be repaired to reach the success threshold: o - (n - r)
|
||||||
optimalCount := rs.OptimalThreshold() - (rs.TotalCount() - nonNilCount(limits))
|
optimalCount := rs.OptimalThreshold() - (rs.TotalCount() - nonNilCount(limits))
|
||||||
|
|
||||||
zap.S().Infof("Starting a timer for %s for repairing to %d nodes to reach the success threshold (%d nodes)...",
|
zap.S().Infof("Starting a timer for %s for repairing %s to %d nodes to reach the success threshold (%d nodes)...",
|
||||||
timeout, optimalCount, rs.OptimalThreshold())
|
timeout, path, optimalCount, rs.OptimalThreshold())
|
||||||
|
|
||||||
timer := time.AfterFunc(timeout, func() {
|
timer := time.AfterFunc(timeout, func() {
|
||||||
if ctx.Err() != context.Canceled {
|
if ctx.Err() != context.Canceled {
|
||||||
zap.S().Infof("Timer expired. Successfully repaired to %d nodes. Canceling the long tail...", atomic.LoadInt32(&successfulCount))
|
zap.S().Infof("Timer expired. Successfully repaired %s to %d nodes. Canceling the long tail...", path, atomic.LoadInt32(&successfulCount))
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -231,7 +231,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
|
|||||||
}
|
}
|
||||||
|
|
||||||
if info.err != nil {
|
if info.err != nil {
|
||||||
zap.S().Debugf("Repair to storage node %s failed: %v", limits[info.i].GetLimit().StorageNodeId, info.err)
|
zap.S().Debugf("Repair %s to storage node %s failed: %v", path, limits[info.i].GetLimit().StorageNodeId, info.err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -242,8 +242,8 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
|
|||||||
successfulHashes[info.i] = info.hash
|
successfulHashes[info.i] = info.hash
|
||||||
|
|
||||||
if int(atomic.AddInt32(&successfulCount, 1)) == optimalCount {
|
if int(atomic.AddInt32(&successfulCount, 1)) == optimalCount {
|
||||||
zap.S().Infof("Success threshold (%d nodes) reached by repairing to %d nodes. Canceling the long tail...",
|
zap.S().Infof("Success threshold (%d nodes) reached for %s by repairing to %d nodes. Canceling the long tail...",
|
||||||
rs.OptimalThreshold(), optimalCount)
|
rs.OptimalThreshold(), path, optimalCount)
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
@ -268,7 +268,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
if successfulCount < int32(optimalCount) {
|
if successfulCount < int32(optimalCount) {
|
||||||
return nil, nil, Error.New("successful nodes count (%d) does not match optimal count (%d) of erasure scheme", successfulCount, optimalCount)
|
return nil, nil, Error.New("successful nodes count (%d) for %s does not match optimal count (%d) of erasure scheme", successfulCount, path, optimalCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
return successfulNodes, successfulHashes, nil
|
return successfulNodes, successfulHashes, nil
|
||||||
|
@ -74,12 +74,12 @@ func (repairer *Repairer) Repair(ctx context.Context, path storj.Path) (err erro
|
|||||||
numHealthy := len(pieces) - len(missingPieces)
|
numHealthy := len(pieces) - len(missingPieces)
|
||||||
// irreparable piece
|
// irreparable piece
|
||||||
if int32(numHealthy) < pointer.Remote.Redundancy.MinReq {
|
if int32(numHealthy) < pointer.Remote.Redundancy.MinReq {
|
||||||
return Error.New("piece cannot be repaired")
|
return Error.New("piece %v cannot be repaired", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// repair not needed
|
// repair not needed
|
||||||
if (int32(numHealthy) >= pointer.Remote.Redundancy.MinReq) && (int32(numHealthy) > pointer.Remote.Redundancy.RepairThreshold) {
|
if (int32(numHealthy) >= pointer.Remote.Redundancy.MinReq) && (int32(numHealthy) > pointer.Remote.Redundancy.RepairThreshold) {
|
||||||
return nil
|
return Error.New("piece %v with %d pieces above repair threshold %d", path, numHealthy, pointer.Remote.Redundancy.RepairThreshold)
|
||||||
}
|
}
|
||||||
|
|
||||||
lostPiecesSet := sliceToSet(missingPieces)
|
lostPiecesSet := sliceToSet(missingPieces)
|
||||||
@ -134,7 +134,7 @@ func (repairer *Repairer) Repair(ctx context.Context, path storj.Path) (err erro
|
|||||||
defer func() { err = errs.Combine(err, r.Close()) }()
|
defer func() { err = errs.Combine(err, r.Close()) }()
|
||||||
|
|
||||||
// Upload the repaired pieces
|
// Upload the repaired pieces
|
||||||
successfulNodes, hashes, err := repairer.ec.Repair(ctx, putLimits, redundancy, r, convertTime(expiration), repairer.timeout)
|
successfulNodes, hashes, err := repairer.ec.Repair(ctx, putLimits, redundancy, r, convertTime(expiration), repairer.timeout, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user