Remove long tail timer in ecclient (#2433)
* remove long tail timer in ecclient
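In ecClient.Put, the old long-tail handling armed a time.AfterFunc timer once uploads passed the repair threshold, giving the remaining pieces 1.5x the elapsed time before cancelling them, and recorded monkit metrics (repair_threshold, extra_duration, optimal_fraction) about how long the tail actually ran. With this change, Put instead cancels the outstanding uploads as soon as the success threshold (rs.OptimalThreshold()) is reached, and returns an error if fewer pieces than that were stored. The start timestamp and the timer/lastSuccess/waitStart bookkeeping become unused and are removed.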
This commit is contained in:
parent ca0058c9f1
commit 2f6e193c32
@@ -100,8 +100,6 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
 	psCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	start := time.Now()
-
 	for i, addressedLimit := range limits {
 		go func(i int, addressedLimit *pb.AddressedOrderLimit) {
 			hash, err := ec.putPiece(psCtx, ctx, addressedLimit, readers[i], expiration)
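start only fed the timer arithmetic and the monitoring block removed in the later hunks; with those gone it is unused.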
@@ -112,9 +110,6 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
 	successfulNodes = make([]*pb.Node, len(limits))
 	successfulHashes = make([]*pb.PieceHash, len(limits))
 	var successfulCount int32
-	var timer *time.Timer
-	var lastSuccess time.Time
-	var waitStart time.Time
 
 	for range limits {
 		info := <-infos
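The deleted variables held the long-tail state: timer was the armed cancellation timer, lastSuccess the time of the most recently stored piece, and waitStart the moment the repair threshold was passed. Only the atomic success counter survives.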
@@ -133,38 +128,15 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
 				Address: limits[info.i].GetStorageNodeAddress(),
 			}
 			successfulHashes[info.i] = info.hash
-			lastSuccess = time.Now()
-
-			switch int(atomic.AddInt32(&successfulCount, 1)) {
-			case rs.OptimalThreshold():
-				ec.log.Sugar().Infof("Success threshold (%d nodes) reached. Canceling the long tail...", rs.OptimalThreshold())
-				if timer != nil {
-					timer.Stop()
-				}
-				cancel()
-			case rs.RepairThreshold() + 1:
-				waitStart = time.Now()
-				elapsed := waitStart.Sub(start)
-				more := elapsed * 3 / 2
-
-				ec.log.Sugar().Debugf("Repair threshold (%d nodes) passed in %.2f s. Starting a timer for %.2f s for reaching the success threshold (%d nodes)...",
-					rs.RepairThreshold(), elapsed.Seconds(), more.Seconds(), rs.OptimalThreshold())
-
-				timer = time.AfterFunc(more, func() {
-					if ctx.Err() != context.Canceled {
-						ec.log.Sugar().Debugf("Timer expired. Successfully uploaded to %d nodes. Canceling the long tail...", atomic.LoadInt32(&successfulCount))
-						cancel()
-					}
-				})
-			}
-		}
-
-	// Ensure timer is stopped in the case of repair threshold is reached, but
-	// not the success threshold due to errors instead of slowness.
-	if timer != nil {
-		timer.Stop()
-	}
+
+			atomic.AddInt32(&successfulCount, 1)
+
+			if int(successfulCount) >= rs.OptimalThreshold() {
+				ec.log.Sugar().Infof("Success threshold (%d nodes) reached. Cancelling remaining uploads.", rs.OptimalThreshold())
+				cancel()
+			}
+		}
 
 	defer func() {
 		select {
 		case <-ctx.Done():
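For context, the deleted branch implemented a classic long-tail pattern: once uploads pass the repair threshold, give the stragglers 1.5x the time already spent, then cancel whatever is still running. Below is a minimal, self-contained sketch of that pattern; the thresholds, the simulated uploads, and the channel plumbing are illustrative stand-ins, not the ecclient types:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

func main() {
	// Illustrative thresholds, not Storj's defaults.
	const total, repair, optimal = 10, 5, 8

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	results := make(chan bool)
	start := time.Now()
	for i := 0; i < total; i++ {
		go func() {
			// Simulate an upload of random duration that aborts on cancellation.
			select {
			case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
				results <- true
			case <-ctx.Done():
				results <- false
			}
		}()
	}

	var successes int32
	var timer *time.Timer
	for i := 0; i < total; i++ {
		if !<-results {
			continue
		}
		switch int(atomic.AddInt32(&successes, 1)) {
		case optimal:
			// Enough pieces stored: stop the timer and cut the long tail.
			if timer != nil {
				timer.Stop()
			}
			cancel()
		case repair + 1:
			// Past the repair threshold: give the tail 1.5x the time used so far.
			timer = time.AfterFunc(time.Since(start)*3/2, cancel)
		}
	}
	// Stop the timer even if optimal was never reached (errors, not slowness).
	if timer != nil {
		timer.Stop()
	}
	fmt.Printf("stored %d of %d pieces\n", atomic.LoadInt32(&successes), total)
}

The replacement drops all of this: the loop just counts successes and calls cancel() the moment the optimal threshold is reached, so pieces still in flight at that point are abandoned immediately rather than on a deadline.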
@@ -182,15 +154,8 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r
 		return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successes, rs.RepairThreshold())
 	}
 
-	// Monitor what the best fraction would have been for this upload.
-	if !lastSuccess.IsZero() && !waitStart.IsZero() {
-		repairThreshold := waitStart.Sub(start).Seconds()
-		extraDuration := lastSuccess.Sub(waitStart).Seconds()
-		if extraDuration != 0 {
-			mon.FloatVal("repair_threshold").Observe(repairThreshold)
-			mon.FloatVal("extra_duration").Observe(extraDuration)
-			mon.FloatVal("optimal_fraction").Observe(extraDuration / repairThreshold)
-		}
-	}
+	if successes < rs.OptimalThreshold() {
+		return nil, nil, Error.New("successful puts (%d) less than success threshold (%d)", successes, rs.OptimalThreshold())
+	}
 
 	return successfulNodes, successfulHashes, nil
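Net effect on Put's contract: previously any upload that cleared the repair threshold returned success, and the monitoring block recorded the tail's extra duration relative to the time needed to reach the repair threshold. Now those metrics are gone, and Put errors unless the success threshold itself was reached.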