satellite/metainfo/piecedeletion: add Dialer
This adds a piece deletion handler that debounces failed dials and batches multiple jobs into a single request.

Change-Id: If64021bebb2faae7f3e6bdcceef705aed41e7d7b
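For orientation, a minimal sketch (not part of this change) of how a caller might wire the new Dialer to a queue of jobs: the surrounding function, parameter names, and the chosen timeouts are illustrative assumptions, while NewDialer, NewLimitedJobs, Job, Promise, Queue, and Handle come from this commit.

// illustrative sketch only; not part of this commit
package piecedeletionexample

import (
    "context"
    "time"

    "go.uber.org/zap"

    "storj.io/common/pb"
    "storj.io/common/rpc"
    "storj.io/common/storj"
    "storj.io/storj/satellite/metainfo/piecedeletion"
)

// deleteOnNode is a hypothetical caller: it builds a Dialer, queues one job,
// and lets Handle dial the node, batch the pieces, and resolve the promise.
func deleteOnNode(ctx context.Context, log *zap.Logger, rpcDialer rpc.Dialer,
    node *pb.Node, pieces []storj.PieceID, promise piecedeletion.Promise) {

    dialer := piecedeletion.NewDialer(log, rpcDialer,
        5*time.Second, // requestTimeout: applied to each DeletePieces request
        5*time.Minute, // failThreshold: skip redialing a node that failed this recently
        100,           // piecesPerRequest: soft cap on pieces per batch
    )

    queue := piecedeletion.NewLimitedJobs(-1) // no queue size limit, as in the tests
    queue.TryPush(piecedeletion.Job{Pieces: pieces, Resolve: promise})

    // Handle skips nodes that recently failed, drains the queue in batches,
    // and resolves every job's promise as Success or Failure.
    dialer.Handle(ctx, node, queue)
}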
This commit is contained in:
parent bdbf764b86
commit 22ea0c7c1a
satellite/metainfo/piecedeletion/dialer.go (new file, 146 lines)
@@ -0,0 +1,146 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package piecedeletion

import (
    "context"
    "sync"
    "time"

    "go.uber.org/zap"

    "storj.io/common/errs2"
    "storj.io/common/pb"
    "storj.io/common/rpc"
    "storj.io/common/storj"
    "storj.io/uplink/private/piecestore"
)

// Dialer implements dialing piecestores and sending delete requests with batching and a redial threshold.
type Dialer struct {
    log    *zap.Logger
    dialer rpc.Dialer

    requestTimeout   time.Duration
    failThreshold    time.Duration
    piecesPerRequest int

    mu         sync.RWMutex
    dialFailed map[storj.NodeID]time.Time
}

// NewDialer returns a new Dialer.
func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold time.Duration, piecesPerRequest int) *Dialer {
    return &Dialer{
        log:    log,
        dialer: dialer,

        requestTimeout:   requestTimeout,
        failThreshold:    failThreshold,
        piecesPerRequest: piecesPerRequest,

        dialFailed: map[storj.NodeID]time.Time{},
    }
}

// Handle tries to send the deletion requests to the specified node.
func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
    defer FailPending(queue)

    if dialer.recentlyFailed(ctx, node) {
        return
    }

    conn, err := piecestore.Dial(ctx, dialer.dialer, node, dialer.log, piecestore.DefaultConfig)
    if err != nil {
        dialer.log.Info("failed to dial", zap.Stringer("id", node.Id), zap.Error(err))
        dialer.markFailed(ctx, node)
        return
    }

    defer func() {
        if err := conn.Close(); err != nil {
            dialer.log.Info("closing connection failed", zap.Stringer("id", node.Id), zap.Error(err))
        }
    }()

    for {
        if err := ctx.Err(); err != nil {
            return
        }

        jobs, ok := queue.PopAll()
        if !ok {
            return
        }

        for len(jobs) > 0 {
            batch, promises, rest := batchJobs(jobs, dialer.piecesPerRequest)
            jobs = rest

            requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout)
            err := conn.DeletePieces(requestCtx, batch...)
            cancel()

            for _, promise := range promises {
                if err != nil {
                    promise.Failure()
                } else {
                    promise.Success()
                }
            }

            if err != nil {
                dialer.log.Info("deletion request failed", zap.Stringer("id", node.Id), zap.Error(err))
                // when the deletion times out, don't try to send to this storage node for a while
                if errs2.IsCanceled(err) {
                    dialer.markFailed(ctx, node)
                }
                break
            }
        }

        // if we failed early, remaining jobs should be marked as failures
        for _, job := range jobs {
            job.Resolve.Failure()
        }
    }
}

// markFailed marks the node as having failed recently, so we shouldn't try again for some time.
func (dialer *Dialer) markFailed(ctx context.Context, node *pb.Node) {
    dialer.mu.Lock()
    defer dialer.mu.Unlock()

    now := time.Now()

    lastFailed, ok := dialer.dialFailed[node.Id]
    if !ok || lastFailed.Before(now) {
        dialer.dialFailed[node.Id] = now
    }
}

// recentlyFailed checks whether a request to the node recently failed.
func (dialer *Dialer) recentlyFailed(ctx context.Context, node *pb.Node) bool {
    dialer.mu.RLock()
    lastFailed, ok := dialer.dialFailed[node.Id]
    dialer.mu.RUnlock()

    // when we recently failed to dial, then fail immediately
    return ok && time.Since(lastFailed) < dialer.failThreshold
}

func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises []Promise, rest []Job) {
    for i, job := range jobs {
        if len(pieces) >= maxBatchSize {
            return pieces, promises, jobs[i:]
        }

        pieces = append(pieces, job.Pieces...)
        promises = append(promises, job.Resolve)
    }

    return pieces, promises, nil
}
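A note on batchJobs above: the size check runs before a job's pieces are appended, so a single job is never split across requests, and a batch can exceed piecesPerRequest by up to one job's worth of pieces. A small in-package sketch (illustrative only, not part of the commit) of that grouping:

// illustrative sketch only; assumes it lives in the piecedeletion package
// so it can call the unexported batchJobs.
package piecedeletion

import (
    "fmt"

    "storj.io/common/storj"
)

func sketchBatchJobs() {
    // two jobs with three pieces each, batched with maxBatchSize = 3
    jobs := []Job{
        {Pieces: make([]storj.PieceID, 3)},
        {Pieces: make([]storj.PieceID, 3)},
    }

    pieces, promises, rest := batchJobs(jobs, 3)

    // the first job fills the batch; the second job is left whole for the
    // next request instead of being split
    fmt.Println(len(pieces), len(promises), len(rest)) // prints: 3 1 1
}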
satellite/metainfo/piecedeletion/dialer_test.go (new file, 121 lines)
@@ -0,0 +1,121 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package piecedeletion_test

import (
    "sync/atomic"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
    "go.uber.org/zap/zaptest"

    "storj.io/common/pb"
    "storj.io/common/storj"
    "storj.io/common/testcontext"
    "storj.io/common/testrand"
    "storj.io/storj/private/testplanet"
    "storj.io/storj/satellite/metainfo/piecedeletion"
)

type CountedPromise struct {
    SuccessCount int64
    FailureCount int64
}

func (p *CountedPromise) Success() { atomic.AddInt64(&p.SuccessCount, 1) }
func (p *CountedPromise) Failure() { atomic.AddInt64(&p.FailureCount, 1) }

func TestDialer(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
        log := zaptest.NewLogger(t)

        dialer := piecedeletion.NewDialer(log, planet.Satellites[0].Dialer, 5*time.Second, 5*time.Second, 100)
        require.NotNil(t, dialer)

        storageNode := &pb.Node{
            Id: planet.StorageNodes[0].ID(),
            Address: &pb.NodeAddress{
                Transport: pb.NodeTransport_TCP_TLS_GRPC,
                Address:   planet.StorageNodes[0].Addr(),
            },
        }

        promise, jobs := makeJobsQueue(t, 2)

        dialer.Handle(ctx, storageNode, jobs)

        require.Equal(t, int64(2), promise.SuccessCount)
        require.Equal(t, int64(0), promise.FailureCount)
    })
}

func TestDialer_DialTimeout(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
        Reconfigure: testplanet.Reconfigure{},
    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
        log := zaptest.NewLogger(t)

        const dialTimeout = 5 * time.Second

        rpcdial := planet.Satellites[0].Dialer
        rpcdial.DialTimeout = dialTimeout

        dialer := piecedeletion.NewDialer(log, rpcdial, 5*time.Second, 1*time.Minute, 100)
        require.NotNil(t, dialer)

        require.NoError(t, planet.StopPeer(planet.StorageNodes[0]))

        storageNode := &pb.Node{
            Id: planet.StorageNodes[0].ID(),
            Address: &pb.NodeAddress{
                Transport: pb.NodeTransport_TCP_TLS_GRPC,
                Address:   planet.StorageNodes[0].Addr(),
            },
        }

        {
            promise, jobs := makeJobsQueue(t, 1)
            // we should fail to dial in the time allocated
            start := time.Now()
            dialer.Handle(ctx, storageNode, jobs)
            failingToDial := time.Since(start)

            require.Less(t, failingToDial.Seconds(), (2 * dialTimeout).Seconds())
            require.Equal(t, int64(0), promise.SuccessCount)
            require.Equal(t, int64(1), promise.FailureCount)
        }

        {
            promise, jobs := makeJobsQueue(t, 1)

            // we should immediately return when we try to redial within 1 minute
            start := time.Now()
            dialer.Handle(ctx, storageNode, jobs)
            failingToRedial := time.Since(start)

            require.Less(t, failingToRedial.Seconds(), time.Second.Seconds())
            require.Equal(t, int64(0), promise.SuccessCount)
            require.Equal(t, int64(1), promise.FailureCount)
        }
    })
}

// we can use a random piece id, since deletion requests for already deleted pieces are expected to succeed.
func makeJobsQueue(t *testing.T, n int) (*CountedPromise, piecedeletion.Queue) {
    promise := &CountedPromise{}

    jobs := piecedeletion.NewLimitedJobs(-1)
    for i := 0; i < n; i++ {
        require.True(t, jobs.TryPush(piecedeletion.Job{
            Pieces:  []storj.PieceID{testrand.PieceID(), testrand.PieceID()},
            Resolve: promise,
        }))
    }

    return promise, jobs
}