satellite/repair: monkit improvements (#2773)

Bryan White 2019-08-14 21:40:26 +02:00 committed by Stefan Benten
parent 5c61c79ab7
commit 1915b59af3


@@ -110,9 +110,7 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
 	successfulNodes = make([]*pb.Node, pieceCount)
 	successfulHashes = make([]*pb.PieceHash, pieceCount)
-	var successfulCount int32
-	var failures, cancelations int
+	var successfulCount, failureCount, cancellationCount int32
 	for range limits {
 		info := <-infos
@@ -122,9 +120,9 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
 		if info.err != nil {
 			if !errs2.IsCanceled(info.err) {
-				failures++
+				failureCount++
 			} else {
-				cancelations++
+				cancellationCount++
 			}
 			ec.log.Sugar().Debugf("Upload to storage node %s failed: %v", limits[info.i].GetLimit().StorageNodeId, info.err)
 			continue
@@ -157,11 +155,11 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
 	}()
 	successes := int(atomic.LoadInt32(&successfulCount))
-	mon.IntVal("segment_pieces_total").Observe(int64(pieceCount))
-	mon.IntVal("segment_pieces_optimal").Observe(int64(rs.OptimalThreshold()))
-	mon.IntVal("segment_pieces_successful").Observe(int64(successes))
-	mon.IntVal("segment_pieces_failed").Observe(int64(failures))
-	mon.IntVal("segment_pieces_canceled").Observe(int64(cancelations))
+	mon.IntVal("put_segment_pieces_total").Observe(int64(pieceCount))
+	mon.IntVal("put_segment_pieces_optimal").Observe(int64(rs.OptimalThreshold()))
+	mon.IntVal("put_segment_pieces_successful").Observe(int64(successes))
+	mon.IntVal("put_segment_pieces_failed").Observe(int64(failureCount))
+	mon.IntVal("put_segment_pieces_canceled").Observe(int64(cancellationCount))
 	if successes <= rs.RepairThreshold() && successes < rs.OptimalThreshold() {
 		return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successes, rs.RepairThreshold())
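
For context on the pattern above: mon is a package-level monkit scope, and each IntVal(name).Observe(value) call feeds a named distribution in the default registry. Below is a minimal, self-contained sketch of the same mechanism, assuming gopkg.in/spacemonkeygo/monkit.v2 (the monkit version in use around this commit); the program is illustrative and not part of the commit.

package main

import (
	"fmt"
	"strings"

	monkit "gopkg.in/spacemonkeygo/monkit.v2"
)

// Package-level scope, the same pattern ecclient uses for its mon variable.
var mon = monkit.Package()

func main() {
	// Feed the distributions the way Put does once per segment upload.
	mon.IntVal("put_segment_pieces_total").Observe(80)
	mon.IntVal("put_segment_pieces_successful").Observe(77)
	mon.IntVal("put_segment_pieces_failed").Observe(2)
	mon.IntVal("put_segment_pieces_canceled").Observe(1)

	// Each IntVal surfaces as several stat series (min, avg, max,
	// recent, ...) when the default registry is walked.
	monkit.Default.Stats(func(name string, val float64) {
		if strings.Contains(name, "put_segment_pieces") {
			fmt.Printf("%s = %v\n", name, val)
		}
	})
}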
@@ -177,8 +175,9 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, p
 func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
 	defer mon.Task()(&ctx)(&err)
-	if len(limits) != rs.TotalCount() {
-		return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), rs.TotalCount())
+	pieceCount := len(limits)
+	if pieceCount != rs.TotalCount() {
+		return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", pieceCount, rs.TotalCount())
 	}
 	if !unique(limits) {
@@ -196,7 +195,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
 		err  error
 		hash *pb.PieceHash
 	}
-	infos := make(chan info, len(limits))
+	infos := make(chan info, pieceCount)
 	psCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -211,7 +210,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
 	ec.log.Sugar().Infof("Starting a timer for %s for repairing %s up to %d nodes to try to have a number of pieces around the successful threshold (%d)",
 		timeout, path, nonNilCount(limits), rs.OptimalThreshold())
-	var successfulCount int32
+	var successfulCount, failureCount, cancellationCount int32
 	timer := time.AfterFunc(timeout, func() {
 		if ctx.Err() != context.Canceled {
 			ec.log.Sugar().Infof("Timer expired. Successfully repaired %s to %d nodes. Canceling the long tail...", path, atomic.LoadInt32(&successfulCount))
@@ -219,8 +218,8 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
 		}
 	})
-	successfulNodes = make([]*pb.Node, len(limits))
-	successfulHashes = make([]*pb.PieceHash, len(limits))
+	successfulNodes = make([]*pb.Node, pieceCount)
+	successfulHashes = make([]*pb.PieceHash, pieceCount)
 	for range limits {
 		info := <-infos
@@ -230,6 +229,11 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
 		}
 		if info.err != nil {
+			if !errs2.IsCanceled(info.err) {
+				failureCount++
+			} else {
+				cancellationCount++
+			}
 			ec.log.Sugar().Debugf("Repair %s to storage node %s failed: %v", path, limits[info.i].GetLimit().StorageNodeId, info.err)
 			continue
 		}
@@ -239,7 +243,7 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
 			Address: limits[info.i].GetStorageNodeAddress(),
 		}
 		successfulHashes[info.i] = info.hash
-		atomic.AddInt32(&successfulCount, 1)
+		successfulCount++
 	}
 	// Ensure timer is stopped
@@ -257,12 +261,17 @@ func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit
 		}
 	}()
-	if atomic.LoadInt32(&successfulCount) == 0 {
+	if successfulCount == 0 {
 		return nil, nil, Error.New("repair %v to all nodes failed", path)
 	}
 	ec.log.Sugar().Infof("Successfully repaired %s to %d nodes.", path, atomic.LoadInt32(&successfulCount))
+	mon.IntVal("repair_segment_pieces_total").Observe(int64(pieceCount))
+	mon.IntVal("repair_segment_pieces_successful").Observe(int64(successfulCount))
+	mon.IntVal("repair_segment_pieces_failed").Observe(int64(failureCount))
+	mon.IntVal("repair_segment_pieces_canceled").Observe(int64(cancellationCount))
 	return successfulNodes, successfulHashes, nil
 }
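
With the hunks above applied, a repair that succeeds at all records four repair_segment_pieces_* distributions alongside the put_segment_pieces_* ones from Put. A hedged test-style sketch of how their presence could be asserted; the test name and setup are hypothetical, not from this commit, and it assumes gopkg.in/spacemonkeygo/monkit.v2:

package ecclient

import (
	"strings"
	"testing"

	monkit "gopkg.in/spacemonkeygo/monkit.v2"
)

// TestRepairMetricsObserved is a hypothetical check that the metrics named
// in this commit reach the default monkit registry. It assumes a repair has
// already been exercised earlier in the test.
func TestRepairMetricsObserved(t *testing.T) {
	// ... run ec.Repair against test nodes here ...

	expected := []string{
		"repair_segment_pieces_total",
		"repair_segment_pieces_successful",
		"repair_segment_pieces_failed",
		"repair_segment_pieces_canceled",
	}
	seen := make(map[string]bool)
	monkit.Default.Stats(func(name string, val float64) {
		for _, metric := range expected {
			if strings.Contains(name, metric) {
				seen[metric] = true
			}
		}
	})
	for _, metric := range expected {
		if !seen[metric] {
			t.Errorf("no stats registered for %s", metric)
		}
	}
}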