gracefulexit: reconnect added

Change-Id: I236689af944effe3e79ef92e852ae264d3b372e5
Qweder93 2020-08-11 15:59:46 +03:00
parent 68b67c83a7
commit cff44fbd19
3 changed files with 167 additions and 17 deletions


@ -16,6 +16,9 @@ var (
// Error is the default error class for graceful exit package.
Error = errs.Class("gracefulexit")
// ErrReconnect is the error class for connection/transport errors.
ErrReconnect = errs.Class("reconnect")
mon = monkit.Package()
)
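For readers unfamiliar with the zeebo/errs class pattern: a minimal standalone sketch (illustrative, not part of this commit) of how class membership drives the retry decision in the worker loop below.

package main

import (
	"fmt"

	"github.com/zeebo/errs"
)

var ErrReconnect = errs.Class("reconnect")

func main() {
	base := fmt.Errorf("connection reset by peer")
	wrapped := ErrReconnect.Wrap(base)

	fmt.Println(ErrReconnect.Has(wrapped)) // true: wrapped by this class
	fmt.Println(ErrReconnect.Has(base))    // false: plain error
}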


@ -8,6 +8,7 @@ import (
"context"
"io"
"os"
"sync/atomic"
"time"
"github.com/zeebo/errs"
@ -40,6 +41,12 @@ type Worker struct {
ecclient ecclient.Client
minBytesPerSecond memory.Size
minDownloadTimeout time.Duration
	Connects      int64 // dial attempts to the satellite, updated atomically
	NumSucceeded  int64 // pieces transferred successfully, updated atomically
	NumFailed     int64 // pieces that failed to transfer, updated atomically
	Conn          *rpc.Conn
	ProcessClient pb.DRPCSatelliteGracefulExit_ProcessClient
	backoffTime   time.Duration
}
// NewWorker instantiates Worker.
@ -58,34 +65,45 @@ func NewWorker(log *zap.Logger, store *pieces.Store, trust *trust.Pool, satellit
}
}
const (
startBackOffTime = 500 * time.Millisecond
maxBackOffTime = time.Hour
)
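These two constants bound a capped exponential backoff. Because wait (added further down) doubles the interval before sleeping, the first retry waits one second, not 500ms. A standalone sketch of the schedule, assuming only the two constants above:

package main

import (
	"fmt"
	"time"
)

const (
	startBackOffTime = 500 * time.Millisecond
	maxBackOffTime   = time.Hour
)

func main() {
	backoff := startBackOffTime
	for i := 0; i < 14; i++ {
		backoff *= 2 // wait doubles before sleeping, so the first delay is 1s
		if backoff > maxBackOffTime {
			backoff = maxBackOffTime // cap: never wait longer than an hour
		}
		fmt.Println(backoff)
	}
}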
// Run calls the satellite endpoint, transfers pieces, validates, and responds with success or failure.
// It also marks the satellite finished once all the pieces have been transferred.
func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
defer mon.Task()(&ctx)(&err)
defer done()
worker.backoffTime = startBackOffTime
worker.log.Debug("running worker")
-	conn, err := worker.dialer.DialNodeURL(ctx, worker.satelliteURL)
-	if err != nil {
-		return errs.Wrap(err)
-	}
 	defer func() {
-		err = errs.Combine(err, conn.Close())
+		worker.log.Info("number of successes, reconnects, failures",
+			zap.Int64("successes", atomic.LoadInt64(&worker.NumSucceeded)),
+			zap.Int64("connection attempts", atomic.LoadInt64(&worker.Connects)),
+			zap.Int64("failures", atomic.LoadInt64(&worker.NumFailed)))
 	}()
-	client := pb.NewDRPCSatelliteGracefulExitClient(conn)
-	c, err := client.Process(ctx)
-	if err != nil {
-		return errs.Wrap(err)
-	}
+	defer func() {
+		if worker.Conn != nil { // Run can return before the first dial succeeds
+			err = errs.Combine(err, worker.Conn.Close())
+		}
+	}()
 	for {
-		response, err := c.Recv()
+		err = worker.CheckConnection(ctx)
+		if err != nil {
+			if ErrReconnect.Has(err) {
+				continue
+			}
+			return errs.Wrap(err) // not retryable: stop instead of spinning
+		}
+		worker.backoffTime = startBackOffTime
+		response, err := worker.ProcessClient.Recv()
 		if errs.Is(err, io.EOF) {
-			// Done
-			return nil
+			// Stream ended: close and reconnect; the loop finishes only when
+			// the satellite reports the exit completed or failed.
+			err = worker.Conn.Close()
+			if err != nil {
+				worker.log.Error("unable to close connection", zap.Error(err))
+			}
+			continue
 		}
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
// delete the entry from satellite table and inform graceful exit has failed to start
@ -97,7 +115,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
return errs.Wrap(err)
}
 		if err != nil {
-			// TODO what happened
+			worker.log.Error("error while receiving message from satellite", zap.Error(err))
 			return errs.Wrap(err)
 		}
@ -108,11 +126,14 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
case *pb.SatelliteMessage_TransferPiece:
transferPieceMsg := msg.TransferPiece
worker.limiter.Go(ctx, func() {
-			err = worker.transferPiece(ctx, transferPieceMsg, c)
+			err = worker.transferPiece(ctx, transferPieceMsg, worker.ProcessClient)
if err != nil {
worker.log.Error("failed to transfer piece.",
zap.Stringer("Satellite ID", worker.satelliteURL.ID),
zap.Error(errs.Wrap(err)))
atomic.AddInt64(&worker.NumFailed, 1)
} else {
atomic.AddInt64(&worker.NumSucceeded, 1)
}
})
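The counters are updated from goroutines started by worker.limiter, hence sync/atomic. A self-contained sketch of the same pattern, with a plain WaitGroup standing in for the commit's limiter (an assumption made for brevity):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var succeeded, failed int64
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			if i%3 == 0 { // stand-in for a transfer that fails
				atomic.AddInt64(&failed, 1)
				return
			}
			atomic.AddInt64(&succeeded, 1)
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&succeeded), atomic.LoadInt64(&failed)) // 6 4
}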
@ -162,8 +183,9 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
if err != nil {
return errs.Wrap(err)
}
-		// delete everything left in blobs folder of specific satellites
+		// delete everything left in the blobs folder for this satellite.
err = worker.store.DeleteSatelliteBlobs(ctx, worker.satelliteURL.ID)
return errs.Wrap(err)
default:
// TODO handle err
@ -364,6 +386,48 @@ func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) er
return err
}
// wait sleeps between requests, doubling the backoff interval each time until it reaches maxBackOffTime.
func (worker *Worker) wait(ctx context.Context) error {
worker.backoffTime *= 2
if worker.backoffTime > maxBackOffTime {
worker.backoffTime = maxBackOffTime
}
if !sync2.Sleep(ctx, worker.backoffTime) {
return ctx.Err()
}
return nil
}
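wait delegates the context-aware sleep to sync2.Sleep. A minimal equivalent, shown only to illustrate the contract wait relies on (this is not the storj.io/common/sync2 implementation):

package main

import (
	"context"
	"fmt"
	"time"
)

// sleep blocks for the duration but returns false early if the context is
// canceled, mirroring the behavior wait expects from sync2.Sleep.
func sleep(ctx context.Context, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-t.C:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	fmt.Println(sleep(ctx, time.Second)) // false: context expires first
}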
// CheckConnection reconnects to the satellite when the current connection is nil or closed; otherwise it is a no-op.
func (worker *Worker) CheckConnection(ctx context.Context) error {
if worker.Conn != nil && !worker.Conn.Closed() {
return nil
}
err := worker.wait(ctx)
if err != nil {
return Error.Wrap(err)
}
atomic.AddInt64(&worker.Connects, 1)
worker.Conn, err = worker.dialer.DialNodeURL(ctx, worker.satelliteURL)
if err != nil {
worker.log.Error("couldn't create connection with satellite", zap.Error(err))
return ErrReconnect.Wrap(err)
}
client := pb.NewDRPCSatelliteGracefulExitClient(worker.Conn)
worker.ProcessClient, err = client.Process(ctx)
if err != nil {
worker.log.Error("storagenode couldn't call Process", zap.Error(err))
return errs.Wrap(err)
}
return nil
}
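Putting the pieces together, a toy version of the reconnect check with all rpc/drpc types faked out; the names here are illustrative only, not the commit's API.

package main

import (
	"context"
	"fmt"
)

// conn stands in for *rpc.Conn; only the Closed check matters here.
type conn struct{ closed bool }

func (c *conn) Closed() bool { return c.closed }

type worker struct {
	conn     *conn
	connects int64
}

// checkConnection mirrors the shape of CheckConnection above: reuse a healthy
// connection, otherwise count the attempt and redial (faked out here).
func (w *worker) checkConnection(ctx context.Context) error {
	if w.conn != nil && !w.conn.Closed() {
		return nil
	}
	// the real code calls wait(ctx) here to back off before redialing
	w.connects++
	w.conn = &conn{} // stand-in for dialer.DialNodeURL + client.Process
	return nil
}

func main() {
	ctx := context.Background()
	w := &worker{}
	_ = w.checkConnection(ctx) // first call dials
	_ = w.checkConnection(ctx) // healthy: no redial
	w.conn.closed = true
	_ = w.checkConnection(ctx) // closed: redials
	fmt.Println(w.connects)    // 2
}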
// deleteAllPieces deletes pieces stored for a satellite.
func (worker *Worker) deleteAllPieces(ctx context.Context) error {
var totalDeleted int64


@ -94,6 +94,89 @@ func TestWorkerSuccess(t *testing.T) {
})
}
func TestWorkerCheckConnection(t *testing.T) {
const successThreshold = 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
exitingNode, err := findNodeToExit(ctx, planet, 1)
require.NoError(t, err)
exitingNode.GracefulExit.Chore.Loop.Pause()
exitStatusReq := overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
}
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
require.NoError(t, err)
// run the satellite chore to build the transfer queue.
satellite.GracefulExit.Chore.Loop.TriggerWait()
satellite.GracefulExit.Chore.Loop.Pause()
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
// create a worker to process the transfer queue.
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.Peer.Storage2.Trust, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
gracefulexit.Config{
ChoreInterval: 0,
NumWorkers: 2,
NumConcurrentTransfers: 2,
MinBytesPerSecond: 128,
MinDownloadTimeout: 2 * time.Minute,
})
defer ctx.Check(worker.Close)
require.Nil(t, worker.Conn)
require.Nil(t, worker.ProcessClient)
require.Equal(t, int64(0), worker.NumFailed)
require.Equal(t, int64(0), worker.NumSucceeded)
require.Equal(t, int64(0), worker.Connects)
err = worker.CheckConnection(ctx)
require.NoError(t, err)
require.NotNil(t, worker.Conn)
require.NotNil(t, worker.ProcessClient)
require.Equal(t, int64(1), worker.Connects)
err = worker.Run(ctx, func() {})
require.NoError(t, err)
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.EqualValues(t, 0, progress.PiecesFailed)
require.EqualValues(t, 1, progress.PiecesTransferred)
exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
require.NotNil(t, exitStatus.ExitFinishedAt)
require.True(t, exitStatus.ExitSuccess)
require.Equal(t, int64(0), worker.NumFailed)
require.Equal(t, int64(1), worker.NumSucceeded)
require.Equal(t, int64(1), worker.Connects)
})
}
func TestWorkerTimeout(t *testing.T) {
const successThreshold = 4
testplanet.Run(t, testplanet.Config{