gracefulexit: use GetSegmentByLocation instead of GetObjectLatestVersion

This enables the transfer of pieces that belong to an ongoing multipart upload.
The tests are also updated to account for pending multipart uploads.

See https://storjlabs.atlassian.net/browse/PG-161
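
The core of the change is in the satellite endpoint's getValidSegment: instead
of resolving the latest committed object version and then fetching the segment
by position (a lookup that cannot see segments of a still-pending multipart
upload), the segment is now fetched directly by its location. A simplified
before/after sketch, with error wrapping and the root piece ID check omitted
(see the endpoint diff below):

    // before: requires a committed object version, so segments of pending
    // multipart uploads are not found
    object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
        ObjectLocation: location.Object(),
    })
    if err != nil {
        return metabase.Segment{}, Error.Wrap(err)
    }
    segment, err := endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
        StreamID: object.StreamID,
        Position: location.Position,
    })

    // after: a single lookup by segment location, which also finds segments
    // that belong to an ongoing multipart upload
    segment, err := endpoint.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
        SegmentLocation: location,
    })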

Change-Id: I35d433c44dd6e618667e5e8f9f998ef867b9f1ad
Fadila Khadar 2021-02-10 11:15:19 +01:00 committed by Kaloyan Raev
parent 79d6294dbe
commit 5dd76522af
3 changed files with 443 additions and 287 deletions
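
For reference, the tests create a pending multipart upload through the private
uplink API (storj.io/uplink/private/multipart, added to the test imports): a
single part is uploaded but the multipart upload is never completed, so the
segment exists while the object stays pending. A condensed sketch of the test
setup, taken from the test diffs below (bucket and key names are just the test
fixtures):

    project, err := uplinkPeer.GetProject(ctx, satellite)
    require.NoError(t, err)
    defer func() { require.NoError(t, project.Close()) }()

    // start a multipart upload and upload one 5 KiB part, but never
    // complete it, so the object remains pending
    info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path3", nil)
    require.NoError(t, err)
    _, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path3",
        info.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
    require.NoError(t, err)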

View File

@@ -4,6 +4,7 @@
package gracefulexit_test
import (
"bytes"
"context"
"testing"
"time"
@@ -21,6 +22,7 @@ import (
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/uplink/private/multipart"
)
func TestChore(t *testing.T) {
@@ -42,14 +44,24 @@ func TestChore(t *testing.T) {
satellite := planet.Satellites[0]
exitingNode := planet.StorageNodes[1]
project, err := uplinkPeer.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
satellite.GracefulExit.Chore.Loop.Pause()
err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path2", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path3", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path3", info.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
require.NoError(t, err)
exitStatusRequest := overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
@@ -72,7 +84,7 @@ func TestChore(t *testing.T) {
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 2)
require.Len(t, incompleteTransfers, 3)
for _, incomplete := range incompleteTransfers {
require.True(t, incomplete.DurabilityRatio > 0)
require.NotNil(t, incomplete.RootPieceID)
@@ -104,7 +116,7 @@ func TestChore(t *testing.T) {
incompleteTransfers, err = satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 2)
require.Len(t, incompleteTransfers, 3)
// node should fail graceful exit if it has been inactive for maximum inactive time frame since last activity
time.Sleep(maximumInactiveTimeFrame + time.Second*1)
@@ -145,9 +157,18 @@ func TestDurabilityRatio(t *testing.T) {
nodeToRemove := planet.StorageNodes[0]
exitingNode := planet.StorageNodes[1]
project, err := uplinkPeer.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
satellite.GracefulExit.Chore.Loop.Pause()
err := uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
err = uplinkPeer.Upload(ctx, satellite, "testbucket", "test/path1", testrand.Bytes(5*memory.KiB))
require.NoError(t, err)
info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path2", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path2", info.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
require.NoError(t, err)
exitStatusRequest := overlay.ExitStatusRequest{
@@ -171,9 +192,10 @@ func TestDurabilityRatio(t *testing.T) {
// retrieve remote segment
segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments, 2)
remotePieces := segments[0].Pieces
for _, segment := range segments {
remotePieces := segment.Pieces
var newPieces metabase.Pieces = make(metabase.Pieces, len(remotePieces)-1)
idx := 0
for _, p := range remotePieces {
@@ -182,21 +204,21 @@ func TestDurabilityRatio(t *testing.T) {
idx++
}
}
err = satellite.Metainfo.Metabase.UpdateSegmentPieces(ctx, metabase.UpdateSegmentPieces{
StreamID: segments[0].StreamID,
Position: segments[0].Position,
StreamID: segment.StreamID,
Position: segment.Position,
OldPieces: segments[0].Pieces,
OldPieces: segment.Pieces,
NewPieces: newPieces,
})
require.NoError(t, err)
}
satellite.GracefulExit.Chore.Loop.TriggerWait()
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 20, 0)
require.NoError(t, err)
require.Len(t, incompleteTransfers, 1)
require.Len(t, incompleteTransfers, 2)
for _, incomplete := range incompleteTransfers {
require.Equal(t, float64(successThreshold-1)/float64(successThreshold), incomplete.DurabilityRatio)
require.NotNil(t, incomplete.RootPieceID)

View File

@@ -862,26 +862,15 @@ func (endpoint *Endpoint) getValidSegment(ctx context.Context, key metabase.Segm
// TODO refactor PendingTransfer and TransferQueueItem to provide StreamID/Position to be able
// to get the segment for an object with a specific version; this will only work until we have
// multiple object versions
object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: location.Object(),
segment, err := endpoint.metabase.GetSegmentByLocation(ctx, metabase.GetSegmentByLocation{
SegmentLocation: location,
})
if err != nil {
return metabase.Segment{}, Error.Wrap(err)
}
segment, err := endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: object.StreamID,
Position: location.Position,
})
if err != nil {
if metabase.ErrSegmentNotFound.Has(err) {
return metabase.Segment{}, Error.New("segment (%s, %d) no longer exists.", object.StreamID, location.Position.Encode())
}
return metabase.Segment{}, Error.Wrap(err)
}
if !originalRootPieceID.IsZero() && originalRootPieceID != segment.RootPieceID {
return metabase.Segment{}, Error.New("segment (%s, %d) has changed.", object.StreamID, location.Position.Encode())
return metabase.Segment{}, Error.New("segment has changed")
}
return segment, nil
}

View File

@@ -4,6 +4,7 @@
package gracefulexit_test
import (
"bytes"
"context"
"io"
"strconv"
@@ -34,9 +35,11 @@ import (
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/gracefulexit"
"storj.io/uplink/private/multipart"
)
const numObjects = 6
const numMultipartObjects = 6
// exitProcessClient is used so we can pass the graceful exit process clients regardless of implementation.
type exitProcessClient interface {
@@ -45,7 +48,7 @@ type exitProcessClient interface {
}
func TestSuccess(t *testing.T) {
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, numObjects, numMultipartObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var pieceID storj.PieceID
failedCount := 0
deletedCount := 0
@@ -129,7 +132,7 @@ func TestSuccess(t *testing.T) {
err = signing.VerifyExitCompleted(ctx, signee, m.ExitCompleted)
require.NoError(t, err)
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
}
@@ -216,10 +219,10 @@ func TestConcurrentConnections(t *testing.T) {
require.NoError(t, err)
response, err := c.Recv()
require.NoError(t, err)
switch response.GetMessage().(type) {
switch m := response.GetMessage().(type) {
case *pb.SatelliteMessage_NotReady:
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
require.NoError(t, c.Close())
}
@@ -323,7 +326,7 @@ func TestRecvTimeout(t *testing.T) {
}
func TestInvalidStorageNodeSignature(t *testing.T) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, 0, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@@ -436,7 +439,7 @@ func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
}
func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
testTransfers(t, numObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, numObjects, numMultipartObjects, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var disqualifiedError error
isDisqualified := false
for {
@@ -503,7 +506,7 @@ func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
case *pb.SatelliteMessage_DeletePiece:
continue
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
}
// check that the exit has failed due to node has been disqualified
@@ -524,7 +527,14 @@ func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
}
func TestFailureHashMismatch(t *testing.T) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, 0, testFailureHashMismatch)
}
func TestFailureHashMismatchMultipart(t *testing.T) {
testTransfers(t, 0, 1, testFailureHashMismatch)
}
func testFailureHashMismatch(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@@ -598,11 +608,10 @@ func TestFailureHashMismatch(t *testing.T) {
require.Equal(t, int64(0), progress.PiecesTransferred)
require.Equal(t, int64(1), progress.PiecesFailed)
})
}
func TestFailureUnknownError(t *testing.T) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, 0, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@@ -643,7 +652,7 @@ func TestFailureUnknownError(t *testing.T) {
}
func TestFailureUplinkSignature(t *testing.T) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, 0, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@@ -723,7 +732,14 @@ func TestFailureUplinkSignature(t *testing.T) {
}
func TestSuccessSegmentUpdate(t *testing.T) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, 0, testSuccessSegmentUpdate)
}
func TestSuccessSegmentUpdateMultipart(t *testing.T) {
testTransfers(t, 0, 1, testSuccessSegmentUpdate)
}
func testSuccessSegmentUpdate(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
var recNodeID storj.NodeID
response, err := processClient.Recv()
@@ -779,7 +795,7 @@ func TestSuccessSegmentUpdate(t *testing.T) {
err = processClient.Send(success)
require.NoError(t, err)
default:
t.FailNow()
require.FailNow(t, "did not get a TransferPiece message")
}
response, err = processClient.Recv()
@@ -789,9 +805,8 @@ func TestSuccessSegmentUpdate(t *testing.T) {
case *pb.SatelliteMessage_DeletePiece:
// expect the delete piece message
default:
t.FailNow()
require.FailNow(t, "did not get a DeletePiece message")
}
// check that the exit has completed and we have the correct transferred/failed values
progress, err := satellite.DB.GracefulExit().GetProgress(ctx, exitingNode.ID())
require.NoError(t, err)
@@ -803,7 +818,6 @@ func TestSuccessSegmentUpdate(t *testing.T) {
segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
found := 0
require.True(t, len(segments[0].Pieces) > 0)
for _, piece := range segments[0].Pieces {
@@ -813,11 +827,17 @@ func TestSuccessSegmentUpdate(t *testing.T) {
}
}
require.Equal(t, 1, found)
})
}
func TestUpdateSegmentFailure_DuplicatedNodeID(t *testing.T) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, 0, testUpdateSegmentFailureDuplicatedNodeID)
}
func TestUpdateSegmentFailure_DuplicatedNodeIDMultipart(t *testing.T) {
testTransfers(t, 0, 1, testUpdateSegmentFailureDuplicatedNodeID)
}
func testUpdateSegmentFailureDuplicatedNodeID(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@@ -896,7 +916,7 @@ func TestUpdateSegmentFailure_DuplicatedNodeID(t *testing.T) {
err = processClient.Send(success)
require.NoError(t, err)
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
response, err = processClient.Recv()
@@ -908,13 +928,14 @@ func TestUpdateSegmentFailure_DuplicatedNodeID(t *testing.T) {
require.True(t, m.TransferPiece.OriginalPieceId == pieceID)
require.True(t, m.TransferPiece.AddressedOrderLimit.Limit.StorageNodeId != firstRecNodeID)
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
// check exiting node is still in the segment
segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.True(t, len(segments[0].Pieces) > 0)
pieces := segments[0].Pieces
@@ -930,9 +951,7 @@ func TestUpdateSegmentFailure_DuplicatedNodeID(t *testing.T) {
count, ok = pieceMap[firstRecNodeID]
require.True(t, ok)
require.Equal(t, 1, count)
})
}
func TestExitDisabled(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
@@ -1051,7 +1070,116 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
require.NotNil(t, exitStatus.ExitFinishedAt)
require.True(t, exitStatus.ExitSuccess)
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0)
require.NoError(t, err)
require.Len(t, queueItems, 0)
})
}
func TestSegmentChangedOrDeletedMultipart(t *testing.T) {
successThreshold := 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: successThreshold + 1,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, successThreshold, successThreshold),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
project, err := uplinkPeer.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
satellite.GracefulExit.Chore.Loop.Pause()
_, err = project.EnsureBucket(ctx, "testbucket")
require.NoError(t, err)
// TODO: activate when an object part can be overwritten
// info0, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path0", nil)
// require.NoError(t, err)
// _, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path0", info0.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
// require.NoError(t, err)
info1, err := multipart.NewMultipartUpload(ctx, project, "testbucket", "test/path1", nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path1", info1.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
require.NoError(t, err)
// check that there are no exiting nodes.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 0)
exitingNode, err := findNodeToExit(ctx, planet, 2)
require.NoError(t, err)
exitRequest := &overlay.ExitStatusRequest{
NodeID: exitingNode.ID(),
ExitInitiatedAt: time.Now(),
}
_, err = satellite.DB.OverlayCache().UpdateExitStatus(ctx, exitRequest)
require.NoError(t, err)
err = satellite.DB.GracefulExit().IncrementProgress(ctx, exitingNode.ID(), 0, 0, 0)
require.NoError(t, err)
exitingNodes, err = satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
require.Len(t, exitingNodes, 1)
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
// trigger the metainfo loop chore so we can get some pieces to transfer
satellite.GracefulExit.Chore.Loop.TriggerWait()
// make sure all the pieces are in the transfer queue
incomplete, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 10, 0)
require.NoError(t, err)
require.Len(t, incomplete, 1)
// TODO: change to this when an object part can be overwritten
// require.Len(t, incomplete, 2)
// updating the first object and deleting the second. this will cause a root piece ID change which will result in
// a successful graceful exit instead of a request to transfer pieces since the root piece IDs will have changed.
// TODO: activate when an object part can be overwritten
// _, err = multipart.PutObjectPart(ctx, project, "testbucket", "test/path0", info0.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
// require.NoError(t, err)
err = multipart.AbortMultipartUpload(ctx, project, "testbucket", "test/path1", info1.StreamID)
require.NoError(t, err)
// reconnect to the satellite.
conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL())
require.NoError(t, err)
defer ctx.Check(conn.Close)
client := pb.NewDRPCSatelliteGracefulExitClient(conn)
c, err := client.Process(ctx)
require.NoError(t, err)
defer ctx.Check(c.CloseSend)
response, err := c.Recv()
require.NoError(t, err)
// we expect an exit completed b/c there is nothing to do here
switch m := response.GetMessage().(type) {
case *pb.SatelliteMessage_ExitCompleted:
signee := signing.SigneeFromPeerIdentity(satellite.Identity.PeerIdentity())
err = signing.VerifyExitCompleted(ctx, signee, m.ExitCompleted)
require.NoError(t, err)
exitStatus, err := satellite.DB.OverlayCache().GetExitStatus(ctx, exitingNode.ID())
require.NoError(t, err)
require.NotNil(t, exitStatus.ExitFinishedAt)
require.True(t, exitStatus.ExitSuccess)
default:
require.FailNow(t, "should not reach this case: %#v", m)
}
queueItems, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), 2, 0)
@@ -1061,7 +1189,7 @@ func TestSegmentChangedOrDeleted(t *testing.T) {
}
func TestFailureNotFound(t *testing.T) {
testTransfers(t, 1, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
testTransfers(t, 1, 0, func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int) {
response, err := processClient.Recv()
require.NoError(t, err)
@@ -1172,7 +1300,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
require.NoError(t, err)
// should get a NotReady since the metainfo loop would not be finished at this point.
switch response.GetMessage().(type) {
switch m := response.GetMessage().(type) {
case *pb.SatelliteMessage_NotReady:
// now check that the exiting node is initiated.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
@@ -1181,7 +1309,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
// close the old client
require.NoError(t, c.CloseSend())
@@ -1213,7 +1341,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
require.NoError(t, err)
}
switch response.GetMessage().(type) {
switch m := response.GetMessage().(type) {
case *pb.SatelliteMessage_ExitCompleted:
break MessageLoop
case *pb.SatelliteMessage_TransferPiece:
@@ -1227,7 +1355,7 @@ func TestFailureStorageNodeIgnoresTransferMessages(t *testing.T) {
require.NoError(t, err)
require.NoError(t, c.CloseSend())
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
}
}
@@ -1309,7 +1437,7 @@ func TestIneligibleNodeAge(t *testing.T) {
})
}
func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
func testTransfers(t *testing.T, objects int, multipartObjects int, verifier func(t *testing.T, ctx *testcontext.Context, nodeFullIDs map[storj.NodeID]*identity.FullIdentity, satellite *testplanet.Satellite, processClient exitProcessClient, exitingNode *storagenode.Peer, numPieces int)) {
const successThreshold = 4
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
@@ -1322,6 +1450,13 @@ func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *t
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
project, err := uplinkPeer.GetProject(ctx, satellite)
require.NoError(t, err)
defer func() { require.NoError(t, project.Close()) }()
_, err = project.EnsureBucket(ctx, "testbucket")
require.NoError(t, err)
satellite.GracefulExit.Chore.Loop.Pause()
nodeFullIDs := make(map[storj.NodeID]*identity.FullIdentity)
@@ -1334,6 +1469,16 @@ func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *t
require.NoError(t, err)
}
for i := 0; i < multipartObjects; i++ {
objectName := "test/multipart" + strconv.Itoa(i)
info, err := multipart.NewMultipartUpload(ctx, project, "testbucket", objectName, nil)
require.NoError(t, err)
_, err = multipart.PutObjectPart(ctx, project, "testbucket", objectName, info.StreamID, 1, bytes.NewReader(testrand.Bytes(5*memory.KiB)))
require.NoError(t, err)
}
// check that there are no exiting nodes.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
require.NoError(t, err)
@@ -1356,7 +1501,7 @@ func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *t
require.NoError(t, err)
// should get a NotReady since the metainfo loop would not be finished at this point.
switch response.GetMessage().(type) {
switch m := response.GetMessage().(type) {
case *pb.SatelliteMessage_NotReady:
// now check that the exiting node is initiated.
exitingNodes, err := satellite.DB.OverlayCache().GetExitingNodes(ctx)
@@ -1365,7 +1510,7 @@ func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *t
require.Equal(t, exitingNode.ID(), exitingNodes[0].NodeID)
default:
t.FailNow()
require.FailNow(t, "should not reach this case: %#v", m)
}
// close the old client
require.NoError(t, c.CloseSend())
@@ -1374,7 +1519,7 @@ func testTransfers(t *testing.T, objects int, verifier func(t *testing.T, ctx *t
satellite.GracefulExit.Chore.Loop.TriggerWait()
// make sure all the pieces are in the transfer queue
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects, 0)
incompleteTransfers, err := satellite.DB.GracefulExit().GetIncomplete(ctx, exitingNode.ID(), objects+multipartObjects, 0)
require.NoError(t, err)
// connect to satellite again to start receiving transfers