storj/satellite/metainfo/piecedeletion/dialer.go
Egon Elbre 676f3e8516 satellite/metainfo/piecedeletion: try to make batches larger
Previously it was possible that PopAll returned 1010 items; we would then
make one RPC call with 1000 items and another with only 10. Meanwhile,
500 new items might have been added to the queue.

This change ensures that we pull items from the queue early and
try to make RPC batches as large as possible.

Change-Id: I1a30dde9164c2ff7b90c906a9544593c4f1cf0e9
2020-04-22 18:43:29 +00:00
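
For illustration, the batching loop this change introduces tops up the
remaining jobs from the queue between requests (a minimal sketch using the
names from the file below; sending and error handling are elided):

    for len(jobs) > 0 {
        batch, promises, rest := batchJobs(jobs, piecesPerRequest)
        jobs = rest
        // ... send batch over the open connection, resolve promises ...
        jobs = append(jobs, queue.PopAllWithoutClose()...)
    }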


// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package piecedeletion

import (
	"context"
	"sync"
	"time"

	"go.uber.org/zap"

	"storj.io/common/errs2"
	"storj.io/common/pb"
	"storj.io/common/rpc"
	"storj.io/common/storj"
	"storj.io/uplink/private/piecestore"
)

// Dialer implements dialing piecestores and sending delete requests with batching and redial threshold.
type Dialer struct {
	log    *zap.Logger
	dialer rpc.Dialer

	requestTimeout   time.Duration
	failThreshold    time.Duration
	piecesPerRequest int

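	// mu protects access to dialFailed.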
	mu         sync.RWMutex
	dialFailed map[storj.NodeID]time.Time
}

// NewDialer returns a new Dialer.
func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold time.Duration, piecesPerRequest int) *Dialer {
	return &Dialer{
		log:    log,
		dialer: dialer,

		requestTimeout:   requestTimeout,
		failThreshold:    failThreshold,
		piecesPerRequest: piecesPerRequest,

		dialFailed: map[storj.NodeID]time.Time{},
	}
}
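
// Example construction (hypothetical values for illustration only; the actual
// configuration is wired up elsewhere in the satellite):
//
//	dialer := NewDialer(log, rpcDialer, 5*time.Minute, 5*time.Minute, 1000)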

// Handle tries to send the deletion requests to the specified node.
func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
	defer FailPending(queue)

	if dialer.recentlyFailed(ctx, node) {
		return
	}

	conn, err := piecestore.Dial(ctx, dialer.dialer, node, dialer.log, piecestore.DefaultConfig)
	if err != nil {
		dialer.log.Info("failed to dial", zap.Stringer("id", node.Id), zap.Error(err))
		dialer.markFailed(ctx, node)
		return
	}
	defer func() {
		if err := conn.Close(); err != nil {
			dialer.log.Info("closing connection failed", zap.Stringer("id", node.Id), zap.Error(err))
		}
	}()
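
	// Keep pulling jobs from the queue while the connection is healthy, so
	// that a single dial serves as many deletion requests as possible.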
	for {
		if err := ctx.Err(); err != nil {
			return
		}

		jobs, ok := queue.PopAll()
		if !ok {
			return
		}

		for len(jobs) > 0 {
			batch, promises, rest := batchJobs(jobs, dialer.piecesPerRequest)
			jobs = rest

			requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout)
			err := conn.DeletePieces(requestCtx, batch...)
			cancel()

			for _, promise := range promises {
				if err != nil {
					promise.Failure()
				} else {
					promise.Success()
				}
			}

			if err != nil {
				dialer.log.Info("deletion request failed", zap.Stringer("id", node.Id), zap.Error(err))
				// when the deletion times out, don't retry this storage node for a while
				if errs2.IsCanceled(err) {
					dialer.markFailed(ctx, node)
				}
				break
			}
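
			// Top up the remaining jobs with items queued in the meantime,
			// so the next RPC batch is as large as possible.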
			jobs = append(jobs, queue.PopAllWithoutClose()...)
		}

		// if we failed early, remaining jobs should be marked as failures
		for _, job := range jobs {
			job.Resolve.Failure()
		}
	}
}

// markFailed marks the node as having recently failed, so that we don't
// retry it for some time.
func (dialer *Dialer) markFailed(ctx context.Context, node *pb.Node) {
	dialer.mu.Lock()
	defer dialer.mu.Unlock()

	now := time.Now()

	lastFailed, ok := dialer.dialFailed[node.Id]
	if !ok || lastFailed.Before(now) {
		dialer.dialFailed[node.Id] = now
	}
}

// recentlyFailed checks whether a request to the node failed recently.
func (dialer *Dialer) recentlyFailed(ctx context.Context, node *pb.Node) bool {
	dialer.mu.RLock()
	lastFailed, ok := dialer.dialFailed[node.Id]
	dialer.mu.RUnlock()

	// if we recently failed to dial, fail immediately
	return ok && time.Since(lastFailed) < dialer.failThreshold
}
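
// batchJobs collects jobs into a single batch of pieces and their promises,
// stopping once at least maxBatchSize pieces have been gathered. Jobs are not
// split, so a batch may exceed maxBatchSize; leftover jobs are returned as rest.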
func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises []Promise, rest []Job) {
	for i, job := range jobs {
		if len(pieces) >= maxBatchSize {
			return pieces, promises, jobs[i:]
		}

		pieces = append(pieces, job.Pieces...)
		promises = append(promises, job.Resolve)
	}

	return pieces, promises, nil
}