edb8d656de
Currently, slower storage nodes can slow down the deletion queue. To make piece deletion faster, reduce the maximum time spent in either dialing or piece deletion requests. With this change: * dial timeout is 3s * request timeout is 15s * fail threshold is set to 10min Similarly, we'll mark a storage node as failed when a timeout occurs. The timeout usually indicates that the storage node is overwhelmed. Garbage collection will ensure that the pieces get deleted eventually. Change-Id: Iec5de699f5917905f5807140e2c3252088c6399b
172 lines
4.3 KiB
Go
172 lines
4.3 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package piecedeletion
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/spacemonkeygo/monkit/v3"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/errs2"
|
|
"storj.io/common/pb"
|
|
"storj.io/common/rpc"
|
|
"storj.io/common/storj"
|
|
)
|
|
|
|
// Dialer implements dialing piecestores and sending delete requests with batching and redial threshold.
type Dialer struct {
	log    *zap.Logger
	dialer rpc.Dialer

	// requestTimeout bounds a single DeletePieces request (see Handle).
	requestTimeout time.Duration
	// failThreshold is how long a node is skipped after a dial failure or
	// a timed-out deletion request (see markFailed / recentlyFailed).
	failThreshold time.Duration
	// piecesPerRequest caps how many piece ids are sent per batch (see batchJobs).
	piecesPerRequest int

	// mu guards dialFailed.
	mu sync.RWMutex
	// dialFailed records, per node, when the last failure was observed.
	dialFailed map[storj.NodeID]time.Time
}
|
|
|
|
// NewDialer returns a new Dialer.
|
|
func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold time.Duration, piecesPerRequest int) *Dialer {
|
|
return &Dialer{
|
|
log: log,
|
|
dialer: dialer,
|
|
|
|
requestTimeout: requestTimeout,
|
|
failThreshold: failThreshold,
|
|
piecesPerRequest: piecesPerRequest,
|
|
|
|
dialFailed: map[storj.NodeID]time.Time{},
|
|
}
|
|
}
|
|
|
|
// Handle tries to send the deletion requests to the specified node.
//
// It drains jobs from queue until the queue closes, the context is done, or a
// request fails. Any jobs that could not be delivered are resolved as failures
// (via the promise loop below and the deferred FailPending).
func (dialer *Dialer) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
	defer mon.Task()(&ctx, node.ID.String())(nil)
	// Ensure anything still sitting in the queue is resolved as failed when we return.
	defer FailPending(queue)

	// Skip nodes that failed within failThreshold; garbage collection will
	// eventually remove the pieces anyway.
	if dialer.recentlyFailed(ctx, node) {
		return
	}

	client, conn, err := dialPieceStore(ctx, dialer.dialer, node)
	if err != nil {
		dialer.log.Debug("failed to dial", zap.Stringer("id", node.ID), zap.Error(err))
		// Remember the dial failure so we back off from this node for a while.
		dialer.markFailed(ctx, node)
		return
	}
	defer func() {
		if err := conn.Close(); err != nil {
			dialer.log.Debug("closing connection failed", zap.Stringer("id", node.ID), zap.Error(err))
		}
	}()

	for {
		if err := ctx.Err(); err != nil {
			return
		}

		// ok is false once the queue has been closed and drained.
		jobs, ok := queue.PopAll()
		// Add metrics on to the span
		s := monkit.SpanFromCtx(ctx)
		s.Annotate("delete jobs size", strconv.Itoa(len(jobs)))

		if !ok {
			return
		}

		for len(jobs) > 0 {
			// Take up to piecesPerRequest pieces; rest carries over to the next iteration.
			batch, promises, rest := batchJobs(jobs, dialer.piecesPerRequest)
			// Aggregation metrics
			mon.IntVal("delete_batch_size").Observe(int64(len(batch))) //mon:locked
			// Tracing metrics
			s.Annotate("delete_batch_size", strconv.Itoa(len(batch)))

			jobs = rest

			// Bound each request so a slow node cannot stall the whole deletion queue.
			requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout)
			resp, err := client.DeletePieces(requestCtx, &pb.DeletePiecesRequest{
				PieceIds: batch,
			})
			cancel()

			// Resolve every promise in this batch according to the request outcome.
			for _, promise := range promises {
				if err != nil {
					promise.Failure()
				} else {
					promise.Success()
				}
			}

			if err != nil {
				dialer.log.Debug("deletion request failed", zap.Stringer("id", node.ID), zap.Error(err))
				// don't try to send to this storage node a bit, when the deletion times out
				if errs2.IsCanceled(err) || errors.Is(err, context.DeadlineExceeded) {
					dialer.markFailed(ctx, node)
				}
				break
			} else {
				mon.IntVal("deletion_pieces_unhandled_count").Observe(resp.UnhandledCount) //mon:locked
			}

			// Opportunistically pick up any jobs queued while we were sending,
			// without treating an empty queue as closed.
			jobs = append(jobs, queue.PopAllWithoutClose()...)
		}

		// if we failed early, remaining jobs should be marked as failures
		for _, job := range jobs {
			job.Resolve.Failure()
		}
	}
}
|
|
|
|
// markFailed marks node as something failed recently, so we shouldn't try again,
|
|
// for some time.
|
|
func (dialer *Dialer) markFailed(ctx context.Context, node storj.NodeURL) {
|
|
dialer.mu.Lock()
|
|
defer dialer.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
|
|
lastFailed, ok := dialer.dialFailed[node.ID]
|
|
if !ok || lastFailed.Before(now) {
|
|
dialer.dialFailed[node.ID] = now
|
|
}
|
|
}
|
|
|
|
// recentlyFailed checks whether a request to node recently failed.
|
|
func (dialer *Dialer) recentlyFailed(ctx context.Context, node storj.NodeURL) bool {
|
|
dialer.mu.RLock()
|
|
lastFailed, ok := dialer.dialFailed[node.ID]
|
|
dialer.mu.RUnlock()
|
|
|
|
// when we recently failed to dial, then fail immediately
|
|
return ok && time.Since(lastFailed) < dialer.failThreshold
|
|
}
|
|
|
|
func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises []Promise, rest []Job) {
|
|
for i, job := range jobs {
|
|
if len(pieces) >= maxBatchSize {
|
|
return pieces, promises, jobs[i:]
|
|
}
|
|
|
|
pieces = append(pieces, job.Pieces...)
|
|
promises = append(promises, job.Resolve)
|
|
}
|
|
|
|
return pieces, promises, nil
|
|
}
|
|
|
|
func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target storj.NodeURL) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
|
|
conn, err := dialer.DialNodeURL(ctx, target)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return pb.NewDRPCPiecestoreClient(conn), conn, nil
|
|
}
|