From 2f6e193c323b2f3617a56d393f7b08fd0b3aaaa1 Mon Sep 17 00:00:00 2001 From: Maximillian von Briesen Date: Wed, 3 Jul 2019 11:00:24 -0400 Subject: [PATCH] Remove long tail timer in ecclient (#2433) * remove long tail timer in ecclient --- pkg/storage/ec/client.go | 47 +++++----------------------------------- 1 file changed, 6 insertions(+), 41 deletions(-) diff --git a/pkg/storage/ec/client.go b/pkg/storage/ec/client.go index 6c8d959d5..708e2a105 100644 --- a/pkg/storage/ec/client.go +++ b/pkg/storage/ec/client.go @@ -100,8 +100,6 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r psCtx, cancel := context.WithCancel(ctx) defer cancel() - start := time.Now() - for i, addressedLimit := range limits { go func(i int, addressedLimit *pb.AddressedOrderLimit) { hash, err := ec.putPiece(psCtx, ctx, addressedLimit, readers[i], expiration) @@ -112,9 +110,6 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r successfulNodes = make([]*pb.Node, len(limits)) successfulHashes = make([]*pb.PieceHash, len(limits)) var successfulCount int32 - var timer *time.Timer - var lastSuccess time.Time - var waitStart time.Time for range limits { info := <-infos @@ -133,38 +128,15 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r Address: limits[info.i].GetStorageNodeAddress(), } successfulHashes[info.i] = info.hash - lastSuccess = time.Now() - switch int(atomic.AddInt32(&successfulCount, 1)) { - case rs.OptimalThreshold(): - ec.log.Sugar().Infof("Success threshold (%d nodes) reached. Canceling the long tail...", rs.OptimalThreshold()) - if timer != nil { - timer.Stop() - } + atomic.AddInt32(&successfulCount, 1) + + if int(successfulCount) >= rs.OptimalThreshold() { + ec.log.Sugar().Infof("Success threshold (%d nodes) reached. Cancelling remaining uploads.", rs.OptimalThreshold()) cancel() - case rs.RepairThreshold() + 1: - waitStart = time.Now() - elapsed := waitStart.Sub(start) - more := elapsed * 3 / 2 - - ec.log.Sugar().Debugf("Repair threshold (%d nodes) passed in %.2f s. Starting a timer for %.2f s for reaching the success threshold (%d nodes)...", - rs.RepairThreshold(), elapsed.Seconds(), more.Seconds(), rs.OptimalThreshold()) - - timer = time.AfterFunc(more, func() { - if ctx.Err() != context.Canceled { - ec.log.Sugar().Debugf("Timer expired. Successfully uploaded to %d nodes. Canceling the long tail...", atomic.LoadInt32(&successfulCount)) - cancel() - } - }) } } - // Ensure timer is stopped in the case of repair threshold is reached, but - // not the success threshold due to errors instead of slowness. - if timer != nil { - timer.Stop() - } - defer func() { select { case <-ctx.Done(): @@ -182,15 +154,8 @@ func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, r return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successes, rs.RepairThreshold()) } - // Monitor what the best fraction would have been for this upload. - if !lastSuccess.IsZero() && !waitStart.IsZero() { - repairThreshold := waitStart.Sub(start).Seconds() - extraDuration := lastSuccess.Sub(waitStart).Seconds() - if extraDuration != 0 { - mon.FloatVal("repair_threshold").Observe(repairThreshold) - mon.FloatVal("extra_duration").Observe(extraDuration) - mon.FloatVal("optimal_fraction").Observe(extraDuration / repairThreshold) - } + if successes < rs.OptimalThreshold() { + return nil, nil, Error.New("successful puts (%d) less than success threshold (%d)", successes, rs.OptimalThreshold()) } return successfulNodes, successfulHashes, nil