storagenode/gracefulexit: remove satellite entry when node fail precondition
Change-Id: I3c215170f10f0053e4f8718ee31d64d93f52ec80
This commit is contained in:
parent
05e4a86654
commit
ebeee58001
@ -91,6 +91,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
chore.log.Debug("finished for satellite.", zap.Stringer("Satellite ID", satelliteID))
|
||||
chore.exitingMap.Delete(satelliteID)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
chore.log.Error("worker failed", zap.Error(err))
|
||||
}
|
||||
|
@ -14,9 +14,11 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/errs2"
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/rpc"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
"storj.io/common/signing"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/sync2"
|
||||
@ -85,6 +87,15 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
||||
// Done
|
||||
return nil
|
||||
}
|
||||
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
|
||||
// delete the entry from satellite table and inform graceful exit has failed to start
|
||||
deleteErr := worker.satelliteDB.CancelGracefulExit(ctx, worker.satelliteID)
|
||||
if deleteErr != nil {
|
||||
// TODO: what to do now?
|
||||
return errs.Combine(deleteErr, err)
|
||||
}
|
||||
return errs.Wrap(err)
|
||||
}
|
||||
if err != nil {
|
||||
// TODO what happened
|
||||
return errs.Wrap(err)
|
||||
|
@ -11,12 +11,15 @@ import (
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/errs2"
|
||||
"storj.io/common/memory"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testblobs"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/storj/storagenode/gracefulexit"
|
||||
@ -183,3 +186,61 @@ func TestWorkerTimeout(t *testing.T) {
|
||||
require.False(t, exitStatus.ExitSuccess)
|
||||
})
|
||||
}
|
||||
|
||||
func TestWorkerFailure_IneligibleNodeAge(t *testing.T) {
|
||||
successThreshold := 4
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1,
|
||||
StorageNodeCount: 5,
|
||||
UplinkCount: 1,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(logger *zap.Logger, index int, config *satellite.Config) {
|
||||
// Set the required node age to 1 month.
|
||||
config.GracefulExit.NodeMinAgeInMonths = 1
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
satellite := planet.Satellites[0]
|
||||
ul := planet.Uplinks[0]
|
||||
|
||||
satellite.GracefulExit.Chore.Loop.Pause()
|
||||
|
||||
rs := &storj.RedundancyScheme{
|
||||
Algorithm: storj.ReedSolomon,
|
||||
RequiredShares: 2,
|
||||
RepairShares: 3,
|
||||
OptimalShares: int16(successThreshold),
|
||||
TotalShares: int16(successThreshold),
|
||||
}
|
||||
|
||||
err := ul.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
|
||||
require.NoError(t, err)
|
||||
|
||||
exitingNode, err := findNodeToExit(ctx, planet, 1)
|
||||
require.NoError(t, err)
|
||||
exitingNode.GracefulExit.Chore.Loop.Pause()
|
||||
|
||||
spaceUsed, err := exitingNode.Storage2.BlobsCache.SpaceUsedForPieces(ctx)
|
||||
require.NoError(t, err)
|
||||
err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite.ID(), time.Now(), spaceUsed)
|
||||
require.NoError(t, err)
|
||||
|
||||
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(),
|
||||
gracefulexit.Config{
|
||||
ChoreInterval: 0,
|
||||
NumWorkers: 2,
|
||||
NumConcurrentTransfers: 2,
|
||||
MinBytesPerSecond: 128,
|
||||
MinDownloadTimeout: 2 * time.Minute,
|
||||
})
|
||||
defer ctx.Check(worker.Close)
|
||||
|
||||
err = worker.Run(ctx, func() {})
|
||||
require.Error(t, err)
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.FailedPrecondition))
|
||||
|
||||
result, err := exitingNode.DB.Satellites().ListGracefulExits(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result, 0)
|
||||
})
|
||||
}
|
||||
|
@ -51,6 +51,8 @@ type DB interface {
|
||||
GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite Satellite, err error)
|
||||
// InitiateGracefulExit updates the database to reflect the beginning of a graceful exit
|
||||
InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) error
|
||||
// CancelGracefulExit removes that satellite by ID
|
||||
CancelGracefulExit(ctx context.Context, satelliteID storj.NodeID) error
|
||||
// UpdateGracefulExit increments the total bytes deleted during a graceful exit
|
||||
UpdateGracefulExit(ctx context.Context, satelliteID storj.NodeID, bytesDeleted int64) error
|
||||
// CompleteGracefulExit updates the database when a graceful exit is completed or failed
|
||||
|
@ -59,6 +59,14 @@ func (db *satellitesDB) InitiateGracefulExit(ctx context.Context, satelliteID st
|
||||
}))
|
||||
}
|
||||
|
||||
// CancelGracefulExit delete an entry by satellite ID
|
||||
func (db *satellitesDB) CancelGracefulExit(ctx context.Context, satelliteID storj.NodeID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
_, err = db.ExecContext(ctx, "DELETE FROM satellite_exit_progress WHERE satellite_id = ?", satelliteID)
|
||||
return ErrSatellitesDB.Wrap(err)
|
||||
}
|
||||
|
||||
// UpdateGracefulExit increments the total bytes deleted during a graceful exit
|
||||
func (db *satellitesDB) UpdateGracefulExit(ctx context.Context, satelliteID storj.NodeID, addToBytesDeleted int64) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
Loading…
Reference in New Issue
Block a user