satellite:gracefulexit: Update pointer after success (#3369)
This commit is contained in:
parent
f4f142bf3e
commit
f0caa6ce5e
@ -556,6 +556,11 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, pending *pendingM
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = endpoint.updatePointer(ctx, exitingNodeID, receivingNodeID, transfer.path, transfer.pieceNum)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
var failed int64
|
||||
if transferQueueItem.FailedCount != nil && *transferQueueItem.FailedCount >= endpoint.config.MaxFailuresPerPiece {
|
||||
failed = -1
|
||||
@ -620,3 +625,39 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (endpoint *Endpoint) updatePointer(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, path []byte, pieceNum int32) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// remove the node from the pointer
|
||||
pointer, err := endpoint.metainfo.Get(ctx, string(path))
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
remote := pointer.GetRemote()
|
||||
// nothing to do here
|
||||
if remote == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var toRemove []*pb.RemotePiece
|
||||
for _, piece := range remote.GetRemotePieces() {
|
||||
if piece.NodeId == exitingNodeID && piece.PieceNum == pieceNum {
|
||||
toRemove = []*pb.RemotePiece{piece}
|
||||
break
|
||||
}
|
||||
}
|
||||
var toAdd []*pb.RemotePiece
|
||||
if !receivingNodeID.IsZero() {
|
||||
toAdd = []*pb.RemotePiece{{
|
||||
PieceNum: pieceNum,
|
||||
NodeId: receivingNodeID,
|
||||
}}
|
||||
}
|
||||
_, err = endpoint.metainfo.UpdatePieces(ctx, string(path), pointer, toAdd, toRemove)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -367,6 +367,103 @@ func TestFailureUplinkSignature(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestSuccessPointerUpdate(t *testing.T) {
|
||||
testTransfers(t, 1, func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
|
||||
var recNodeID storj.NodeID
|
||||
|
||||
response, err := processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
switch m := response.GetMessage().(type) {
|
||||
case *pb.SatelliteMessage_TransferPiece:
|
||||
require.NotNil(t, m)
|
||||
|
||||
pieceReader, err := exitingNode.Storage2.Store.Reader(ctx, satellite.ID(), m.TransferPiece.OriginalPieceId)
|
||||
require.NoError(t, err)
|
||||
|
||||
header, err := pieceReader.GetPieceHeader()
|
||||
require.NoError(t, err)
|
||||
|
||||
orderLimit := header.OrderLimit
|
||||
originalPieceHash := &pb.PieceHash{
|
||||
PieceId: orderLimit.PieceId,
|
||||
Hash: header.GetHash(),
|
||||
PieceSize: pieceReader.Size(),
|
||||
Timestamp: header.GetCreationTime(),
|
||||
Signature: header.GetSignature(),
|
||||
}
|
||||
|
||||
newPieceHash := &pb.PieceHash{
|
||||
PieceId: m.TransferPiece.AddressedOrderLimit.Limit.PieceId,
|
||||
Hash: originalPieceHash.Hash,
|
||||
PieceSize: originalPieceHash.PieceSize,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
receivingIdentity := nodeFullIDs[m.TransferPiece.AddressedOrderLimit.Limit.StorageNodeId]
|
||||
require.NotNil(t, receivingIdentity)
|
||||
|
||||
// get the receiving node piece count before processing
|
||||
recNodeID = receivingIdentity.ID
|
||||
|
||||
signer := signing.SignerFromFullIdentity(receivingIdentity)
|
||||
|
||||
signedNewPieceHash, err := signing.SignPieceHash(ctx, signer, newPieceHash)
|
||||
require.NoError(t, err)
|
||||
|
||||
success := &pb.StorageNodeMessage{
|
||||
Message: &pb.StorageNodeMessage_Succeeded{
|
||||
Succeeded: &pb.TransferSucceeded{
|
||||
OriginalPieceId: m.TransferPiece.OriginalPieceId,
|
||||
OriginalPieceHash: originalPieceHash,
|
||||
OriginalOrderLimit: &orderLimit,
|
||||
ReplacementPieceHash: signedNewPieceHash,
|
||||
},
|
||||
},
|
||||
}
|
||||
err = processClient.Send(success)
|
||||
require.NoError(t, err)
|
||||
default:
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
response, err = processClient.Recv()
|
||||
require.NoError(t, err)
|
||||
|
||||
switch response.GetMessage().(type) {
|
||||
case *pb.SatelliteMessage_DeletePiece:
|
||||
// expect the delete piece message
|
||||
default:
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// check that the exit has completed and we have the correct transferred/failed values
|
||||
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.EqualValues(t, numPieces, progress.PiecesTransferred)
|
||||
// even though we failed 1, it eventually succeeded, so the count should be 0
|
||||
require.EqualValues(t, 0, progress.PiecesFailed)
|
||||
|
||||
keys, err := satellite.Metainfo.Database.List(ctx, nil, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
pointer, err := satellite.Metainfo.Service.Get(ctx, string(keys[0]))
|
||||
require.NoError(t, err)
|
||||
|
||||
found := 0
|
||||
require.NotNil(t, pointer.GetRemote())
|
||||
require.True(t, len(pointer.GetRemote().GetRemotePieces()) > 0)
|
||||
for _, piece := range pointer.GetRemote().GetRemotePieces() {
|
||||
require.NotEqual(t, exitingNode.ID(), piece.NodeId)
|
||||
if piece.NodeId == recNodeID {
|
||||
found++
|
||||
}
|
||||
}
|
||||
require.Equal(t, 1, found)
|
||||
})
|
||||
}
|
||||
|
||||
func testTransfers(t *testing.T, objects int, verifier func(ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.SatelliteSystem, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
|
||||
successThreshold := 8
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
|
Loading…
Reference in New Issue
Block a user