storj/satellite/metainfo/piecedeletion/dialer.go
Egon Elbre edb8d656de satellite/metainfo: adjust piecedeletion timeouts
Currently, slower storage nodes can slow down the deletion queue.
To make piece deletion faster reduce the maximum time spent in
either dialing or piece deletion requests.

With this change:
* dial timeout is 3s
* request timeout is 15s
* fail threshold is set to 10min

Similarly, we'll mark the storage node as failed when a timeout occurs.
The timeout usually indicates that the storagenode is overwhelmed.
Garbage collection will ensure that the pieces get deleted eventually.

Change-Id: Iec5de699f5917905f5807140e2c3252088c6399b
2021-10-28 13:37:01 +03:00

172 lines
4.3 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"errors"
"strconv"
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
)
// Dialer implements dialing piecestores and sending delete requests with batching and redial threshold.
type Dialer struct {
	log    *zap.Logger
	dialer rpc.Dialer

	// requestTimeout bounds a single DeletePieces request.
	requestTimeout time.Duration
	// failThreshold is how long a node is skipped after a recorded failure.
	failThreshold time.Duration
	// piecesPerRequest is the target maximum number of piece ids per request.
	piecesPerRequest int

	// mu protects dialFailed.
	mu         sync.RWMutex
	dialFailed map[storj.NodeID]time.Time // node ID -> time of the most recent dial/request failure
}
// NewDialer returns a new Dialer that batches up to piecesPerRequest piece ids
// per delete request, bounds each request by requestTimeout, and skips nodes
// whose last failure is within failThreshold.
func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold time.Duration, piecesPerRequest int) *Dialer {
	d := &Dialer{
		log:              log,
		dialer:           dialer,
		requestTimeout:   requestTimeout,
		failThreshold:    failThreshold,
		piecesPerRequest: piecesPerRequest,
	}
	d.dialFailed = make(map[storj.NodeID]time.Time)
	return d
}
// Handle tries to send the deletion requests to the specified node.
//
// It repeatedly pops jobs from queue and sends them in batches, until the
// queue is drained, the context is canceled, or a request fails. Promises for
// each batch are resolved with the shared outcome of that batch's request;
// jobs that cannot be handled are resolved as failures.
func (dialer *Dialer) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
	defer mon.Task()(&ctx, node.ID.String())(nil)
	// Resolve any jobs still pending in the queue as failures when we exit.
	defer FailPending(queue)

	// Skip nodes that failed recently; per the package design, garbage
	// collection will eventually delete the pieces anyway.
	if dialer.recentlyFailed(ctx, node) {
		return
	}

	client, conn, err := dialPieceStore(ctx, dialer.dialer, node)
	if err != nil {
		dialer.log.Debug("failed to dial", zap.Stringer("id", node.ID), zap.Error(err))
		// Remember the dial failure so this node is skipped for failThreshold.
		dialer.markFailed(ctx, node)
		return
	}
	defer func() {
		if err := conn.Close(); err != nil {
			dialer.log.Debug("closing connection failed", zap.Stringer("id", node.ID), zap.Error(err))
		}
	}()

	for {
		if err := ctx.Err(); err != nil {
			return
		}

		jobs, ok := queue.PopAll()
		// Add metrics on to the span
		s := monkit.SpanFromCtx(ctx)
		s.Annotate("delete jobs size", strconv.Itoa(len(jobs)))
		if !ok {
			return
		}

		for len(jobs) > 0 {
			batch, promises, rest := batchJobs(jobs, dialer.piecesPerRequest)
			// Aggregation metrics
			mon.IntVal("delete_batch_size").Observe(int64(len(batch))) //mon:locked
			// Tracing metrics
			s.Annotate("delete_batch_size", strconv.Itoa(len(batch)))
			jobs = rest

			// Bound each request so a slow node cannot stall the whole queue.
			requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout)
			resp, err := client.DeletePieces(requestCtx, &pb.DeletePiecesRequest{
				PieceIds: batch,
			})
			cancel()

			// Every promise in the batch shares the outcome of this request.
			for _, promise := range promises {
				if err != nil {
					promise.Failure()
				} else {
					promise.Success()
				}
			}

			if err != nil {
				dialer.log.Debug("deletion request failed", zap.Stringer("id", node.ID), zap.Error(err))
				// don't try to send to this storage node for a while, when the deletion
				// times out — a timeout usually indicates the node is overwhelmed
				if errs2.IsCanceled(err) || errors.Is(err, context.DeadlineExceeded) {
					dialer.markFailed(ctx, node)
				}
				break
			} else {
				mon.IntVal("deletion_pieces_unhandled_count").Observe(resp.UnhandledCount) //mon:locked
			}

			// Pick up jobs that were queued while the request was in flight,
			// without closing the queue.
			jobs = append(jobs, queue.PopAllWithoutClose()...)
		}

		// if we failed early, remaining jobs should be marked as failures
		for _, job := range jobs {
			job.Resolve.Failure()
		}
	}
}
// markFailed records the current time as the most recent failure for node,
// so requests to it are skipped for a while.
func (dialer *Dialer) markFailed(ctx context.Context, node storj.NodeURL) {
	dialer.mu.Lock()
	defer dialer.mu.Unlock()

	now := time.Now()
	previous, seen := dialer.dialFailed[node.ID]
	if seen && !previous.Before(now) {
		// Keep an existing entry that is not older than now.
		return
	}
	dialer.dialFailed[node.ID] = now
}
// recentlyFailed reports whether a request to node has failed within the
// configured fail threshold.
func (dialer *Dialer) recentlyFailed(ctx context.Context, node storj.NodeURL) bool {
	dialer.mu.RLock()
	defer dialer.mu.RUnlock()

	failedAt, found := dialer.dialFailed[node.ID]
	if !found {
		return false
	}
	// When we recently failed to dial, then fail immediately.
	return time.Since(failedAt) < dialer.failThreshold
}
// batchJobs collects piece ids and their promises from jobs until at least
// maxBatchSize pieces have been gathered, returning the leftover jobs as rest.
//
// Note: a batch can exceed maxBatchSize, because a single job's pieces are
// never split across batches — each promise is resolved as a unit.
func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises []Promise, rest []Job) {
	for i := range jobs {
		if len(pieces) >= maxBatchSize {
			rest = jobs[i:]
			return pieces, promises, rest
		}
		promises = append(promises, jobs[i].Resolve)
		pieces = append(pieces, jobs[i].Pieces...)
	}
	return pieces, promises, nil
}
// dialPieceStore connects to target and wraps the connection in a piecestore
// client. The caller is responsible for closing the returned connection.
func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target storj.NodeURL) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
	conn, err := dialer.DialNodeURL(ctx, target)
	if err != nil {
		return nil, nil, err
	}
	client := pb.NewDRPCPiecestoreClient(conn)
	return client, conn, nil
}