From ebeee58001699cfa972129afcfb8e4a0b737f452 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao Date: Tue, 7 Jan 2020 21:33:41 -0500 Subject: [PATCH] storagenode/gracefulexit: remove satellite entry when node fails precondition Change-Id: I3c215170f10f0053e4f8718ee31d64d93f52ec80 --- storagenode/gracefulexit/chore.go | 1 + storagenode/gracefulexit/worker.go | 11 +++++ storagenode/gracefulexit/worker_test.go | 61 +++++++++++++++++++++++++ storagenode/satellites/satellites.go | 2 + storagenode/storagenodedb/satellites.go | 8 ++++ 5 files changed, 83 insertions(+) diff --git a/storagenode/gracefulexit/chore.go b/storagenode/gracefulexit/chore.go index a36202e41..e06db17cd 100644 --- a/storagenode/gracefulexit/chore.go +++ b/storagenode/gracefulexit/chore.go @@ -91,6 +91,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) { chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satelliteID)) chore.exitingMap.Delete(satelliteID) }) + if err != nil { chore.log.Error("worker failed", zap.Error(err)) } diff --git a/storagenode/gracefulexit/worker.go b/storagenode/gracefulexit/worker.go index 5f136e526..ae0812271 100644 --- a/storagenode/gracefulexit/worker.go +++ b/storagenode/gracefulexit/worker.go @@ -14,9 +14,11 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" + "storj.io/common/errs2" "storj.io/common/memory" "storj.io/common/pb" "storj.io/common/rpc" + "storj.io/common/rpc/rpcstatus" "storj.io/common/signing" "storj.io/common/storj" "storj.io/common/sync2" @@ -85,6 +87,15 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) { // Done return nil } + if errs2.IsRPC(err, rpcstatus.FailedPrecondition) { + // delete this satellite's graceful exit entry from the database and report that graceful exit failed to start + deleteErr := worker.satelliteDB.CancelGracefulExit(ctx, worker.satelliteID) + if deleteErr != nil { + // TODO: decide how to handle a failed delete; for now both errors are returned 
+ return errs.Combine(deleteErr, err) + } + return errs.Wrap(err) + } if err != nil { // TODO what happened return errs.Wrap(err) diff --git a/storagenode/gracefulexit/worker_test.go b/storagenode/gracefulexit/worker_test.go index b0949e6f2..5391cf673 100644 --- a/storagenode/gracefulexit/worker_test.go +++ b/storagenode/gracefulexit/worker_test.go @@ -11,12 +11,15 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest" + "storj.io/common/errs2" "storj.io/common/memory" + "storj.io/common/rpc/rpcstatus" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testblobs" "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" "storj.io/storj/satellite/overlay" "storj.io/storj/storagenode" "storj.io/storj/storagenode/gracefulexit" @@ -183,3 +186,61 @@ func TestWorkerTimeout(t *testing.T) { require.False(t, exitStatus.ExitSuccess) }) } + +func TestWorkerFailure_IneligibleNodeAge(t *testing.T) { + successThreshold := 4 + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, + StorageNodeCount: 5, + UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(logger *zap.Logger, index int, config *satellite.Config) { + // Require nodes to be at least 1 month old; the test expects the worker's run to fail with FailedPrecondition. 
+ config.GracefulExit.NodeMinAgeInMonths = 1 + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + satellite := planet.Satellites[0] + ul := planet.Uplinks[0] + + satellite.GracefulExit.Chore.Loop.Pause() + + rs := &storj.RedundancyScheme{ + Algorithm: storj.ReedSolomon, + RequiredShares: 2, + RepairShares: 3, + OptimalShares: int16(successThreshold), + TotalShares: int16(successThreshold), + } + + err := ul.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB)) + require.NoError(t, err) + + exitingNode, err := findNodeToExit(ctx, planet, 1) + require.NoError(t, err) + exitingNode.GracefulExit.Chore.Loop.Pause() + + spaceUsed, err := exitingNode.Storage2.BlobsCache.SpaceUsedForPieces(ctx) + require.NoError(t, err) + err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite.ID(), time.Now(), spaceUsed) + require.NoError(t, err) + + worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), + gracefulexit.Config{ + ChoreInterval: 0, + NumWorkers: 2, + NumConcurrentTransfers: 2, + MinBytesPerSecond: 128, + MinDownloadTimeout: 2 * time.Minute, + }) + defer ctx.Check(worker.Close) + + err = worker.Run(ctx, func() {}) + require.Error(t, err) + require.True(t, errs2.IsRPC(err, rpcstatus.FailedPrecondition)) + + result, err := exitingNode.DB.Satellites().ListGracefulExits(ctx) + require.NoError(t, err) + require.Len(t, result, 0) + }) +} diff --git a/storagenode/satellites/satellites.go b/storagenode/satellites/satellites.go index a34987803..2a998021d 100644 --- a/storagenode/satellites/satellites.go +++ b/storagenode/satellites/satellites.go @@ -51,6 +51,8 @@ type DB interface { GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite Satellite, err error) // InitiateGracefulExit updates the database to reflect the beginning of a graceful exit 
InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) error + // CancelGracefulExit removes the graceful exit entry for the given satellite ID + CancelGracefulExit(ctx context.Context, satelliteID storj.NodeID) error // UpdateGracefulExit increments the total bytes deleted during a graceful exit UpdateGracefulExit(ctx context.Context, satelliteID storj.NodeID, bytesDeleted int64) error // CompleteGracefulExit updates the database when a graceful exit is completed or failed diff --git a/storagenode/storagenodedb/satellites.go b/storagenode/storagenodedb/satellites.go index f142d7e53..c16f1630b 100644 --- a/storagenode/storagenodedb/satellites.go +++ b/storagenode/storagenodedb/satellites.go @@ -59,6 +59,14 @@ func (db *satellitesDB) InitiateGracefulExit(ctx context.Context, satelliteID st })) } +// CancelGracefulExit deletes the satellite_exit_progress entry for the given satellite ID +func (db *satellitesDB) CancelGracefulExit(ctx context.Context, satelliteID storj.NodeID) (err error) { + defer mon.Task()(&ctx)(&err) + + _, err = db.ExecContext(ctx, "DELETE FROM satellite_exit_progress WHERE satellite_id = ?", satelliteID) + return ErrSatellitesDB.Wrap(err) +} + // UpdateGracefulExit increments the total bytes deleted during a graceful exit func (db *satellitesDB) UpdateGracefulExit(ctx context.Context, satelliteID storj.NodeID, addToBytesDeleted int64) (err error) { defer mon.Task()(&ctx)(&err)