satellite/gracefulexit: revamp graceful exit

Currently, graceful exit is a complicated subsystem that keeps a queue
of all pieces expected to be on a node, and asks the node to transfer
those pieces to other nodes one by one. The complexity of the system
has, unfortunately, led to numerous bugs and unexpected behaviors.

We have decided to remove this entire subsystem and restructure graceful
exit as follows:

* Nodes will signal their intent to exit gracefully
* The satellite will not send any new pieces to gracefully exiting nodes
* Pieces on gracefully exiting nodes will be considered by the repair
  subsystem as "retrievable but unhealthy". They will be repaired off of
  the exiting node as needed.
* After one month (with an appropriately high online score), the node
  will be considered exited, and held amounts for the node will be
  released. The repair worker will continue to fetch pieces from the
  node as long as the node stays online.
* If, at the end of the month, a node's online score is below a configurable
  threshold (MinimumOnlineScore, 0.8 by default), its graceful exit will fail.
  A condensed sketch of this decision logic follows below.
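
For orientation, here is a condensed sketch of the decision logic described
above; the real implementation is checkExitStatusTimeBased in
satellite/gracefulexit/endpoint.go, further down in this diff. The types,
field names, and return strings used here are simplified stand-ins rather
than the satellite's actual API.

package main

import (
	"fmt"
	"time"
)

// exitStatus is a simplified stand-in for the satellite's per-node exit record.
type exitStatus struct {
	InitiatedAt *time.Time
	FinishedAt  *time.Time
	Success     bool
}

// decideTimeBasedExit sketches the flow above: once a node signals its intent to
// exit, the satellite only records the start time, waits out the configured
// period, and checks the online score a single time at the end.
func decideTimeBasedExit(now time.Time, status *exitStatus, onlineScore float64,
	durationDays int, minimumOnlineScore float64) string {
	if status.FinishedAt != nil {
		if status.Success {
			return "exit completed"
		}
		return "exit failed"
	}
	if status.InitiatedAt == nil {
		// The node just asked to exit: record the start time and tell it to wait.
		t := now
		status.InitiatedAt = &t
		return "not ready"
	}
	doneDate := status.InitiatedAt.AddDate(0, 0, durationDays)
	if now.Before(doneDate) {
		// Still inside the exit window; repair keeps draining pieces off the node.
		return "not ready"
	}
	// Window elapsed: a single online-score check decides success or failure.
	t := now
	status.FinishedAt = &t
	status.Success = onlineScore >= minimumOnlineScore
	if status.Success {
		return "exit completed"
	}
	return "exit failed"
}

func main() {
	status := &exitStatus{}
	start := time.Now()
	fmt.Println(decideTimeBasedExit(start, status, 0.95, 30, 0.8))                   // not ready
	fmt.Println(decideTimeBasedExit(start.AddDate(0, 0, 31), status, 0.95, 30, 0.8)) // exit completed
}

The key design choice, echoed in the comments of the real endpoint code, is
that the online score is consulted only once, at the end of the exit window,
so a temporarily low score during the month does not fail the exit
prematurely.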

Refs: https://github.com/storj/storj/issues/6042
Change-Id: I52d4e07a4198e9cb2adf5e6cee2cb64d6f9f426b
paul cannon 2023-07-25 12:48:36 -05:00 committed by Elek, Márton
parent 3d3785a605
commit 72189330fd
17 changed files with 712 additions and 90 deletions

View File

@ -27,7 +27,7 @@ import (
)
// generateGracefulExitCSV creates a report with graceful exit data for exiting or exited nodes in a given period.
func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Time, end time.Time, output io.Writer) error {
func generateGracefulExitCSV(ctx context.Context, timeBased bool, completed bool, start time.Time, end time.Time, output io.Writer) error {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), reportsGracefulExitCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
@ -67,11 +67,14 @@ func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Tim
if err != nil {
return err
}
exitProgress, err := db.GracefulExit().GetProgress(ctx, id)
if gracefulexit.ErrNodeNotFound.Has(err) {
exitProgress = &gracefulexit.Progress{}
} else if err != nil {
return err
exitProgress := &gracefulexit.Progress{}
if !timeBased {
exitProgress, err = db.GracefulExit().GetProgress(ctx, id)
if gracefulexit.ErrNodeNotFound.Has(err) {
exitProgress = &gracefulexit.Progress{}
} else if err != nil {
return err
}
}
exitStatus := node.ExitStatus

View File

@ -354,6 +354,7 @@ var (
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
Output string `help:"destination of report output" default:""`
Completed bool `help:"whether to output (initiated and completed) or (initiated and not completed)" default:"false"`
TimeBased bool `help:"whether the satellite is using time-based graceful exit (and thus, whether to include piece transfer progress in output)" default:"false"`
}
reportsVerifyGracefulExitReceiptCfg struct {
}
@ -660,7 +661,7 @@ func cmdReportsGracefulExit(cmd *cobra.Command, args []string) (err error) {
// send output to stdout
if reportsGracefulExitCfg.Output == "" {
return generateGracefulExitCSV(ctx, reportsGracefulExitCfg.Completed, start, end, os.Stdout)
return generateGracefulExitCSV(ctx, reportsGracefulExitCfg.TimeBased, reportsGracefulExitCfg.Completed, start, end, os.Stdout)
}
// send output to file
@ -673,7 +674,7 @@ func cmdReportsGracefulExit(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, file.Close())
}()
return generateGracefulExitCSV(ctx, reportsGracefulExitCfg.Completed, start, end, file)
return generateGracefulExitCSV(ctx, reportsGracefulExitCfg.TimeBased, reportsGracefulExitCfg.Completed, start, end, file)
}
func cmdNodeUsage(cmd *cobra.Command, args []string) (err error) {
@ -913,6 +914,9 @@ func cmdStripeCustomer(cmd *cobra.Command, args []string) (err error) {
func cmdConsistencyGECleanup(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
if runCfg.GracefulExit.TimeBased {
return errs.New("this command is not supported with time-based graceful exit")
}
before, err := time.Parse("2006-01-02", consistencyGECleanupCfg.Before)
if err != nil {
return errs.New("before flag value isn't of the expected format. %+v", err)

View File

@ -30,6 +30,8 @@ func TestChore(t *testing.T) {
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
// this test can be removed when we are using time-based GE everywhere
config.GracefulExit.TimeBased = false
},
testplanet.ReconfigureRS(4, 6, 8, 8),
),
@ -147,6 +149,8 @@ func TestChoreDurabilityRatio(t *testing.T) {
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
// this test can be removed when we are using time-based GE everywhere
config.GracefulExit.TimeBased = false
},
testplanet.ReconfigureRS(2, 3, successThreshold, 4),
),

View File

@ -25,7 +25,12 @@ var (
// Config for the chore.
type Config struct {
Enabled bool `help:"whether or not graceful exit is enabled on the satellite side." default:"true"`
Enabled bool `help:"whether or not graceful exit is enabled on the satellite side." default:"true"`
TimeBased bool `help:"whether graceful exit will be determined by a period of time, rather than by instructing nodes to transfer one piece at a time" default:"false"`
NodeMinAgeInMonths int `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6" testDefault:"0"`
// these items only apply when TimeBased=false:
ChoreBatchSize int `help:"size of the buffer used to batch inserts into the transfer queue." default:"500" testDefault:"10"`
ChoreInterval time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s" testDefault:"$TESTINTERVAL"`
@ -38,8 +43,13 @@ type Config struct {
MaxInactiveTimeFrame time.Duration `help:"maximum inactive time frame of transfer activities per node." default:"168h" testDefault:"10s"`
RecvTimeout time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"2h" testDefault:"1m"`
MaxOrderLimitSendCount int `help:"maximum number of order limits a satellite sends to a node before marking piece transfer failed" default:"10" testDefault:"3"`
NodeMinAgeInMonths int `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6" testDefault:"0"`
AsOfSystemTimeInterval time.Duration `help:"interval for AS OF SYSTEM TIME clause (crdb specific) to read from db at a specific time in the past" default:"-10s" testDefault:"-1µs"`
TransferQueueBatchSize int `help:"batch size (crdb specific) for deleting and adding items to the transfer queue" default:"1000"`
// these items only apply when TimeBased=true:
GracefulExitDurationInDays int `help:"number of days it takes to execute a passive graceful exit" default:"30" testDefault:"1"`
OfflineCheckInterval time.Duration `help:"how frequently to check uptime ratio of gracefully-exiting nodes" default:"30m" testDefault:"10s"`
MinimumOnlineScore float64 `help:"a gracefully exiting node will fail GE if it falls below this online score (compare AuditHistoryConfig.OfflineThreshold)" default:"0.8"`
}

View File

@ -12,6 +12,8 @@ import (
"storj.io/storj/satellite/metabase"
)
// This whole file can be removed when we are using TimeBased GE everywhere.
// Progress represents the persisted graceful exit progress record.
type Progress struct {
NodeID storj.NodeID

View File

@ -56,6 +56,8 @@ type Endpoint struct {
peerIdentities overlay.PeerIdentities
config Config
recvTimeout time.Duration
nowFunc func() time.Time
}
// connectionsTracker for tracking ongoing connections on this api server.
@ -109,9 +111,16 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overla
peerIdentities: peerIdentities,
config: config,
recvTimeout: config.RecvTimeout,
nowFunc: func() time.Time { return time.Now().UTC() },
}
}
// SetNowFunc applies a function to be used in determining the "now" time for graceful exit
// purposes.
func (endpoint *Endpoint) SetNowFunc(timeFunc func() time.Time) {
endpoint.nowFunc = timeFunc
}
// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) (err error) {
ctx := stream.Context()
@ -122,8 +131,49 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
return rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
}
nodeID := peer.ID
endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", nodeID))
endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", peer.ID))
if endpoint.config.TimeBased {
return endpoint.processTimeBased(ctx, stream, peer.ID)
}
return endpoint.processPiecewise(ctx, stream, peer.ID)
}
func (endpoint *Endpoint) processTimeBased(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
isDisqualified, err := endpoint.handleDisqualifiedNodeTimeBased(ctx, nodeInfo)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if isDisqualified {
return rpcstatus.Error(rpcstatus.FailedPrecondition, "node is disqualified")
}
msg, err := endpoint.checkExitStatusTimeBased(ctx, nodeInfo)
if err != nil {
if ErrIneligibleNodeAge.Has(err) {
return rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
}
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = stream.Send(msg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
return nil
}
// process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
func (endpoint *Endpoint) processPiecewise(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, nodeID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
// ensure that only one connection can be opened for a single node at a time
if !endpoint.connections.tryAdd(nodeID) {
@ -315,7 +365,7 @@ func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStr
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
ExitFinishedAt: endpoint.nowFunc(),
ExitSuccess: false,
}
@ -560,7 +610,7 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap,
if err != nil {
return Error.Wrap(err)
}
now := time.Now().UTC()
now := endpoint.nowFunc()
failedCount := 1
if transferQueueItem.FailedCount != nil {
failedCount = *transferQueueItem.FailedCount + 1
@ -643,7 +693,7 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto
// update graceful exit status to be failed
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
ExitFinishedAt: endpoint.nowFunc(),
ExitSuccess: false,
}
@ -664,6 +714,30 @@ func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID sto
return false, nil
}
func (endpoint *Endpoint) handleDisqualifiedNodeTimeBased(ctx context.Context, nodeInfo *overlay.NodeDossier) (isDisqualified bool, err error) {
if nodeInfo.Disqualified != nil {
if nodeInfo.ExitStatus.ExitInitiatedAt == nil {
// node never started graceful exit before, and it is already disqualified; nothing
// for us to do here
return true, nil
}
if nodeInfo.ExitStatus.ExitFinishedAt == nil {
// node did start graceful exit and hasn't been marked as finished, although it
// has been disqualified. We'll correct that now.
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeInfo.Id,
ExitFinishedAt: endpoint.nowFunc(),
ExitSuccess: false,
}
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
return true, Error.Wrap(err)
}
return true, nil
}
return false, nil
}
func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason) error {
finishedMsg, err := endpoint.getFinishedMessage(ctx, exitStatusRequest.NodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, failedReason)
if err != nil {
@ -691,40 +765,48 @@ func (endpoint *Endpoint) handleFinished(ctx context.Context, stream pb.DRPCSate
func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, nodeID storj.NodeID, finishedAt time.Time, success bool, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
if success {
unsigned := &pb.ExitCompleted{
SatelliteId: endpoint.signer.ID(),
NodeId: nodeID,
Completed: finishedAt,
}
signed, err := signing.SignExitCompleted(ctx, endpoint.signer, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
message = &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitCompleted{
ExitCompleted: signed,
}}
} else {
unsigned := &pb.ExitFailed{
SatelliteId: endpoint.signer.ID(),
NodeId: nodeID,
Failed: finishedAt,
}
if reason >= 0 {
unsigned.Reason = reason
}
signed, err := signing.SignExitFailed(ctx, endpoint.signer, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
message = &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitFailed{
ExitFailed: signed,
}}
return endpoint.getFinishedSuccessMessage(ctx, nodeID, finishedAt)
}
return endpoint.getFinishedFailureMessage(ctx, nodeID, finishedAt, reason)
}
func (endpoint *Endpoint) getFinishedSuccessMessage(ctx context.Context, nodeID storj.NodeID, finishedAt time.Time) (message *pb.SatelliteMessage, err error) {
unsigned := &pb.ExitCompleted{
SatelliteId: endpoint.signer.ID(),
NodeId: nodeID,
Completed: finishedAt,
}
signed, err := signing.SignExitCompleted(ctx, endpoint.signer, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitCompleted{
ExitCompleted: signed,
}}, nil
}
func (endpoint *Endpoint) getFinishedFailureMessage(ctx context.Context, nodeID storj.NodeID, finishedAt time.Time, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
unsigned := &pb.ExitFailed{
SatelliteId: endpoint.signer.ID(),
NodeId: nodeID,
Failed: finishedAt,
}
if reason >= 0 {
unsigned.Reason = reason
}
signed, err := signing.SignExitFailed(ctx, endpoint.signer, unsigned)
if err != nil {
return nil, Error.Wrap(err)
}
message = &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitFailed{
ExitFailed: signed,
}}
if !endpoint.config.TimeBased {
err = endpoint.overlay.DisqualifyNode(ctx, nodeID, overlay.DisqualificationReasonUnknown)
if err != nil {
return nil, Error.Wrap(err)
}
}
return message, nil
}
@ -792,11 +874,11 @@ func (endpoint *Endpoint) checkExitStatus(ctx context.Context, nodeID storj.Node
return nil, Error.Wrap(err)
}
geEligibilityDate := nodeDossier.CreatedAt.AddDate(0, endpoint.config.NodeMinAgeInMonths, 0)
if time.Now().Before(geEligibilityDate) {
if endpoint.nowFunc().Before(geEligibilityDate) {
return nil, ErrIneligibleNodeAge.New("will be eligible after %s", geEligibilityDate.String())
}
request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: time.Now().UTC()}
request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: endpoint.nowFunc()}
node, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
if err != nil {
return nil, Error.Wrap(err)
@ -812,7 +894,7 @@ func (endpoint *Endpoint) checkExitStatus(ctx context.Context, nodeID storj.Node
}
// graceful exit initiation metrics
age := time.Now().UTC().Sub(node.CreatedAt.UTC())
age := endpoint.nowFunc().Sub(node.CreatedAt.UTC())
mon.FloatVal("graceful_exit_init_node_age_seconds").Observe(age.Seconds()) //mon:locked
mon.IntVal("graceful_exit_init_node_audit_success_count").Observe(reputationInfo.AuditSuccessCount) //mon:locked
mon.IntVal("graceful_exit_init_node_audit_total_count").Observe(reputationInfo.TotalAuditCount) //mon:locked
@ -828,6 +910,83 @@ func (endpoint *Endpoint) checkExitStatus(ctx context.Context, nodeID storj.Node
return nil, nil
}
func (endpoint *Endpoint) checkExitStatusTimeBased(ctx context.Context, nodeInfo *overlay.NodeDossier) (*pb.SatelliteMessage, error) {
if nodeInfo.ExitStatus.ExitFinishedAt != nil {
// TODO maybe we should store the reason in the DB so we know how it originally failed.
return endpoint.getFinishedMessage(ctx, nodeInfo.Id, *nodeInfo.ExitStatus.ExitFinishedAt, nodeInfo.ExitStatus.ExitSuccess, -1)
}
if nodeInfo.ExitStatus.ExitInitiatedAt == nil {
// the node has just requested to begin GE. verify eligibility and set it up in the DB.
geEligibilityDate := nodeInfo.CreatedAt.AddDate(0, endpoint.config.NodeMinAgeInMonths, 0)
if endpoint.nowFunc().Before(geEligibilityDate) {
return nil, ErrIneligibleNodeAge.New("will be eligible after %s", geEligibilityDate.String())
}
request := &overlay.ExitStatusRequest{
NodeID: nodeInfo.Id,
ExitInitiatedAt: endpoint.nowFunc(),
}
node, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
if err != nil {
return nil, Error.Wrap(err)
}
reputationInfo, err := endpoint.reputation.Get(ctx, nodeInfo.Id)
if err != nil {
return nil, Error.Wrap(err)
}
// graceful exit initiation metrics
age := endpoint.nowFunc().Sub(node.CreatedAt)
mon.FloatVal("graceful_exit_init_node_age_seconds").Observe(age.Seconds()) //mon:locked
mon.IntVal("graceful_exit_init_node_audit_success_count").Observe(reputationInfo.AuditSuccessCount) //mon:locked
mon.IntVal("graceful_exit_init_node_audit_total_count").Observe(reputationInfo.TotalAuditCount) //mon:locked
mon.IntVal("graceful_exit_init_node_piece_count").Observe(node.PieceCount) //mon:locked
} else {
// the node has already initiated GE and hasn't finished yet... or has it?!?!
geDoneDate := nodeInfo.ExitStatus.ExitInitiatedAt.AddDate(0, 0, endpoint.config.GracefulExitDurationInDays)
if endpoint.nowFunc().After(geDoneDate) {
// ok actually it has finished, and this is the first time we've noticed it
reputationInfo, err := endpoint.reputation.Get(ctx, nodeInfo.Id)
if err != nil {
return nil, Error.Wrap(err)
}
request := &overlay.ExitStatusRequest{
NodeID: nodeInfo.Id,
ExitFinishedAt: endpoint.nowFunc(),
ExitSuccess: true,
}
// We don't check the online score constantly over the course of the graceful exit,
// because we want to give the node a chance to get the score back up if it's
// temporarily low.
//
// Instead, we check the overall score at the end of the GE period.
if reputationInfo.OnlineScore < endpoint.config.MinimumOnlineScore {
request.ExitSuccess = false
}
endpoint.log.Info("node completed graceful exit",
zap.Float64("online score", reputationInfo.OnlineScore),
zap.Bool("success", request.ExitSuccess),
zap.Stringer("node ID", nodeInfo.Id))
updatedNode, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
if err != nil {
return nil, Error.Wrap(err)
}
if request.ExitSuccess {
mon.Meter("graceful_exit_success").Mark(1) //mon:locked
return endpoint.getFinishedSuccessMessage(ctx, updatedNode.Id, *updatedNode.ExitStatus.ExitFinishedAt)
}
mon.Meter("graceful_exit_failure").Mark(1)
return endpoint.getFinishedFailureMessage(ctx, updatedNode.Id, *updatedNode.ExitStatus.ExitFinishedAt, pb.ExitFailed_INACTIVE_TIMEFRAME_EXCEEDED)
}
}
// this will cause the node to disconnect, wait a bit, and then try asking again.
return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}, nil
}
func (endpoint *Endpoint) generateExitStatusRequest(ctx context.Context, nodeID storj.NodeID) (*overlay.ExitStatusRequest, pb.ExitFailed_Reason, error) {
var exitFailedReason pb.ExitFailed_Reason = -1
progress, err := endpoint.db.GetProgress(ctx, nodeID)
@ -846,7 +1005,7 @@ func (endpoint *Endpoint) generateExitStatusRequest(ctx context.Context, nodeID
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: progress.NodeID,
ExitFinishedAt: time.Now().UTC(),
ExitFinishedAt: endpoint.nowFunc(),
}
// check node's exiting progress to see if it has failed passed max failure threshold
if processed > 0 && float64(progress.PiecesFailed)/float64(processed)*100 >= float64(endpoint.config.OverallMaxFailuresPercentage) {
@ -923,14 +1082,14 @@ func (endpoint *Endpoint) GracefulExitFeasibility(ctx context.Context, req *pb.G
var response pb.GracefulExitFeasibilityResponse
nodeDossier, err := endpoint.overlaydb.Get(ctx, peer.ID)
nodeDossier, err := endpoint.overlay.Get(ctx, peer.ID)
if err != nil {
endpoint.log.Error("unable to retrieve node dossier for attempted exiting node", zap.Stringer("node ID", peer.ID))
return nil, Error.Wrap(err)
}
eligibilityDate := nodeDossier.CreatedAt.AddDate(0, endpoint.config.NodeMinAgeInMonths, 0)
if time.Now().Before(eligibilityDate) {
if endpoint.nowFunc().Before(eligibilityDate) {
response.IsAllowed = false
} else {
response.IsAllowed = true

View File

@ -31,6 +31,7 @@ import (
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/reputation"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/blobstore/testblobs"
"storj.io/storj/storagenode/gracefulexit"
@ -46,7 +47,7 @@ type exitProcessClient interface {
Recv() (*pb.SatelliteMessage, error)
}
func TestSuccess(t *testing.T) {
func TestSuccessOld(t *testing.T) {
testTransfers(t, numObjects, numMultipartObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var pieceID storj.PieceID
failedCount := 0
@ -148,6 +149,60 @@ func TestSuccess(t *testing.T) {
})
}
func TestSuccess(t *testing.T) {
const steps = 5
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.TimeBased = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// check that there are no exiting nodes.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 0)
exitingNode := planet.StorageNodes[0]
simTime := time.Now()
satellite.GracefulExit.Endpoint.SetNowFunc(func() time.Time { return simTime })
doneTime := simTime.AddDate(0, 0, satellite.Config.GracefulExit.GracefulExitDurationInDays)
interval := doneTime.Sub(simTime) / steps
// we should get NotReady responses until after the GE time has elapsed.
for simTime.Before(doneTime) {
response, err := callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
require.IsType(t, (*pb.SatelliteMessage_NotReady)(nil), response.GetMessage())
// check that the exiting node is still currently exiting.
exitingNodes, err = satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
simTime = simTime.Add(interval)
}
simTime = doneTime.Add(time.Second)
// now we should get a successful finish message
response, err := callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
require.IsType(t, (*pb.SatelliteMessage_ExitCompleted)(nil), response.GetMessage())
// verify signature on exit receipt and we're done
m := response.GetMessage().(*pb.SatelliteMessage_ExitCompleted)
signee := signing.SigneeFromPeerIdentity(satellite.Identity.PeerIdentity())
err = signing.VerifyExitCompleted(ctx, signee, m.ExitCompleted)
require.NoError(t, err)
})
}
func TestConcurrentConnections(t *testing.T) {
successThreshold := 4
testplanet.Run(t, testplanet.Config{
@ -155,7 +210,13 @@ func TestConcurrentConnections(t *testing.T) {
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
@ -263,6 +324,8 @@ func TestRecvTimeout(t *testing.T) {
// This config value will create a very short timeframe allowed for receiving
// data from storage nodes. This will cause context to cancel with timeout.
config.GracefulExit.RecvTimeout = 10 * time.Millisecond
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
),
@ -398,11 +461,15 @@ func TestInvalidStorageNodeSignature(t *testing.T) {
})
}
func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
func TestExitDisqualifiedNodeFailOnStartOld(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 2,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
}},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
exitingNode := planet.StorageNodes[0]
@ -434,6 +501,46 @@ func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
}
func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 2,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.TimeBased = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
exitingNode := planet.StorageNodes[0]
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
require.NoError(t, err)
conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL())
require.NoError(t, err)
defer ctx.Check(conn.Close)
client := pb.NewDRPCSatelliteGracefulExitClient(conn)
processClient, err := client.Process(ctx)
require.NoError(t, err)
// Process endpoint should return immediately if node is disqualified
response, err := processClient.Recv()
require.True(t, errs2.IsRPC(err, rpcstatus.FailedPrecondition))
require.Nil(t, response)
require.NoError(t, processClient.Close())
// make sure GE was not initiated for the disqualified node
exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
require.Nil(t, exitStatus.ExitInitiatedAt)
require.False(t, exitStatus.ExitSuccess)
})
}
func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
testTransfers(t, numObjects, numMultipartObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var disqualifiedError error
@ -954,6 +1061,7 @@ func testUpdateSegmentFailureDuplicatedNodeID(t *testing.T, ctx *testcontext.Con
require.True(t, ok)
require.Equal(t, 1, count)
}
func TestExitDisabled(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
@ -994,7 +1102,13 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
@ -1086,7 +1200,13 @@ func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
@ -1264,6 +1384,8 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
// so we set the max failures percentage extra high.
config.GracefulExit.OverallMaxFailuresPercentage = 101
config.GracefulExit.MaxOrderLimitSendCount = maxOrderLimitSendCount
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
testplanet.ReconfigureRS(2, 3, 4, 4),
),
@ -1380,6 +1502,40 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
}
func TestIneligibleNodeAge(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 5,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// Set the required node age to 1 month.
config.GracefulExit.NodeMinAgeInMonths = 1
config.GracefulExit.TimeBased = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// check that there are no exiting nodes.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 0)
exitingNode := planet.StorageNodes[0]
// try to initiate GE; expect to get a node ineligible error
response, err := callProcess(ctx, exitingNode, satellite)
require.Error(t, err)
require.Nil(t, response)
require.True(t, errs2.IsRPC(err, rpcstatus.FailedPrecondition))
// check that there are still no exiting nodes
exitingNodes, err = satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 0)
})
}
func TestIneligibleNodeAgeOld(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 5,
@ -1445,7 +1601,13 @@ func testTransfers(t *testing.T, objects int, multipartObjects int, verifier fun
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
// These tests can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
@ -1609,6 +1771,128 @@ func TestUpdatePiecesCheckDuplicates(t *testing.T) {
})
}
func TestNodeAlreadyExited(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.TimeBased = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// check that there are no exiting nodes.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 0)
exitingNode := planet.StorageNodes[0]
simTime := time.Now()
satellite.GracefulExit.Endpoint.SetNowFunc(func() time.Time { return simTime })
doneTime := simTime.AddDate(0, 0, satellite.Config.GracefulExit.GracefulExitDurationInDays)
// initiate GE
response, err := callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
require.IsType(t, (*pb.SatelliteMessage_NotReady)(nil), response.GetMessage())
// jump to when GE will be done
simTime = doneTime.Add(time.Second)
// should get ExitCompleted now
response, err = callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
require.IsType(t, (*pb.SatelliteMessage_ExitCompleted)(nil), response.GetMessage())
// now that the node has successfully exited, try doing it again! we expect to get the
// ExitCompleted message again.
response, err = callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
require.IsType(t, (*pb.SatelliteMessage_ExitCompleted)(nil), response.GetMessage())
// check that node is not marked as exiting still
exitingNodes, err = satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 0)
})
}
func TestNodeFailingGracefulExitWithLowOnlineScore(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 4,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Reputation.AuditHistory.WindowSize = 24 * time.Hour
config.Reputation.AuditHistory.TrackingPeriod = 3 * 24 * time.Hour
config.Reputation.FlushInterval = 0
config.GracefulExit.MinimumOnlineScore = 0.6
config.GracefulExit.TimeBased = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
exitingNode := planet.StorageNodes[0]
simTime := time.Now()
satellite.GracefulExit.Endpoint.SetNowFunc(func() time.Time { return simTime })
doneTime := simTime.AddDate(0, 0, satellite.Config.GracefulExit.GracefulExitDurationInDays)
// initiate GE
response, err := callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
require.IsType(t, (*pb.SatelliteMessage_NotReady)(nil), response.GetMessage())
// set the audit history for that node to reflect a poor online score
last := reputation.AuditSuccess
for simTime.Before(doneTime) {
// alternate between Success and Offline to get a ~50% score
if last == reputation.AuditSuccess {
last = reputation.AuditOffline
} else {
last = reputation.AuditSuccess
}
_, err := satellite.DB.Reputation().Update(ctx, reputation.UpdateRequest{
NodeID: exitingNode.ID(),
AuditOutcome: last,
Config: satellite.Config.Reputation,
}, simTime)
require.NoError(t, err)
// GE shouldn't fail until the end of the period, so node has a chance to get score back up
response, err := callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
require.IsTypef(t, (*pb.SatelliteMessage_NotReady)(nil), response.GetMessage(), "simTime=%s, doneTime=%s", simTime, doneTime)
simTime = simTime.Add(time.Hour)
}
err = satellite.Reputation.Service.TestFlushAllNodeInfo(ctx)
require.NoError(t, err)
simTime = doneTime.Add(time.Second)
response, err = callProcess(ctx, exitingNode, satellite)
require.NoError(t, err)
msg := response.GetMessage()
require.IsType(t, (*pb.SatelliteMessage_ExitFailed)(nil), msg)
failure := msg.(*pb.SatelliteMessage_ExitFailed)
// validate signature on failure message
signee := signing.SigneeFromPeerIdentity(satellite.Identity.PeerIdentity())
err = signing.VerifyExitFailed(ctx, signee, failure.ExitFailed)
require.Equal(t, exitingNode.ID(), failure.ExitFailed.NodeId)
// truncate to micros since the Failed time has gone through the database
expectedFailTime := simTime.Truncate(time.Microsecond)
require.Falsef(t, failure.ExitFailed.Failed.Before(expectedFailTime),
"failure time should have been at or after %s: %s", simTime, failure.ExitFailed.Failed)
require.Equal(t, satellite.ID(), failure.ExitFailed.SatelliteId)
require.Equal(t, pb.ExitFailed_INACTIVE_TIMEFRAME_EXCEEDED, failure.ExitFailed.Reason)
require.NoError(t, err)
})
}
func hasDuplicates(pieces metabase.Pieces) bool {
nodePieceCounts := make(map[storj.NodeID]int)
for _, piece := range pieces {
@ -1623,3 +1907,21 @@ func hasDuplicates(pieces metabase.Pieces) bool {
return false
}
func callProcess(ctx *testcontext.Context, exitingNode *testplanet.StorageNode, satellite *testplanet.Satellite) (*pb.SatelliteMessage, error) {
conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL())
if err != nil {
return nil, err
}
defer ctx.Check(conn.Close)
client := pb.NewDRPCSatelliteGracefulExitClient(conn)
c, err := client.Process(ctx)
if err != nil {
return nil, err
}
defer ctx.Check(c.CloseSend)
return c.Recv()
}

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"storj.io/common/memory"
@ -28,6 +29,12 @@ import (
func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
geDB := planet.Satellites[0].DB.GracefulExit()
@ -81,6 +88,12 @@ func TestGracefulexitDB_DeleteFinishedExitProgress(t *testing.T) {
func TestGracefulExit_HandleAsOfSystemTimeBadInput(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
gracefulexitDB := planet.Satellites[0].DB.GracefulExit()
now := time.Now().UTC()
@ -97,6 +110,12 @@ func TestGracefulExit_HandleAsOfSystemTimeBadInput(t *testing.T) {
func TestGracefulExit_DeleteAllFinishedTransferQueueItems(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 7,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
var (
cache = planet.Satellites[0].DB.OverlayCache()
@ -233,7 +252,13 @@ func TestGracefulExit_CopiedObjects(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, 4, 4),
func(log *zap.Logger, index int, config *satellite.Config) {
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
@ -291,6 +316,8 @@ func TestGracefulExit_CopiedObjects(t *testing.T) {
// TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batch
// ensures that deletion works as expected using different batch sizes.
//
// This test can be removed entirely when we are using time-based GE everywhere.
func TestGracefulExit_Enqueue_And_DeleteAllFinishedTransferQueueItems_batchsize(t *testing.T) {
var testCases = []struct {
name string
@ -394,6 +421,8 @@ func generateExitedNodes(t *testing.T, ctx *testcontext.Context, db satellite.DB
// TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch verifies that
// the CRDB batch logic for delete all the transfer queue items of exited nodes
// works as expected.
//
// This test can be removed entirely when we are using time-based GE everywhere.
func TestGracefulExit_DeleteAllFinishedTransferQueueItems_batch(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
const (

View File

@ -31,6 +31,8 @@ func TestObserver(t *testing.T) {
func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
config.GracefulExit.UseRangedLoop = true
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
testplanet.ReconfigureRS(4, 6, 8, 8),
),
@ -150,6 +152,8 @@ func TestObserverDurabilityRatio(t *testing.T) {
func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.MaxInactiveTimeFrame = maximumInactiveTimeFrame
config.GracefulExit.UseRangedLoop = true
// This test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
testplanet.ReconfigureRS(2, 3, successThreshold, 4),
),

View File

@ -113,13 +113,15 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
}
{ // setup gracefulexit
peer.GracefulExit.Observer = gracefulexit.NewObserver(
peer.Log.Named("gracefulexit:observer"),
peer.DB.GracefulExit(),
peer.DB.OverlayCache(),
metabaseDB,
config.GracefulExit,
)
if config.GracefulExit.Enabled && !config.GracefulExit.TimeBased {
peer.GracefulExit.Observer = gracefulexit.NewObserver(
peer.Log.Named("gracefulexit:observer"),
peer.DB.GracefulExit(),
peer.DB.OverlayCache(),
metabaseDB,
config.GracefulExit,
)
}
}
{ // setup node tally observer
@ -188,7 +190,7 @@ func NewRangedLoop(log *zap.Logger, db DB, metabaseDB *metabase.DB, config *Conf
observers = append(observers, peer.Accounting.NodeTallyObserver)
}
if config.GracefulExit.Enabled && config.GracefulExit.UseRangedLoop {
if peer.GracefulExit.Observer != nil && config.GracefulExit.UseRangedLoop {
observers = append(observers, peer.GracefulExit.Observer)
}

View File

@ -370,6 +370,8 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(piecesCheck.Clumped))) //mon:locked
stats.segmentStats.segmentClumpedCount.Observe(int64(len(piecesCheck.Clumped)))
mon.IntVal("checker_segment_exiting_count").Observe(int64(len(piecesCheck.Exiting)))
stats.segmentStats.segmentExitingCount.Observe(int64(len(piecesCheck.Exiting)))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(len(piecesCheck.OutOfPlacement))) //mon:locked
stats.segmentStats.segmentOffPlacementCount.Observe(int64(len(piecesCheck.OutOfPlacement)))

View File

@ -103,6 +103,7 @@ type segmentRSStats struct {
segmentTotalCount *monkit.IntVal
segmentHealthyCount *monkit.IntVal
segmentClumpedCount *monkit.IntVal
segmentExitingCount *monkit.IntVal
segmentOffPlacementCount *monkit.IntVal
segmentAge *monkit.IntVal
segmentHealth *monkit.FloatVal
@ -116,6 +117,7 @@ func newSegmentRSStats(rs string) *segmentRSStats {
segmentTotalCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_total_count").WithTag("rs_scheme", rs)),
segmentHealthyCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_healthy_count").WithTag("rs_scheme", rs)),
segmentClumpedCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_clumped_count").WithTag("rs_scheme", rs)),
segmentExitingCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_exiting_count").WithTag("rs_scheme", rs)),
segmentOffPlacementCount: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_off_placement_count").WithTag("rs_scheme", rs)),
segmentAge: monkit.NewIntVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_age").WithTag("rs_scheme", rs)),
segmentHealth: monkit.NewFloatVal(monkit.NewSeriesKey("tagged_repair_stats").WithTag("name", "checker_segment_health").WithTag("rs_scheme", rs)),

View File

@ -32,6 +32,9 @@ type PiecesCheckResult struct {
// Clumped is a set of Piece Numbers which are to be considered unhealthy because of IP
// clumping. (If DoDeclumping is disabled, this set will be empty.)
Clumped map[uint16]struct{}
// Exiting is a set of Piece Numbers which are considered unhealthy because the node on
// which they reside has initiated graceful exit.
Exiting map[uint16]struct{}
// OutOfPlacement is a set of Piece Numbers which are unhealthy because of placement rules.
// (If DoPlacementCheck is disabled, this set will be empty.)
OutOfPlacement map[uint16]struct{}
@ -43,14 +46,14 @@ type PiecesCheckResult struct {
// includes, currently, only pieces in OutOfPlacement).
ForcingRepair map[uint16]struct{}
// Unhealthy contains all Piece Numbers which are in Missing OR Suspended OR Clumped OR
// OutOfPlacement OR InExcludedCountry.
// Exiting OR OutOfPlacement OR InExcludedCountry.
Unhealthy map[uint16]struct{}
// UnhealthyRetrievable is the set of pieces that are "unhealthy-but-retrievable". That is,
// pieces that are in Unhealthy AND Retrievable.
UnhealthyRetrievable map[uint16]struct{}
// Healthy contains all Piece Numbers from the segment which are not in Unhealthy.
// (Equivalently: all Piece Numbers from the segment which are NOT in Missing OR
// Suspended OR Clumped OR OutOfPlacement OR InExcludedCountry).
// Suspended OR Clumped OR Exiting OR OutOfPlacement OR InExcludedCountry).
Healthy map[uint16]struct{}
}
@ -66,6 +69,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
// check excluded countries and remove online nodes from missing pieces
result.Missing = make(map[uint16]struct{})
result.Suspended = make(map[uint16]struct{})
result.Exiting = make(map[uint16]struct{})
result.Retrievable = make(map[uint16]struct{})
result.InExcludedCountry = make(map[uint16]struct{})
for index, nodeRecord := range nodes {
@ -87,6 +91,9 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
if nodeRecord.Suspended {
result.Suspended[pieceNum] = struct{}{}
}
if nodeRecord.Exiting {
result.Exiting[pieceNum] = struct{}{}
}
if _, excluded := excludedCountryCodes[nodeRecord.CountryCode]; excluded {
result.InExcludedCountry[pieceNum] = struct{}{}
@ -150,6 +157,7 @@ func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.Selecte
maps.Copy(result.Unhealthy, result.Missing)
maps.Copy(result.Unhealthy, result.Suspended)
maps.Copy(result.Unhealthy, result.Clumped)
maps.Copy(result.Unhealthy, result.Exiting)
maps.Copy(result.Unhealthy, result.OutOfPlacement)
maps.Copy(result.Unhealthy, result.InExcludedCountry)

View File

@ -296,7 +296,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
mon.Meter("repair_unnecessary").Mark(1) //mon:locked
stats.repairUnnecessary.Mark(1)
repairer.log.Debug("segment above repair threshold", zap.Int("numHealthy", len(piecesCheck.Healthy)), zap.Int32("repairThreshold", repairThreshold),
zap.Int("numClumped", len(piecesCheck.Clumped)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacement)),
zap.Int("numClumped", len(piecesCheck.Clumped)), zap.Int("numExiting", len(piecesCheck.Exiting)), zap.Int("numOffPieces", len(piecesCheck.OutOfPlacement)),
zap.Int("numExcluded", len(piecesCheck.InExcludedCountry)), zap.Int("droppedPieces", len(dropPieces)))
return true, nil
}
@ -658,6 +658,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
zap.Stringer("Stream ID", segment.StreamID),
zap.Uint64("Position", segment.Position.Encode()),
zap.Int("clumped pieces", len(piecesCheck.Clumped)),
zap.Int("exiting-node pieces", len(piecesCheck.Exiting)),
zap.Int("out of placement pieces", len(piecesCheck.OutOfPlacement)),
zap.Int("in excluded countries", len(piecesCheck.InExcludedCountry)),
zap.Int("missing pieces", len(piecesCheck.Missing)),

View File

@ -562,6 +562,9 @@ contact.external-address: ""
# size of the buffer used to batch transfer queue reads and sends to the storage node.
# graceful-exit.endpoint-batch-size: 300
# number of days it takes to execute a passive graceful exit
# graceful-exit.graceful-exit-duration-in-days: 30
# maximum number of transfer failures per piece.
# graceful-exit.max-failures-per-piece: 5
@ -571,15 +574,24 @@ contact.external-address: ""
# maximum number of order limits a satellite sends to a node before marking piece transfer failed
# graceful-exit.max-order-limit-send-count: 10
# a gracefully exiting node will fail GE if it falls below this online score (compare AuditHistoryConfig.OfflineThreshold)
# graceful-exit.minimum-online-score: 0.8
# minimum age for a node on the network in order to initiate graceful exit
# graceful-exit.node-min-age-in-months: 6
# how frequently to check uptime ratio of gracefully-exiting nodes
# graceful-exit.offline-check-interval: 30m0s
# maximum percentage of transfer failures per node.
# graceful-exit.overall-max-failures-percentage: 10
# the minimum duration for receiving a stream from a storage node before timing out
# graceful-exit.recv-timeout: 2h0m0s
# whether graceful exit will be determined by a period of time, rather than by instructing nodes to transfer one piece at a time
# graceful-exit.time-based: false
# batch size (crdb specific) for deleting and adding items to the transfer queue
# graceful-exit.transfer-queue-batch-size: 1000

View File

@ -5,29 +5,37 @@ package gracefulexit_test
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode/blobstore"
)
func TestChore(t *testing.T) {
func TestChoreOld(t *testing.T) {
const successThreshold = 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: successThreshold + 2,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.TimeBased = false
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite1 := planet.Satellites[0]
@ -62,6 +70,34 @@ func TestChore(t *testing.T) {
})
}
func TestChore(t *testing.T) {
const successThreshold = 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: successThreshold + 2,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
config.GracefulExit.TimeBased = true
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite1 := planet.Satellites[0]
uplinkPeer := planet.Uplinks[0]
err := uplinkPeer.Upload(ctx, satellite1, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
exitingNode, err := findNodeToExit(ctx, planet)
require.NoError(t, err)
exitSatellite(ctx, t, planet, exitingNode)
})
}
func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, exitingNode *testplanet.StorageNode) {
satellite1 := planet.Satellites[0]
exitingNode.GracefulExit.Chore.Loop.Pause()
@ -74,6 +110,13 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
}
var timeMutex sync.Mutex
var timeForward time.Duration
satellite1.GracefulExit.Endpoint.SetNowFunc(func() time.Time {
timeMutex.Lock()
defer timeMutex.Unlock()
return time.Now().Add(timeForward)
})
_, err = satellite1.Overlay.DB.UpdateExitStatus(ctx, &exitStatus)
require.NoError(t, err)
@ -88,10 +131,17 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
// initiate graceful exit on satellite side by running the SN chore.
exitingNode.GracefulExit.Chore.Loop.TriggerWait()
// jump ahead in time (the +2 is to account for things like daylight savings shifts that may
// be happening in the next while, since we're not using AddDate here).
timeMutex.Lock()
timeForward += time.Duration(satellite1.Config.GracefulExit.GracefulExitDurationInDays*24+2) * time.Hour
timeMutex.Unlock()
// run the satellite ranged loop to build the transfer queue.
_, err = satellite1.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
if !satellite1.Config.GracefulExit.TimeBased {
// run the satellite ranged loop to build the transfer queue.
_, err = satellite1.RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
}
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite1.DB.OverlayCache().GetExitingNodes(ctx)
@ -99,9 +149,11 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
if !satellite1.Config.GracefulExit.TimeBased {
queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
}
// run the SN chore again to start processing transfers.
exitingNode.GracefulExit.Chore.Loop.TriggerWait()
@ -109,10 +161,12 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
err = exitingNode.GracefulExit.Chore.TestWaitForNoWorkers(ctx)
require.NoError(t, err)
// check that there are no more items to process
queueItems, err = satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 0)
if !satellite1.Config.GracefulExit.TimeBased {
// check that there are no more items to process
queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 0)
}
exitProgress, err = exitingNode.DB.Satellites().ListGracefulExits(ctx)
require.NoError(t, err)
@ -120,18 +174,24 @@ func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet,
if progress.SatelliteID == satellite1.ID() {
require.NotNil(t, progress.CompletionReceipt)
require.NotNil(t, progress.FinishedAt)
require.EqualValues(t, progress.StartingDiskUsage, progress.BytesDeleted)
if satellite1.Config.GracefulExit.TimeBased {
require.EqualValues(t, 0, progress.BytesDeleted)
} else {
require.EqualValues(t, progress.StartingDiskUsage, progress.BytesDeleted)
}
}
}
// make sure there are no more pieces on the node.
namespaces, err := exitingNode.DB.Pieces().ListNamespaces(ctx)
require.NoError(t, err)
for _, ns := range namespaces {
err = exitingNode.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo blobstore.BlobInfo) error {
return errs.New("found a piece on the node. this shouldn't happen.")
})
if !satellite1.Config.GracefulExit.TimeBased {
// make sure there are no more pieces on the node.
namespaces, err := exitingNode.DB.Pieces().ListNamespaces(ctx)
require.NoError(t, err)
for _, ns := range namespaces {
err = exitingNode.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo blobstore.BlobInfo) error {
return errs.New("found a piece on the node. this shouldn't happen.")
})
require.NoError(t, err)
}
}
}

View File

@ -31,7 +31,13 @@ func TestWorkerSuccess(t *testing.T) {
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
// this test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
),
StorageNode: func(index int, config *storagenode.Config) {
config.GracefulExit.NumWorkers = 2
config.GracefulExit.NumConcurrentTransfers = 2
@ -98,7 +104,13 @@ func TestWorkerTimeout(t *testing.T) {
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
func(log *zap.Logger, index int, config *satellite.Config) {
// this test can be removed entirely when we are using time-based GE everywhere.
config.GracefulExit.TimeBased = false
},
),
StorageNode: func(index int, config *storagenode.Config) {
config.GracefulExit.NumWorkers = 2
config.GracefulExit.NumConcurrentTransfers = 2
@ -163,6 +175,11 @@ func TestWorkerTimeout(t *testing.T) {
}
func TestWorkerFailure_IneligibleNodeAge(t *testing.T) {
t.Run("TimeBased=true", func(t *testing.T) { testWorkerFailure_IneligibleNodeAge(t, true) })
t.Run("TimeBased=false", func(t *testing.T) { testWorkerFailure_IneligibleNodeAge(t, false) })
}
func testWorkerFailure_IneligibleNodeAge(t *testing.T, timeBased bool) {
const successThreshold = 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
@ -173,6 +190,7 @@ func TestWorkerFailure_IneligibleNodeAge(t *testing.T) {
func(log *zap.Logger, index int, config *satellite.Config) {
// Set the required node age to 1 month.
config.GracefulExit.NodeMinAgeInMonths = 1
config.GracefulExit.TimeBased = timeBased
},
testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
),