// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package gracefulexit_test

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/memory"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/overlay"
	"storj.io/storj/storagenode/blobstore"
)

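// TestChoreOld tests the storage node graceful exit chore against a satellite
// running piece-transfer based graceful exit (GracefulExit.TimeBased = false).
// It exits one node, then exits a node that received transferred pieces.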
func TestChoreOld(t *testing.T) {
	const successThreshold = 4
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: successThreshold + 2,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.GracefulExit.TimeBased = false
				},
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite1 := planet.Satellites[0]
		uplinkPeer := planet.Uplinks[0]

		err := uplinkPeer.Upload(ctx, satellite1, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		exitingNode, err := findNodeToExit(ctx, planet)
		require.NoError(t, err)

		nodePieceCounts, err := getNodePieceCounts(ctx, planet)
		require.NoError(t, err)

		exitSatellite(ctx, t, planet, exitingNode)

		newNodePieceCounts, err := getNodePieceCounts(ctx, planet)
		require.NoError(t, err)
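		// the exited node's pieces were transferred to other nodes, so some
		// node's piece count should now be higher than before the exit.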
		var newExitingNodeID storj.NodeID
		for k, v := range newNodePieceCounts {
			if v > nodePieceCounts[k] {
				newExitingNodeID = k
			}
		}
		require.False(t, newExitingNodeID.IsZero())
		require.NotEqual(t, exitingNode.ID(), newExitingNodeID)

		newExitingNode := planet.FindNode(newExitingNodeID)
		require.NotNil(t, newExitingNode)

		exitSatellite(ctx, t, planet, newExitingNode)
	})
}

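// TestChore tests the storage node graceful exit chore against a satellite
// running time-based graceful exit (GracefulExit.TimeBased = true).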
func TestChore(t *testing.T) {
	const successThreshold = 4
	testplanet.Run(t, testplanet.Config{
		SatelliteCount:   1,
		StorageNodeCount: successThreshold + 2,
		UplinkCount:      1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.GracefulExit.TimeBased = true
				},
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite1 := planet.Satellites[0]
		uplinkPeer := planet.Uplinks[0]

		err := uplinkPeer.Upload(ctx, satellite1, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
		require.NoError(t, err)

		exitingNode, err := findNodeToExit(ctx, planet)
		require.NoError(t, err)

		exitSatellite(ctx, t, planet, exitingNode)
	})
}

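// exitSatellite initiates graceful exit for the given node and drives it to
// completion, verifying the resulting state on both the satellite and the
// storage node.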
func exitSatellite(ctx context.Context, t *testing.T, planet *testplanet.Planet, exitingNode *testplanet.StorageNode) {
	satellite1 := planet.Satellites[0]
	exitingNode.GracefulExit.Chore.Loop.Pause()

	_, piecesContentSize, err := exitingNode.Storage2.BlobsCache.SpaceUsedBySatellite(ctx, satellite1.ID())
	require.NoError(t, err)
	require.NotZero(t, piecesContentSize)

	exitStatus := overlay.ExitStatusRequest{
		NodeID:          exitingNode.ID(),
		ExitInitiatedAt: time.Now(),
	}
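	// override the time source used by the satellite's graceful exit endpoint so
	// the test can later advance past the exit period without waiting in real time.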
	var timeMutex sync.Mutex
	var timeForward time.Duration
	satellite1.GracefulExit.Endpoint.SetNowFunc(func() time.Time {
		timeMutex.Lock()
		defer timeMutex.Unlock()
		return time.Now().Add(timeForward)
	})

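	// mark the node as exiting in the satellite's overlay and record the exit start on the storage node.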
	_, err = satellite1.Overlay.DB.UpdateExitStatus(ctx, &exitStatus)
	require.NoError(t, err)

	err = exitingNode.DB.Satellites().InitiateGracefulExit(ctx, satellite1.ID(), time.Now(), piecesContentSize)
	require.NoError(t, err)

	// check that the storage node is exiting
	exitProgress, err := exitingNode.DB.Satellites().ListGracefulExits(ctx)
	require.NoError(t, err)
	require.Len(t, exitProgress, 1)

	// initiate graceful exit on satellite side by running the SN chore.
	exitingNode.GracefulExit.Chore.Loop.TriggerWait()
	exitingNode.GracefulExit.Chore.Loop.Pause()

	// jump ahead in time (the +2 is to account for things like daylight savings shifts that may
	// be happening in the next while, since we're not using AddDate here).
	timeMutex.Lock()
	timeForward += time.Duration(satellite1.Config.GracefulExit.GracefulExitDurationInDays*24+2) * time.Hour
	timeMutex.Unlock()

	if !satellite1.Config.GracefulExit.TimeBased {
		// run the satellite ranged loop to build the transfer queue.
		_, err = satellite1.RangedLoop.RangedLoop.Service.RunOnce(ctx)
		require.NoError(t, err)
	}

	// check that the satellite knows the storage node is exiting.
	exitingNodes, err := satellite1.DB.OverlayCache().GetExitingNodes(ctx)
	require.NoError(t, err)
	require.Len(t, exitingNodes, 1)
	require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)

	if !satellite1.Config.GracefulExit.TimeBased {
		queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
		require.NoError(t, err)
		require.Len(t, queueItems, 1)
	}

	// run the SN chore again to start processing transfers.
	exitingNode.GracefulExit.Chore.Loop.TriggerWait()
	// wait for workers to finish
	err = exitingNode.GracefulExit.Chore.TestWaitForNoWorkers(ctx)
	require.NoError(t, err)

	if !satellite1.Config.GracefulExit.TimeBased {
		// check that there are no more items to process
		queueItems, err := satellite1.DB.GracefulExit().GetIncomplete(ctx, exitStatus.NodeID, 10, 0)
		require.NoError(t, err)
		require.Len(t, queueItems, 0)
	}

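	// confirm the storage node recorded the exit for this satellite as finished.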
	exitProgress, err = exitingNode.DB.Satellites().ListGracefulExits(ctx)
	require.NoError(t, err)
	for _, progress := range exitProgress {
		if progress.SatelliteID == satellite1.ID() {
			require.NotNil(t, progress.CompletionReceipt)
			require.NotNil(t, progress.FinishedAt)
			if satellite1.Config.GracefulExit.TimeBased {
				require.EqualValues(t, 0, progress.BytesDeleted)
			} else {
				require.EqualValues(t, progress.StartingDiskUsage, progress.BytesDeleted)
			}
		}
	}

	if !satellite1.Config.GracefulExit.TimeBased {
		// make sure there are no more pieces on the node.
		namespaces, err := exitingNode.DB.Pieces().ListNamespaces(ctx)
		require.NoError(t, err)
		for _, ns := range namespaces {
			err = exitingNode.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo blobstore.BlobInfo) error {
				return errs.New("found a piece on the node. this shouldn't happen.")
			})
			require.NoError(t, err)
		}
	}
}

// getNodePieceCounts tallies all the pieces per node.
func getNodePieceCounts(ctx context.Context, planet *testplanet.Planet) (_ map[storj.NodeID]int, err error) {
	nodePieceCounts := make(map[storj.NodeID]int)
	for _, n := range planet.StorageNodes {
		node := n
		namespaces, err := node.DB.Pieces().ListNamespaces(ctx)
		if err != nil {
			return nil, err
		}
		for _, ns := range namespaces {
			err = node.DB.Pieces().WalkNamespace(ctx, ns, func(blobInfo blobstore.BlobInfo) error {
				nodePieceCounts[node.ID()]++
				return nil
			})
			if err != nil {
				return nil, err
			}
		}
	}
	return nodePieceCounts, err
}

// findNodeToExit selects the node storing the most pieces as the node to gracefully exit.
func findNodeToExit(ctx context.Context, planet *testplanet.Planet) (*testplanet.StorageNode, error) {
	satellite := planet.Satellites[0]

	objects, err := satellite.Metabase.DB.TestingAllSegments(ctx)
	if err != nil {
		return nil, err
	}

	pieceCountMap := make(map[storj.NodeID]int, len(planet.StorageNodes))
	for _, sn := range planet.StorageNodes {
		pieceCountMap[sn.ID()] = 0
	}

	for _, object := range objects {
		for _, piece := range object.Pieces {
			pieceCountMap[piece.StorageNode]++
		}
	}

	var exitingNodeID storj.NodeID
	maxCount := 0
	for k, v := range pieceCountMap {
		if exitingNodeID.IsZero() {
			exitingNodeID = k
			maxCount = v
			continue
		}
		if v > maxCount {
			exitingNodeID = k
			maxCount = v
		}
	}

	return planet.FindNode(exitingNodeID), nil
}