satellite/gracefulexit: check duplicate node id before update pointer (#3380)
* check duplicate node id before update pointer
* add test for transfer failure when pointer already contains the receiving node id
* check exiting and receiving node are still in the pointer
* check node id only exists once in a pointer
* return error if the existing node doesn't match the piece info in the pointer
* try to recreate the issue on jenkins
* should not remove exiting node piece in test
* Update satellite/gracefulexit/endpoint.go (Co-Authored-By: Maximillian von Briesen <mobyvb@gmail.com>)
* Update satellite/gracefulexit/endpoint.go (Co-Authored-By: Maximillian von Briesen <mobyvb@gmail.com>)
parent a4e618fd1f
commit 292e64ee2f
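For orientation before reading the diff, here is a minimal, self-contained sketch of the duplicate-node check this commit adds to the pointer update path. It is an illustration under simplified assumptions, not the satellite code itself: the NodeID and RemotePiece types and the checkPointerPieces helper are hypothetical stand-ins for storj.NodeID, pb.RemotePiece, and the logic inside updatePointer.

package main

import (
	"errors"
	"fmt"
)

// NodeID and RemotePiece are simplified stand-ins for storj.NodeID and
// *pb.RemotePiece used only for this sketch.
type NodeID string

type RemotePiece struct {
	PieceNum int32
	NodeID   NodeID
}

// checkPointerPieces mirrors the validation added in this commit: the exiting
// node must still hold the expected piece number, and the receiving node must
// not already appear in the pointer.
func checkPointerPieces(pieces []RemotePiece, exiting, receiving NodeID, pieceNum int32) error {
	pieceMap := make(map[NodeID]RemotePiece)
	for _, p := range pieces {
		pieceMap[p.NodeID] = p
	}

	existing, ok := pieceMap[exiting]
	if !ok {
		return fmt.Errorf("node no longer has the piece, node id: %s", exiting)
	}
	if existing.PieceNum != pieceNum {
		return fmt.Errorf("invalid existing piece info, node id: %s, piece num: %d", exiting, pieceNum)
	}
	delete(pieceMap, exiting)

	// the receiving node id must not already exist in the pointer
	if _, ok := pieceMap[receiving]; ok {
		return errors.New("node id already exists in pointer: " + string(receiving))
	}
	return nil
}

func main() {
	pieces := []RemotePiece{
		{PieceNum: 0, NodeID: "exiting-node"},
		{PieceNum: 1, NodeID: "receiving-node"}, // receiver already holds another piece
	}
	// Expected to fail: the receiving node is already present in the pointer,
	// which is exactly the case the new test below provokes.
	err := checkPointerPieces(pieces, "exiting-node", "receiving-node", 0)
	fmt.Println(err)
}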
@@ -723,14 +723,28 @@ func (endpoint *Endpoint) updatePointer(ctx context.Context, exitingNodeID storj
 		return nil
 	}
 
-	var toRemove []*pb.RemotePiece
+	pieceMap := make(map[storj.NodeID]*pb.RemotePiece)
 	for _, piece := range remote.GetRemotePieces() {
-		if piece.NodeId == exitingNodeID && piece.PieceNum == pieceNum {
-			toRemove = []*pb.RemotePiece{piece}
-			break
-		}
+		pieceMap[piece.NodeId] = piece
 	}
+
+	var toRemove []*pb.RemotePiece
+	existingPiece, ok := pieceMap[exitingNodeID]
+	if !ok {
+		return Error.New("node no longer has the piece. Node ID: %s", exitingNodeID.String())
+	}
+	if existingPiece != nil && existingPiece.PieceNum != pieceNum {
+		return Error.New("invalid existing piece info. Exiting Node ID: %s, PieceNum: %d", exitingNodeID.String(), pieceNum)
+	}
+	toRemove = []*pb.RemotePiece{existingPiece}
+	delete(pieceMap, exitingNodeID)
+
 	var toAdd []*pb.RemotePiece
+	// check receiving node id is not already in the pointer
+	_, ok = pieceMap[receivingNodeID]
+	if ok {
+		return Error.New("node id already exists in piece. Path: %s, NodeID: %s", path, receivingNodeID.String())
+	}
 	if !receivingNodeID.IsZero() {
 		toAdd = []*pb.RemotePiece{{
 			PieceNum: pieceNum,
@@ -13,12 +13,14 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/zeebo/errs"
 
+	"storj.io/storj/internal/errs2"
 	"storj.io/storj/internal/memory"
 	"storj.io/storj/internal/testcontext"
 	"storj.io/storj/internal/testplanet"
 	"storj.io/storj/internal/testrand"
 	"storj.io/storj/pkg/identity"
 	"storj.io/storj/pkg/pb"
+	"storj.io/storj/pkg/rpc/rpcstatus"
 	"storj.io/storj/pkg/signing"
 	"storj.io/storj/pkg/storj"
 	"storj.io/storj/storagenode"
@@ -493,6 +495,126 @@ func TestSuccessPointerUpdate(t *testing.T) {
 	})
 }
 
+func TestUpdatePointerFailure_DuplicatedNodeID(t *testing.T) {
+	testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
+
+		var recNodeID storj.NodeID
+
+		response, err := processClient.Recv()
+		require.NoError(t, err)
+
+		switch m := response.GetMessage().(type) {
+		case *pb.SatelliteMessage_TransferPiece:
+			require.NotNil(t, m)
+
+			pieceReader, err := exitingNode.Storage2.Store.Reader(ctx, satellite.ID(), m.TransferPiece.OriginalPieceId)
+			require.NoError(t, err)
+
+			header, err := pieceReader.GetPieceHeader()
+			require.NoError(t, err)
+
+			orderLimit := header.OrderLimit
+			originalPieceHash := &pb.PieceHash{
+				PieceId:   orderLimit.PieceId,
+				Hash:      header.GetHash(),
+				PieceSize: pieceReader.Size(),
+				Timestamp: header.GetCreationTime(),
+				Signature: header.GetSignature(),
+			}
+
+			newPieceHash := &pb.PieceHash{
+				PieceId:   m.TransferPiece.AddressedOrderLimit.Limit.PieceId,
+				Hash:      originalPieceHash.Hash,
+				PieceSize: originalPieceHash.PieceSize,
+				Timestamp: time.Now(),
+			}
+
+			receivingIdentity := nodeFullIDs[m.TransferPiece.AddressedOrderLimit.Limit.StorageNodeId]
+			require.NotNil(t, receivingIdentity)
+
+			// get the receiving node piece count before processing
+			recNodeID = receivingIdentity.ID
+
+			// update pointer to include the new receiving node before responding to satellite
+			keys, err := satellite.Metainfo.Database.List(ctx, nil, 1)
+			require.NoError(t, err)
+			path := string(keys[0])
+			pointer, err := satellite.Metainfo.Service.Get(ctx, path)
+			require.NoError(t, err)
+			require.NotNil(t, pointer.GetRemote())
+			require.True(t, len(pointer.GetRemote().GetRemotePieces()) > 0)
+
+			pieceToRemove := make([]*pb.RemotePiece, 1)
+			pieceToAdd := make([]*pb.RemotePiece, 1)
+			pieces := pointer.GetRemote().GetRemotePieces()
+
+			for _, piece := range pieces {
+				if pieceToRemove[0] == nil && piece.NodeId != exitingNode.ID() {
+					pieceToRemove[0] = piece
+					continue
+				}
+			}
+
+			// create a piece with deleted piece number and receiving node ID from the pointer
+			pieceToAdd[0] = &pb.RemotePiece{
+				PieceNum: pieceToRemove[0].PieceNum,
+				NodeId:   recNodeID,
+			}
+
+			_, err = satellite.Metainfo.Service.UpdatePieces(ctx, path, pointer, pieceToAdd, pieceToRemove)
+			require.NoError(t, err)
+
+			signer := signing.SignerFromFullIdentity(receivingIdentity)
+
+			signedNewPieceHash, err := signing.SignPieceHash(ctx, signer, newPieceHash)
+			require.NoError(t, err)
+
+			success := &pb.StorageNodeMessage{
+				Message: &pb.StorageNodeMessage_Succeeded{
+					Succeeded: &pb.TransferSucceeded{
+						OriginalPieceId:      m.TransferPiece.OriginalPieceId,
+						OriginalPieceHash:    originalPieceHash,
+						OriginalOrderLimit:   &orderLimit,
+						ReplacementPieceHash: signedNewPieceHash,
+					},
+				},
+			}
+			err = processClient.Send(success)
+			require.NoError(t, err)
+		default:
+			t.FailNow()
+		}
+
+		_, err = processClient.Recv()
+		require.Error(t, err)
+		require.True(t, errs2.IsRPC(err, rpcstatus.Internal))
+
+		// check exiting node is still in the pointer
+		keys, err := satellite.Metainfo.Database.List(ctx, nil, 1)
+		require.NoError(t, err)
+		path := string(keys[0])
+		pointer, err := satellite.Metainfo.Service.Get(ctx, path)
+		require.NoError(t, err)
+		require.NotNil(t, pointer.GetRemote())
+		require.True(t, len(pointer.GetRemote().GetRemotePieces()) > 0)
+
+		pieces := pointer.GetRemote().GetRemotePieces()
+
+		pieceMap := make(map[storj.NodeID]int)
+		for _, piece := range pieces {
+			pieceMap[piece.NodeId]++
+		}
+
+		exitingNodeID := exitingNode.ID()
+		count, ok := pieceMap[exitingNodeID]
+		require.True(t, ok)
+		require.Equal(t, 1, count)
+		count, ok = pieceMap[recNodeID]
+		require.True(t, ok)
+		require.Equal(t, 1, count)
+	})
+}
+
 func testTransfers(t *testing.T, objects int, verifier func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
 	successThreshold := 8
 	testplanet.Run(t, testplanet.Config{