satellite/gracefulexit: not allow disqualified node to graceful exit (#3493)

This commit is contained in:
Yingrong Zhao 2019-11-07 12:19:34 -05:00 committed by GitHub
parent f3dccb56b1
commit 6331f839ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 211 additions and 25 deletions

View File

@ -204,6 +204,14 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
endpoint.connections.delete(nodeID)
}()
isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if isDisqualified {
return rpcstatus.Error(rpcstatus.PermissionDenied, "Disqualified nodes cannot graceful exit")
}
eofHandler := func(err error) error {
if err == io.EOF {
endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("Node ID", nodeID))
@ -345,6 +353,14 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
if !morePiecesFlag && pendingCount == 0 {
processMu.Unlock()
isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if isDisqualified {
return rpcstatus.Error(rpcstatus.PermissionDenied, "Disqualified nodes cannot graceful exit")
}
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
@ -462,7 +478,13 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
mon.Meter("graceful_exit_fail_validation").Mark(1)
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
@ -928,6 +950,38 @@ func (endpoint *Endpoint) updatePointer(ctx context.Context, originalPointer *pb
return nil
}
func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) {
// check if node is disqualified
nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
if err != nil {
return false, Error.Wrap(err)
}
if nodeInfo.Disqualified != nil {
// update graceful exit status to be failed
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
ExitSuccess: false,
}
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
return true, Error.Wrap(err)
}
// remove remaining items from the queue
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
if err != nil {
return true, Error.Wrap(err)
}
return true, nil
}
return false, nil
}
func (endpoint *Endpoint) getValidPointer(ctx context.Context, path string, pieceNum int32, originalRootPieceID storj.PieceID) (*pb.Pointer, error) {
pointer, err := endpoint.metainfo.Get(ctx, path)
// TODO we don't know the type of error

View File

@ -378,13 +378,131 @@ func TestInvalidStorageNodeSignature(t *testing.T) {
require.FailNow(t, "should not reach this case: %#v", m)
}
// TODO uncomment once progress reflects updated success and fail counts
// check that the exit has completed and we have the correct transferred/failed values
// progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
// require.NoError(t, err)
//
// require.Equal(t, int64(0), progress.PiecesTransferred, tt.name)
// require.Equal(t, int64(1), progress.PiecesFailed, tt.name)
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.Equal(t, int64(0), progress.PiecesTransferred)
require.Equal(t, int64(1), progress.PiecesFailed)
})
}
func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 2,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
exitingNode := planet.StorageNodes[0]
disqualifyNode(t, ctx, satellite, exitingNode.ID())
conn, err := exitingNode.Dialer.DialAddressID(ctx, satellite.Addr(), satellite.Identity.ID)
require.NoError(t, err)
defer ctx.Check(conn.Close)
client := conn.SatelliteGracefulExitClient()
processClient, err := client.Process(ctx)
require.NoError(t, err)
// Process endpoint should return immediately if node is disqualified
response, err := processClient.Recv()
require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
require.Nil(t, response)
// disqualified node should fail graceful exit
exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
require.NotNil(t, exitStatus.ExitFinishedAt)
require.False(t, exitStatus.ExitSuccess)
})
}
func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
testTransfers(t, numObjects, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
disqualifyNode(t, ctx, satellite, exitingNode.ID())
deletedCount := 0
for {
response, err := processClient.Recv()
if errs.Is(err, io.EOF) {
// Done
break
}
if deletedCount >= numPieces {
// when a disqualified node has finished transfer all pieces, it should receive an error
require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
break
} else {
require.NoError(t, err)
}
switch m := response.GetMessage().(type) {
case *pb.SatelliteMessage_TransferPiece:
require.NotNil(t, m)
pieceReader, err := exitingNode.Storage2.Store.Reader(ctx, satellite.ID(), m.TransferPiece.OriginalPieceId)
require.NoError(t, err)
header, err := pieceReader.GetPieceHeader()
require.NoError(t, err)
orderLimit := header.OrderLimit
originalPieceHash := &pb.PieceHash{
PieceId: orderLimit.PieceId,
Hash: header.GetHash(),
PieceSize: pieceReader.Size(),
Timestamp: header.GetCreationTime(),
Signature: header.GetSignature(),
}
newPieceHash := &pb.PieceHash{
PieceId: m.TransferPiece.AddressedOrderLimit.Limit.PieceId,
Hash: originalPieceHash.Hash,
PieceSize: originalPieceHash.PieceSize,
Timestamp: time.Now(),
}
receivingNodeID := nodeFullIDs[m.TransferPiece.AddressedOrderLimit.Limit.StorageNodeId]
require.NotNil(t, receivingNodeID)
signer := signing.SignerFromFullIdentity(receivingNodeID)
signedNewPieceHash, err := signing.SignPieceHash(ctx, signer, newPieceHash)
require.NoError(t, err)
success := &pb.StorageNodeMessage{
Message: &pb.StorageNodeMessage_Succeeded{
Succeeded: &pb.TransferSucceeded{
OriginalPieceId: m.TransferPiece.OriginalPieceId,
OriginalPieceHash: originalPieceHash,
OriginalOrderLimit: &orderLimit,
ReplacementPieceHash: signedNewPieceHash,
},
},
}
err = processClient.Send(success)
require.NoError(t, err)
case *pb.SatelliteMessage_DeletePiece:
deletedCount++
default:
t.FailNow()
}
}
// check that the exit has completed and we have the correct transferred/failed values
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.EqualValues(t, numPieces, progress.PiecesTransferred)
require.EqualValues(t, numPieces, deletedCount)
// disqualified node should fail graceful exit
exitStatus, err := satellite.Overlay.DB.GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
require.NotNil(t, exitStatus.ExitFinishedAt)
require.False(t, exitStatus.ExitSuccess)
})
}
@ -453,13 +571,12 @@ func TestFailureHashMismatch(t *testing.T) {
require.FailNow(t, "should not reach this case: %#v", m)
}
// TODO uncomment once progress reflects updated success and fail counts
// check that the exit has completed and we have the correct transferred/failed values
// progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
// require.NoError(t, err)
//
// require.Equal(t, int64(0), progress.PiecesTransferred, tt.name)
// require.Equal(t, int64(1), progress.PiecesFailed, tt.name)
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.Equal(t, int64(0), progress.PiecesTransferred)
require.Equal(t, int64(1), progress.PiecesFailed)
})
}
@ -495,13 +612,12 @@ func TestFailureUnknownError(t *testing.T) {
require.FailNow(t, "should not reach this case: %#v", m)
}
// TODO uncomment once progress reflects updated success and fail counts
// check that the exit has completed and we have the correct transferred/failed values
// progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
// require.NoError(t, err)
//
// require.Equal(t, int64(0), progress.PiecesTransferred, tt.name)
// require.Equal(t, int64(1), progress.PiecesFailed, tt.name)
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.Equal(t, int64(0), progress.PiecesTransferred)
require.Equal(t, int64(0), progress.PiecesFailed)
})
}
@ -572,13 +688,12 @@ func TestFailureUplinkSignature(t *testing.T) {
require.FailNow(t, "should not reach this case: %#v", m)
}
// TODO uncomment once progress reflects updated success and fail counts
// check that the exit has completed and we have the correct transferred/failed values
// progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
// require.NoError(t, err)
//
// require.Equal(t, int64(0), progress.PiecesTransferred, tt.name)
// require.Equal(t, int64(1), progress.PiecesFailed, tt.name)
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
require.Equal(t, int64(0), progress.PiecesTransferred)
require.Equal(t, int64(1), progress.PiecesFailed)
})
}
@ -1218,3 +1333,19 @@ func findNodeToExit(ctx context.Context, planet *testplanet.Planet, objects int)
return nil, nil
}
func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) {
nodeStat, err := satellite.DB.OverlayCache().UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: nodeID,
IsUp: true,
AuditSuccess: false,
AuditLambda: 0,
AuditWeight: 1,
AuditDQ: 0.5,
UptimeLambda: 1,
UptimeWeight: 1,
UptimeDQ: 0.5,
})
require.NoError(t, err)
require.NotNil(t, nodeStat.Disqualified)
}

View File

@ -859,7 +859,7 @@ func (cache *overlaycache) UpdatePieceCounts(ctx context.Context, pieceCounts ma
return Error.Wrap(err)
}
// GetExitingNodes returns nodes who have initiated a graceful exit, but have not completed it.
// GetExitingNodes returns nodes who have initiated a graceful exit and is not disqualified, but have not completed it.
func (cache *overlaycache) GetExitingNodes(ctx context.Context) (exitingNodes []*overlay.ExitStatus, err error) {
defer mon.Task()(&ctx)(&err)
@ -867,6 +867,7 @@ func (cache *overlaycache) GetExitingNodes(ctx context.Context) (exitingNodes []
SELECT id, exit_initiated_at, exit_loop_completed_at, exit_finished_at, exit_success FROM nodes
WHERE exit_initiated_at IS NOT NULL
AND exit_finished_at IS NULL
AND disqualified is NULL
`),
)
if err != nil {