diff --git a/storagenode/gracefulexit/common.go b/storagenode/gracefulexit/common.go
index 6374d0253..516644fef 100644
--- a/storagenode/gracefulexit/common.go
+++ b/storagenode/gracefulexit/common.go
@@ -16,6 +16,9 @@ var (
 	// Error is the default error class for graceful exit package.
 	Error = errs.Class("gracefulexit")
 
+	// ErrReconnect is an error class for connection/transport errors.
+	ErrReconnect = errs.Class("reconnect")
+
 	mon = monkit.Package()
 )
 
diff --git a/storagenode/gracefulexit/worker.go b/storagenode/gracefulexit/worker.go
index 854365a1b..17ac4be22 100644
--- a/storagenode/gracefulexit/worker.go
+++ b/storagenode/gracefulexit/worker.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"io"
 	"os"
+	"sync/atomic"
 	"time"
 
 	"github.com/zeebo/errs"
@@ -40,6 +41,12 @@ type Worker struct {
 	ecclient           ecclient.Client
 	minBytesPerSecond  memory.Size
 	minDownloadTimeout time.Duration
+	Connects           int64
+	NumSucceeded       int64
+	NumFailed          int64
+	Conn               *rpc.Conn
+	ProcessClient      pb.DRPCSatelliteGracefulExit_ProcessClient
+	backoffTime        time.Duration
 }
 
 // NewWorker instantiates Worker.
@@ -58,34 +65,45 @@ func NewWorker(log *zap.Logger, store *pieces.Store, trust *trust.Pool, satellit
 	}
 }
 
+const (
+	startBackOffTime = 500 * time.Millisecond
+	maxBackOffTime   = time.Hour
+)
+
 // Run calls the satellite endpoint, transfers pieces, validates, and responds with success or failure.
 // It also marks the satellite finished once all the pieces have been transferred.
 func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
 	defer mon.Task()(&ctx)(&err)
 	defer done()
 
+	worker.backoffTime = startBackOffTime
+
 	worker.log.Debug("running worker")
 
-	conn, err := worker.dialer.DialNodeURL(ctx, worker.satelliteURL)
-	if err != nil {
-		return errs.Wrap(err)
-	}
 	defer func() {
-		err = errs.Combine(err, conn.Close())
+		worker.log.Info("numbers of success, reconnects, failures:", zap.Any("successes:", atomic.LoadInt64(&worker.NumSucceeded)), zap.Any("connection attempts:", atomic.LoadInt64(&worker.Connects)), zap.Any("fails:", atomic.LoadInt64(&worker.NumFailed)))
 	}()
 
-	client := pb.NewDRPCSatelliteGracefulExitClient(conn)
-
-	c, err := client.Process(ctx)
-	if err != nil {
-		return errs.Wrap(err)
-	}
+	defer func() {
+		err = errs.Combine(err, worker.Conn.Close())
+	}()
 
 	for {
-		response, err := c.Recv()
+		err = worker.CheckConnection(ctx)
+		if err != nil {
+			if ErrReconnect.Has(err) {
+				continue
+			}
+		}
+		worker.backoffTime = startBackOffTime
+
+		response, err := worker.ProcessClient.Recv()
 		if errs.Is(err, io.EOF) {
-			// Done
-			return nil
+			err = worker.Conn.Close()
+			if err != nil {
+				worker.log.Error("unable to close connection", zap.Error(err))
+			}
+			continue
 		}
 		if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
 			// delete the entry from satellite table and inform graceful exit has failed to start
@@ -97,7 +115,7 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
 			return errs.Wrap(err)
 		}
 		if err != nil {
-			// TODO what happened
+			worker.log.Error("error while receiving message from satellite", zap.Error(err))
 			return errs.Wrap(err)
 		}
 
@@ -108,11 +126,14 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
 		case *pb.SatelliteMessage_TransferPiece:
 			transferPieceMsg := msg.TransferPiece
 			worker.limiter.Go(ctx, func() {
-				err = worker.transferPiece(ctx, transferPieceMsg, c)
+				err = worker.transferPiece(ctx, transferPieceMsg, worker.ProcessClient)
 				if err != nil {
 					worker.log.Error("failed to transfer piece.",
 						zap.Stringer("Satellite ID", worker.satelliteURL.ID),
 						zap.Error(errs.Wrap(err)))
+					atomic.AddInt64(&worker.NumFailed, 1)
+				} else {
+					atomic.AddInt64(&worker.NumSucceeded, 1)
 				}
 			})
 
@@ -162,8 +183,9 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
 			if err != nil {
 				return errs.Wrap(err)
 			}
-			// delete everything left in blobs folder of specific satellites
+			// delete everything left in blobs folder of specific satellites.
 			err = worker.store.DeleteSatelliteBlobs(ctx, worker.satelliteURL.ID)
+			return errs.Wrap(err)
 
 		default:
 			// TODO handle err
@@ -364,6 +386,48 @@ func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) er
 	return err
 }
 
+// wait waits between new requests, doubling the backoff interval each time until the limit is reached.
+func (worker *Worker) wait(ctx context.Context) error {
+	worker.backoffTime *= 2
+	if worker.backoffTime > maxBackOffTime {
+		worker.backoffTime = maxBackOffTime
+	}
+
+	if !sync2.Sleep(ctx, worker.backoffTime) {
+		return ctx.Err()
+	}
+
+	return nil
+}
+
+// CheckConnection checks, in the worker's for loop, whether the connection is closed or nil and, if so, reconnects to the satellite.
+func (worker *Worker) CheckConnection(ctx context.Context) error {
+	if worker.Conn != nil && !worker.Conn.Closed() {
+		return nil
+	}
+
+	err := worker.wait(ctx)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	atomic.AddInt64(&worker.Connects, 1)
+	worker.Conn, err = worker.dialer.DialNodeURL(ctx, worker.satelliteURL)
+	if err != nil {
+		worker.log.Error("couldn't create connection with satellite", zap.Error(err))
+		return ErrReconnect.Wrap(err)
+	}
+	client := pb.NewDRPCSatelliteGracefulExitClient(worker.Conn)
+
+	worker.ProcessClient, err = client.Process(ctx)
+	if err != nil {
+		worker.log.Error("storagenode couldn't call Process", zap.Error(err))
+		return errs.Wrap(err)
+	}
+
+	return nil
+}
+
 // deleteAllPieces deletes pieces stored for a satellite.
 func (worker *Worker) deleteAllPieces(ctx context.Context) error {
 	var totalDeleted int64
diff --git a/storagenode/gracefulexit/worker_test.go b/storagenode/gracefulexit/worker_test.go
index 413761c6a..22cd4c9e4 100644
--- a/storagenode/gracefulexit/worker_test.go
+++ b/storagenode/gracefulexit/worker_test.go
@@ -94,6 +94,89 @@ func TestWorkerSuccess(t *testing.T) {
 	})
 }
 
+func TestWorkerCheckConnection(t *testing.T) {
+	const successThreshold = 4
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount:   1,
+		StorageNodeCount: successThreshold + 1,
+		UplinkCount:      1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		satellite := planet.Satellites[0]
+		ul := planet.Uplinks[0]
+
+		satellite.GracefulExit.Chore.Loop.Pause()
+
+		err := ul.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
+		require.NoError(t, err)
+
+		exitingNode, err := findNodeToExit(ctx, planet, 1)
+		require.NoError(t, err)
+		exitingNode.GracefulExit.Chore.Loop.Pause()
+
+		exitStatusReq := overlay.ExitStatusRequest{
+			NodeID:          exitingNode.ID(),
+			ExitInitiatedAt: time.Now(),
+		}
+		_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
+		require.NoError(t, err)
+
+		// run the satellite chore to build the transfer queue.
+		satellite.GracefulExit.Chore.Loop.TriggerWait()
+		satellite.GracefulExit.Chore.Loop.Pause()
+
+		// check that the satellite knows the storage node is exiting.
+		exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
+		require.NoError(t, err)
+		require.Len(t, exitingNodes, 1)
+		require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
+
+		queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
+		require.NoError(t, err)
+		require.Len(t, queueItems, 1)
+
+		// run the SN chore again to start processing transfers.
+		worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.Peer.Storage2.Trust, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.NodeURL(),
+			gracefulexit.Config{
+				ChoreInterval:          0,
+				NumWorkers:             2,
+				NumConcurrentTransfers: 2,
+				MinBytesPerSecond:      128,
+				MinDownloadTimeout:     2 * time.Minute,
+			})
+		defer ctx.Check(worker.Close)
+		require.Nil(t, worker.Conn)
+		require.Nil(t, worker.ProcessClient)
+		require.Equal(t, worker.NumFailed, int64(0))
+		require.Equal(t, worker.NumSucceeded, int64(0))
+		require.Equal(t, worker.Connects, int64(0))
+
+		err = worker.CheckConnection(ctx)
+		require.NoError(t, err)
+		require.NotNil(t, worker.Conn)
+		require.NotNil(t, worker.ProcessClient)
+		require.Equal(t, worker.Connects, int64(1))
+
+		err = worker.Run(ctx, func() {})
+		require.NoError(t, err)
+
+		progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
+		require.NoError(t, err)
+		require.EqualValues(t, progress.PiecesFailed, 0)
+		require.EqualValues(t, progress.PiecesTransferred, 1)
+
+		exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID())
+		require.NoError(t, err)
+		require.NotNil(t, exitStatus.ExitFinishedAt)
+		require.True(t, exitStatus.ExitSuccess)
+		require.Equal(t, worker.NumFailed, int64(0))
+		require.Equal(t, worker.NumSucceeded, int64(1))
+		require.Equal(t, worker.Connects, int64(1))
+	})
+}
+
 func TestWorkerTimeout(t *testing.T) {
 	const successThreshold = 4
 	testplanet.Run(t, testplanet.Config{