satellite/gracefulexit, storagenode/gracefulexit: add timeouts (#3407)

Natalie Villasana 2019-10-30 13:40:57 -04:00 committed by GitHub
parent 5453886231
commit 4878135068
9 changed files with 321 additions and 26 deletions

View File

@@ -371,6 +371,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
MaxFailuresPerPiece: 5,
MaxInactiveTimeFrame: time.Second * 10,
OverallMaxFailuresPercentage: 10,
RecvTimeout: time.Minute * 1,
},
Metrics: metrics.Config{
ChoreInterval: defaultInterval,

View File

@@ -124,8 +124,10 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
Interval: defaultInterval,
},
GracefulExit: gracefulexit.Config{
ChoreInterval: time.Second * 1,
NumWorkers: 3,
MinBytesPerSecond: 128 * memory.B,
MinDownloadTimeout: 2 * time.Minute,
},
}
if planet.config.Reconfigure.StorageNode != nil {

View File

@@ -27,8 +27,8 @@ type Config struct {
EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"100"`
MaxFailuresPerPiece int `help:"maximum number of transfer failures per piece." default:"3"`
OverallMaxFailuresPercentage int `help:"maximum percentage of transfer failures per node." default:"10"`
MaxInactiveTimeFrame time.Duration `help:"maximum inactive time frame of transfer activities per node." default:"500h"`
RecvTimeout time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"10m"`
}
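Each help/default tag above becomes a generated satellite config entry; the satellite-config.yaml.lock hunk further down shows the key this commit adds. A sketch of overriding the new timeout in that YAML, assuming the standard key derivation from the field name (the 5m value is only an example):

graceful-exit.recv-timeout: 5m0s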

View File

@@ -57,6 +57,7 @@ type Endpoint struct {
connections *connectionsTracker
peerIdentities overlay.PeerIdentities
config Config
recvTimeout time.Duration
}
type pendingTransfer struct {
@@ -168,6 +169,7 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overla
connections: newConnectionsTracker(),
peerIdentities: peerIdentities,
config: config,
recvTimeout: config.RecvTimeout,
}
}
@@ -417,9 +419,24 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}
processMu.Unlock()
done := make(chan struct{})
var request *pb.StorageNodeMessage
var recvErr error
go func() {
request, recvErr = stream.Recv()
close(done)
}()
timer := time.NewTimer(endpoint.recvTimeout)
select {
case <-ctx.Done():
return rpcstatus.Error(rpcstatus.Internal, Error.New("context canceled while waiting to receive message from storagenode").Error())
case <-timer.C:
return rpcstatus.Error(rpcstatus.DeadlineExceeded, Error.New("timeout while waiting to receive message from storagenode").Error())
case <-done:
}
if recvErr != nil {
return eofHandler(recvErr)
}
switch m := request.GetMessage().(type) {
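
This hunk is the core satellite-side change: the blocking stream.Recv moves onto a goroutine so the caller can select across the request context, a receive timer, and the receive itself. A minimal, self-contained sketch of the same pattern, with a hypothetical recv function standing in for the stream (names here are illustrative, not from the commit):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// recvWithTimeout wraps a blocking receive so the caller can abandon it on
// timeout or context cancellation. As in the commit, a timed-out receive
// leaves the goroutine blocked until the underlying stream is torn down.
func recvWithTimeout(ctx context.Context, recv func() (string, error), timeout time.Duration) (string, error) {
	done := make(chan struct{})
	var msg string
	var recvErr error
	go func() {
		msg, recvErr = recv()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer early if the receive wins the race

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-timer.C:
		return "", errors.New("timeout while waiting to receive message")
	case <-done:
		return msg, recvErr
	}
}

func main() {
	slow := func() (string, error) { time.Sleep(time.Second); return "hello", nil }
	_, err := recvWithTimeout(context.Background(), slow, 10*time.Millisecond)
	fmt.Println(err) // timeout while waiting to receive message
}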

View File

@@ -13,10 +13,13 @@ import (
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testblobs"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/testrand"
@@ -25,7 +28,11 @@ import (
"storj.io/storj/pkg/rpc/rpcstatus"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/gracefulexit"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/uplink"
)
@@ -232,6 +239,80 @@ func TestConcurrentConnections(t *testing.T) {
})
}
func TestRecvTimeout(t *testing.T) {
var geConfig gracefulexit.Config
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 9,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
Satellite: func(logger *zap.Logger, index int, config *satellite.Config) {
// This config value creates a very short window for receiving data from
// storage nodes, causing the satellite's receive context to time out.
config.GracefulExit.RecvTimeout = 10 * time.Millisecond
},
StorageNode: func(index int, config *storagenode.Config) {
geConfig = config.GracefulExit
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
rs := &uplink.RSConfig{
MinThreshold: 4,
RepairThreshold: 6,
SuccessThreshold: 8,
MaxThreshold: 8,
}
err := ul.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
exitingNode, err := findNodeToExit(ctx, planet, 1)
require.NoError(t, err)
exitingNode.GracefulExit.Chore.Loop.Pause()
exitStatusReq := overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
}
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
require.NoError(t, err)
// run the satellite chore to build the transfer queue.
satellite.GracefulExit.Chore.Loop.TriggerWait()
satellite.GracefulExit.Chore.Loop.Pause()
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
storageNodeDB := exitingNode.DB.(*testblobs.SlowDB)
// slow the storage node's piece operations so it cannot respond before the satellite's receive timeout fires
delay := 200 * time.Millisecond
storageNodeDB.SetLatency(delay)
store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB())
// run a graceful exit worker to process the transfer queue.
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), geConfig)
err = worker.Run(ctx, func() {})
require.Error(t, err)
require.True(t, errs2.IsRPC(err, rpcstatus.DeadlineExceeded))
})
}
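The assertion closing TestRecvTimeout checks the RPC status code rather than the error string: the satellite's timeout branch returns rpcstatus.DeadlineExceeded, and errs2.IsRPC unwraps the worker's error to compare codes. A hedged sketch of that check in isolation, assuming the internal errs2 and rpcstatus packages behave as this commit's code uses them:

package main

import (
	"fmt"

	"storj.io/storj/internal/errs2"
	"storj.io/storj/pkg/rpc/rpcstatus"
)

func main() {
	err := rpcstatus.Error(rpcstatus.DeadlineExceeded, "timeout while waiting to receive message from storagenode")
	fmt.Println(errs2.IsRPC(err, rpcstatus.DeadlineExceeded)) // true
	fmt.Println(errs2.IsRPC(err, rpcstatus.Internal))         // false
}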
func TestInvalidStorageNodeSignature(t *testing.T) {
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()

View File

@@ -127,6 +127,9 @@ contact.external-address: ""
# maximum percentage of transfer failures per node.
# graceful-exit.overall-max-failures-percentage: 10
# the minimum duration for receiving a stream from a storage node before timing out
# graceful-exit.recv-timeout: 10m0s
# path to the certificate chain for this identity
identity.cert-path: /root/.local/share/storj/identity/satellite/identity.cert

View File

@@ -11,6 +11,7 @@ import (
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/rpc"
"storj.io/storj/storagenode/pieces"
@@ -39,8 +40,10 @@ type Chore struct {
// Config for the chore
type Config struct {
ChoreInterval time.Duration `help:"how often to run the chore to check for satellites for the node to exit." releaseDefault:"15m" devDefault:"10s"`
NumWorkers int `help:"number of workers to handle satellite exits" default:"3"`
MinBytesPerSecond memory.Size `help:"the minimum acceptable bytes that an exiting node can transfer per second to the new node" default:"128B"`
MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a piece from storage nodes before timing out" default:"2m"`
}
// NewChore instantiates Chore.
@@ -87,7 +90,7 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
continue
}
worker := NewWorker(chore.log, chore.store, chore.satelliteDB, chore.dialer, satelliteID, addr, chore.config)
if _, ok := chore.exitingMap.LoadOrStore(satelliteID, worker); ok {
// already running a worker for this satellite
chore.log.Debug("skipping for satellite, worker already exists.", zap.Stringer("satellite ID", satelliteID))
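
Two things happen in this hunk: each worker now receives the chore's Config, so MinBytesPerSecond and MinDownloadTimeout reach the transfer path, and exitingMap.LoadOrStore guarantees at most one worker per satellite. A minimal sketch of that LoadOrStore dedup idiom using sync.Map (types here are illustrative, not the commit's):

package main

import (
	"fmt"
	"sync"
)

type worker struct{ satellite string }

func main() {
	var exiting sync.Map

	start := func(satellite string) {
		// LoadOrStore is atomic: the first caller for a key stores its worker,
		// later callers see ok == true and skip spawning a duplicate.
		if _, ok := exiting.LoadOrStore(satellite, &worker{satellite}); ok {
			fmt.Println("skipping for satellite, worker already exists:", satellite)
			return
		}
		fmt.Println("started worker for", satellite)
		// a finished worker would remove itself with exiting.Delete(satellite)
		// so the satellite can be picked up again on a later chore interval.
	}

	start("sat-1")
	start("sat-1") // deduplicated
}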

View File

@@ -13,6 +13,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/internal/memory"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/signing"
@@ -25,25 +26,29 @@ import (
// Worker is responsible for completing the graceful exit for a given satellite.
type Worker struct {
log *zap.Logger
store *pieces.Store
satelliteDB satellites.DB
dialer rpc.Dialer
satelliteID storj.NodeID
satelliteAddr string
ecclient ecclient.Client
minBytesPerSecond memory.Size
minDownloadTimeout time.Duration
}
// NewWorker instantiates Worker.
func NewWorker(log *zap.Logger, store *pieces.Store, satelliteDB satellites.DB, dialer rpc.Dialer, satelliteID storj.NodeID, satelliteAddr string, choreConfig Config) *Worker {
return &Worker{
log: log,
store: store,
satelliteDB: satelliteDB,
dialer: dialer,
satelliteID: satelliteID,
satelliteAddr: satelliteAddr,
ecclient: ecclient.NewClient(log, dialer, 0),
minBytesPerSecond: choreConfig.MinBytesPerSecond,
minDownloadTimeout: choreConfig.MinDownloadTimeout,
}
}
@@ -157,7 +162,15 @@ func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.Trans
return err
}
if worker.minBytesPerSecond == 0 {
// set minBytesPerSecond to default 128B if set to 0
worker.minBytesPerSecond = 128 * memory.B
}
maxTransferTime := time.Duration(int64(time.Second) * originalHash.PieceSize / worker.minBytesPerSecond.Int64())
if maxTransferTime < worker.minDownloadTimeout {
maxTransferTime = worker.minDownloadTimeout
}
putCtx, cancel := context.WithTimeout(ctx, maxTransferTime)
defer cancel()
pieceHash, peerID, err := worker.ecclient.PutPiece(putCtx, ctx, addrLimit, pk, reader)
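
The replacement of context.WithCancel with context.WithTimeout above is driven by a simple rate formula: the transfer window is pieceSize divided by minBytesPerSecond, expressed as a duration and floored at minDownloadTimeout. A worked sketch using this commit's defaults of a 128 B/s floor rate and a 2m minimum (the helper name is illustrative):

package main

import (
	"fmt"
	"time"
)

func maxTransferTime(pieceSize, minBytesPerSecond int64, minDownloadTimeout time.Duration) time.Duration {
	d := time.Duration(int64(time.Second) * pieceSize / minBytesPerSecond)
	if d < minDownloadTimeout {
		d = minDownloadTimeout
	}
	return d
}

func main() {
	// a 5 KiB piece at 128 B/s works out to 40s, under the 2m floor, so the
	// minimum applies:
	fmt.Println(maxTransferTime(5*1024, 128, 2*time.Minute)) // 2m0s

	// a 2 MiB piece at 128 B/s gets 16384s, far above the floor:
	fmt.Println(maxTransferTime(2*1024*1024, 128, 2*time.Minute)) // 4h33m4s
}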

View File

@@ -0,0 +1,175 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/testblobs"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/internal/testrand"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/gracefulexit"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/uplink"
)
func TestWorkerSuccess(t *testing.T) {
var geConfig gracefulexit.Config
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 9,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
geConfig = config.GracefulExit
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
rs := &uplink.RSConfig{
MinThreshold: 4,
RepairThreshold: 6,
SuccessThreshold: 8,
MaxThreshold: 8,
}
err := ul.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
exitingNode, err := findNodeToExit(ctx, planet, 1)
require.NoError(t, err)
exitingNode.GracefulExit.Chore.Loop.Pause()
exitStatusReq := overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
}
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
require.NoError(t, err)
// run the satellite chore to build the transfer queue.
satellite.GracefulExit.Chore.Loop.TriggerWait()
satellite.GracefulExit.Chore.Loop.Pause()
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
// run a graceful exit worker to process the transfer queue.
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), exitingNode.Storage2.Store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), geConfig)
err = worker.Run(ctx, func() {})
require.NoError(t, err)
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.EqualValues(t, progress.PiecesFailed, 0)
require.EqualValues(t, progress.PiecesTransferred, 1)
exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
require.NotNil(t, exitStatus.ExitFinishedAt)
require.True(t, exitStatus.ExitSuccess)
})
}
func TestWorkerTimeout(t *testing.T) {
var geConfig gracefulexit.Config
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 9,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
NewStorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
StorageNode: func(index int, config *storagenode.Config) {
// These config values create a very short window for transferring data to
// other storage nodes, causing the worker's transfer context to time out.
config.GracefulExit.MinDownloadTimeout = 2 * time.Millisecond
config.GracefulExit.MinBytesPerSecond = 10 * memory.MiB
geConfig = config.GracefulExit
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
ul := planet.Uplinks[0]
satellite.GracefulExit.Chore.Loop.Pause()
rs := &uplink.RSConfig{
MinThreshold: 4,
RepairThreshold: 6,
SuccessThreshold: 8,
MaxThreshold: 8,
}
err := ul.UploadWithConfig(ctx, satellite, rs, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
exitingNode, err := findNodeToExit(ctx, planet, 1)
require.NoError(t, err)
exitingNode.GracefulExit.Chore.Loop.Pause()
exitStatusReq := overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
}
_, err = satellite.Overlay.DB.UpdateExitStatus(ctx, &exitStatusReq)
require.NoError(t, err)
// run the satellite chore to build the transfer queue.
satellite.GracefulExit.Chore.Loop.TriggerWait()
satellite.GracefulExit.Chore.Loop.Pause()
// check that the satellite knows the storage node is exiting.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, queueItems, 1)
storageNodeDB := exitingNode.DB.(*testblobs.SlowDB)
// make piece uploads from the exiting node slower than the computed transfer timeout
delay := 200 * time.Millisecond
storageNodeDB.SetLatency(delay)
store := pieces.NewStore(zaptest.NewLogger(t), storageNodeDB.Pieces(), nil, nil, storageNodeDB.PieceSpaceUsedDB())
// run a graceful exit worker to process the transfer queue.
worker := gracefulexit.NewWorker(zaptest.NewLogger(t), store, exitingNode.DB.Satellites(), exitingNode.Dialer, satellite.ID(), satellite.Addr(), geConfig)
err = worker.Run(ctx, func() {})
require.NoError(t, err)
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.EqualValues(t, progress.PiecesFailed, 1)
require.EqualValues(t, progress.PiecesTransferred, 0)
exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
require.NotNil(t, exitStatus.ExitFinishedAt)
require.False(t, exitStatus.ExitSuccess)
})
}
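
TestWorkerTimeout turns three knobs against each other: SlowDB injects 200ms of latency per piece operation, while MinBytesPerSecond = 10 MiB/s and MinDownloadTimeout = 2ms keep the computed transfer window at the 2ms floor, so the context passed to PutPiece expires before any piece can move. A minimal sketch of that failure mode, with a generic slow operation standing in for the transfer:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// the worker's computed window (2ms here) is far shorter than the
	// injected 200ms latency, so the deadline always fires first.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
	defer cancel()

	select {
	case <-time.After(200 * time.Millisecond): // stands in for the slow piece transfer
		fmt.Println("transfer finished")
	case <-ctx.Done():
		fmt.Println("transfer aborted:", ctx.Err()) // context deadline exceeded
	}
}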