satellite/gracefulexit: handle piece not found messages from storagenode (#3456)
* If a node claims to fail a transfer due to piece not found, remove that node from the pointer and delete the transfer queue item.
* If the pointer is piece hash verified, penalize the node. Otherwise, do not penalize the node.
parent 19a59c9d59
commit 78fedf5db3
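Before the diff, a condensed sketch of the decision this change implements. This is illustrative only: the types below are stand-ins for readability, not the satellite's real pb.Pointer or endpoint types.

package main

import "fmt"

// pointerState is a stand-in for the parts of the pointer that matter here.
type pointerState struct {
	hasRemote           bool // the pointer still references remote pieces
	nodeHoldsPiece      bool // the exiting node appears in the piece list
	pieceHashesVerified bool // piece hashes were verified when the segment was committed
}

// outcome describes what the satellite does with a NOT_FOUND failure.
type outcome struct {
	removeFromPointer bool
	deleteQueueItem   bool
	countAsFailed     bool
}

// handleNotFound mirrors the branch structure of the real handler: always
// clean up the queue item; only touch the pointer if the node is still in it;
// only penalize when the pointer is piece hash verified.
func handleNotFound(p pointerState) outcome {
	if !p.hasRemote || !p.nodeHoldsPiece {
		// Nothing to remove from the pointer; just clean up the queue.
		return outcome{deleteQueueItem: true}
	}
	return outcome{
		removeFromPointer: true,
		deleteQueueItem:   true,
		countAsFailed:     p.pieceHashesVerified,
	}
}

func main() {
	fmt.Printf("verified:   %+v\n", handleNotFound(pointerState{hasRemote: true, nodeHoldsPiece: true, pieceHashesVerified: true}))
	fmt.Printf("unverified: %+v\n", handleNotFound(pointerState{hasRemote: true, nodeHoldsPiece: true, pieceHashesVerified: false}))
}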
@@ -755,7 +755,63 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,

 	errorCode := int(pb.TransferFailed_Error_value[message.Failed.Error.String()])

-	// TODO if error code is NOT_FOUND, the node no longer has the piece. remove the queue item and the pointer
+	// If the error code is NOT_FOUND, the node no longer has the piece.
+	// Remove the queue item and remove the node from the pointer.
+	// If the pointer is not piece hash verified, do not count this as a failure.
+	if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND {
+		endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("path", transfer.path), zap.Int32("piece num", transfer.pieceNum))
+		pointer, err := endpoint.metainfo.Get(ctx, string(transfer.path))
+		if err != nil {
+			return Error.Wrap(err)
+		}
+		remote := pointer.GetRemote()
+		if remote == nil {
+			err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
+			if err != nil {
+				return Error.Wrap(err)
+			}
+			pending.delete(pieceID)
+			return nil
+		}
+		pieces := remote.GetRemotePieces()
+
+		var nodePiece *pb.RemotePiece
+		for _, piece := range pieces {
+			if piece.NodeId == nodeID && piece.PieceNum == transfer.pieceNum {
+				nodePiece = piece
+			}
+		}
+		if nodePiece == nil {
+			err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
+			if err != nil {
+				return Error.Wrap(err)
+			}
+			pending.delete(pieceID)
+			return nil
+		}
+
+		_, err = endpoint.metainfo.UpdatePieces(ctx, string(transfer.path), pointer, nil, []*pb.RemotePiece{nodePiece})
+		if err != nil {
+			return Error.Wrap(err)
+		}
+
+		// If the pointer was piece hash verified, we know this node definitely should have the piece.
+		// Otherwise, no penalty.
+		if pointer.PieceHashesVerified {
+			err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
+			if err != nil {
+				return Error.Wrap(err)
+			}
+		}
+
+		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.path, transfer.pieceNum)
+		if err != nil {
+			return Error.Wrap(err)
+		}
+		pending.delete(pieceID)
+
+		return nil
+	}

 	transferQueueItem.LastFailedAt = &now
 	transferQueueItem.FailedCount = &failedCount
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"

+	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/require"
 	"github.com/zeebo/errs"
 	"go.uber.org/zap"
@@ -30,6 +31,7 @@ import (
 	"storj.io/storj/pkg/storj"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/overlay"
+	"storj.io/storj/storage"
 	"storj.io/storj/storagenode"
 	"storj.io/storj/storagenode/gracefulexit"
 	"storj.io/storj/storagenode/pieces"
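Both added imports serve the new tests below: proto (gogo/protobuf) is used to deep-copy a pointer by marshaling and unmarshaling it, and storage provides the storage.Key type passed to CompareAndSwap.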
@@ -832,6 +834,163 @@ func TestExitDisabled(t *testing.T) {
 	})
 }

+func TestFailureNotFoundPieceHashVerified(t *testing.T) {
+	testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
+		response, err := processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_TransferPiece:
+			require.NotNil(t, m)
+
+			message := &pb.StorageNodeMessage{
+				Message: &pb.StorageNodeMessage_Failed{
+					Failed: &pb.TransferFailed{
+						OriginalPieceId: m.TransferPiece.OriginalPieceId,
+						Error:           pb.TransferFailed_NOT_FOUND,
+					},
+				},
+			}
+			err = processClient.Send(message)
+			require.NoError(t, err)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		response, err = processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_ExitFailed:
+			require.NotNil(t, m)
+			require.NotNil(t, m.ExitFailed)
+			require.Equal(t, m.ExitFailed.Reason, pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		// check that the node is no longer in the pointer
+		keys, err := satellite.Metainfo.Database.List(ctx, nil, -1)
+		require.NoError(t, err)
+
+		var pointer *pb.Pointer
+		for _, key := range keys {
+			p, err := satellite.Metainfo.Service.Get(ctx, string(key))
+			require.NoError(t, err)
+
+			if p.GetRemote() != nil {
+				pointer = p
+				break
+			}
+		}
+		require.NotNil(t, pointer)
+		for _, piece := range pointer.GetRemote().GetRemotePieces() {
+			require.NotEqual(t, piece.NodeId, exitingNode.ID())
+		}
+
+		// check that the exit has completed and we have the correct transferred/failed values
+		progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
+		require.NoError(t, err)
+
+		require.Equal(t, int64(0), progress.PiecesTransferred)
+		require.Equal(t, int64(1), progress.PiecesFailed)
+	})
+}
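Why ExitFailed here: testTransfers is called with a single object, so the exiting node has exactly one piece in its transfer queue. The one penalized NOT_FOUND makes the failure rate of attempted transfers 100%, which trips the satellite's overall failure percentage check (the exact threshold is configuration, but a 100% failure rate exceeds it), so the exit ends with OVERALL_FAILURE_PERCENTAGE_EXCEEDED even though the node is still removed from the pointer.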
+func TestFailureNotFoundPieceHashUnverified(t *testing.T) {
+	testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
+		// retrieve the remote segment
+		keys, err := satellite.Metainfo.Database.List(ctx, nil, -1)
+		require.NoError(t, err)
+
+		var oldPointer *pb.Pointer
+		var path []byte
+		for _, key := range keys {
+			p, err := satellite.Metainfo.Service.Get(ctx, string(key))
+			require.NoError(t, err)
+
+			if p.GetRemote() != nil {
+				oldPointer = p
+				path = key
+				break
+			}
+		}
+
+		// replace the pointer with a non-piece-hash-verified pointer
+		require.NotNil(t, oldPointer)
+		oldPointerBytes, err := proto.Marshal(oldPointer)
+		require.NoError(t, err)
+		newPointer := &pb.Pointer{}
+		err = proto.Unmarshal(oldPointerBytes, newPointer)
+		require.NoError(t, err)
+		newPointer.PieceHashesVerified = false
+		newPointerBytes, err := proto.Marshal(newPointer)
+		require.NoError(t, err)
+		err = satellite.Metainfo.Service.DB.CompareAndSwap(ctx, storage.Key(path), oldPointerBytes, newPointerBytes)
+		require.NoError(t, err)
+
+		// begin processing graceful exit messages
+		response, err := processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_TransferPiece:
+			require.NotNil(t, m)
+
+			message := &pb.StorageNodeMessage{
+				Message: &pb.StorageNodeMessage_Failed{
+					Failed: &pb.TransferFailed{
+						OriginalPieceId: m.TransferPiece.OriginalPieceId,
+						Error:           pb.TransferFailed_NOT_FOUND,
+					},
+				},
+			}
+			err = processClient.Send(message)
+			require.NoError(t, err)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		response, err = processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_ExitCompleted:
+			require.NotNil(t, m)
+		default:
+			require.FailNow(t, "should not reach this case: %#v", m)
+		}
+
+		// check that the node is no longer in the pointer
+		keys, err = satellite.Metainfo.Database.List(ctx, nil, -1)
+		require.NoError(t, err)
+
+		var pointer *pb.Pointer
+		for _, key := range keys {
+			p, err := satellite.Metainfo.Service.Get(ctx, string(key))
+			require.NoError(t, err)
+
+			if p.GetRemote() != nil {
+				pointer = p
+				break
+			}
+		}
+		require.NotNil(t, pointer)
+		for _, piece := range pointer.GetRemote().GetRemotePieces() {
+			require.NotEqual(t, piece.NodeId, exitingNode.ID())
+		}
+
+		// check that the exit has completed and we have the correct transferred/failed values
+		progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
+		require.NoError(t, err)
+
+		require.Equal(t, int64(0), progress.PiecesTransferred)
+		require.Equal(t, int64(0), progress.PiecesFailed)
+	})
+}
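Two details worth noting in this test: the proto.Marshal/proto.Unmarshal round trip is a convenient deep copy, leaving oldPointerBytes intact so that CompareAndSwap can atomically replace the stored pointer only if it has not changed in the meantime; and with PieceHashesVerified forced to false, the identical NOT_FOUND response is now forgiven: the node is removed from the pointer as before, but PiecesFailed stays 0 and the exit finishes with ExitCompleted.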
+
 func testTransfers(t *testing.T, objects int, verifier func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
 	successThreshold := 4
 	testplanet.Run(t, testplanet.Config{
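testTransfers (truncated here) is the shared harness: it runs a testplanet cluster with a piece success threshold of 4, uploads the requested number of objects, marks one node as gracefully exiting, and hands the verifier callback an open Process stream plus the exiting node. Both new tests pass objects = 1, which is what guarantees a single transfer item in the queue; the setup details beyond successThreshold are inferred from how the callback parameters are used above.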