satellite/metainfo: test to confirm that retried pieces can't be submitted with originals

closes https://github.com/storj/storj/issues/5709

Change-Id: I2030825d85987d97ae8c10f02ab1f70086cbcb97
JT Olio 2023-04-10 12:53:32 -04:00 committed by Storj Robot
parent a21afeddd1
commit 16ccffb6f7

@@ -724,6 +724,117 @@ func TestRetryBeginSegmentPieces_Validation(t *testing.T) {
	})
}

func TestCommitSegment_RejectRetryDuplicate(t *testing.T) {
	// this test is going to make sure commit segment won't allow someone to commit a segment with
	// piece 2 and retried piece 2 together.
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		bucket := createTestBucket(ctx, t, planet)
		metainfoClient := createMetainfoClient(ctx, t, planet)

		params := metaclient.BeginObjectParams{
			Bucket:             []byte(bucket.Name),
			EncryptedObjectKey: []byte("encrypted-path"),
			EncryptionParameters: storj.EncryptionParameters{
				CipherSuite: storj.EncAESGCM,
				BlockSize:   256,
			},
		}
		beginObjectResp, err := metainfoClient.BeginObject(ctx, params)
		require.NoError(t, err)

		beginSegmentResp, err := metainfoClient.BeginSegment(ctx, metaclient.BeginSegmentParams{
			StreamID:      beginObjectResp.StreamID,
			Position:      metaclient.SegmentPosition{},
			MaxOrderLimit: 1024,
		})
		require.NoError(t, err)

		exchangeSegmentPieceOrdersResp, err := metainfoClient.RetryBeginSegmentPieces(ctx, metaclient.RetryBeginSegmentPiecesParams{
			SegmentID:         beginSegmentResp.SegmentID,
			RetryPieceNumbers: []int{2},
		})
		require.NoError(t, err)
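
		// The retry response carries its own segment ID and a fresh order limit for piece 2 that points
		// at a different storage node. Below we deliberately submit upload results for both the original
		// piece 2 and the retried piece 2.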
		var nodes = make(map[string]bool)

		// makeResult signs a piece hash as the storage node named in the i'th order limit, producing an
		// upload result that looks to the satellite as if that node really stored the piece.
		makeResult := func(i int, limits []*pb.AddressedOrderLimit) *pb.SegmentPieceUploadResult {
			limit := limits[i].Limit
			node := planet.FindNode(limit.StorageNodeId)
			require.NotNil(t, node, "failed to locate node to sign hash for piece %d", i)
			nodes[node.Addr()] = true
			signer := signing.SignerFromFullIdentity(node.Identity)
			hash, err := signing.SignPieceHash(ctx, signer, &pb.PieceHash{
				PieceSize: 512,
				PieceId:   limit.PieceId,
				Timestamp: time.Now(),
			})
			require.NoError(t, err)
			return &pb.SegmentPieceUploadResult{
				PieceNum: int32(i),
				NodeId:   limit.StorageNodeId,
				Hash:     hash,
			}
		}
		commitSegmentParams := metaclient.CommitSegmentParams{
			SegmentID:         beginSegmentResp.SegmentID,
			PlainSize:         512,
			SizeEncryptedData: 512,
			Encryption: metaclient.SegmentEncryption{
				EncryptedKey: testrand.Bytes(256),
			},
			UploadResult: []*pb.SegmentPieceUploadResult{
				makeResult(0, beginSegmentResp.Limits),
				makeResult(1, beginSegmentResp.Limits),
				makeResult(2, beginSegmentResp.Limits),
				makeResult(2, exchangeSegmentPieceOrdersResp.Limits),
				makeResult(3, beginSegmentResp.Limits),
				makeResult(4, beginSegmentResp.Limits),
			},
		}
		require.Equal(t, 6, len(nodes))
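
		// With both copies of piece 2 included, the satellite counts only 5 valid pieces against a success
		// threshold of 6, so the commit fails whether we use the original segment ID or the one returned
		// by RetryBeginSegmentPieces.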
		err = metainfoClient.CommitSegment(ctx, commitSegmentParams)
		rpctest.RequireStatusContains(t, err, rpcstatus.InvalidArgument, "metaclient: Number of valid pieces (5) is less than the success threshold (6).")

		commitSegmentParams.SegmentID = exchangeSegmentPieceOrdersResp.SegmentID
		err = metainfoClient.CommitSegment(ctx, commitSegmentParams)
		rpctest.RequireStatusContains(t, err, rpcstatus.InvalidArgument, "metaclient: Number of valid pieces (5) is less than the success threshold (6).")

		commitSegmentParams.UploadResult = append(commitSegmentParams.UploadResult, makeResult(5, beginSegmentResp.Limits))
		require.Equal(t, 7, len(nodes))
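
		// A result for piece 5 brings the number of distinct valid pieces up to 6, so the commit (still
		// using the retry segment ID) should now succeed.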
		err = metainfoClient.CommitSegment(ctx, commitSegmentParams)
		require.NoError(t, err)

		err = metainfoClient.CommitObject(ctx, metaclient.CommitObjectParams{
			StreamID: beginObjectResp.StreamID,
		})
		require.NoError(t, err)

		// now we need to make sure that what just happened only used 6 pieces. it would be nice if
		// we had an API that was more direct than this:
		resp, err := metainfoClient.GetObjectIPs(ctx, metaclient.GetObjectIPsParams{
			Bucket:             []byte(bucket.Name),
			EncryptedObjectKey: []byte("encrypted-path"),
		})
		require.NoError(t, err)
		require.Equal(t, int64(1), resp.SegmentCount)
		require.Equal(t, int64(6), resp.PieceCount)
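
		// Exactly one of the two candidate nodes for piece 2 (the original and the retried one) should
		// end up holding a piece of this object.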
		piece2Count := 0
		for _, addr := range resp.IPPorts {
			switch string(addr) {
			case planet.FindNode(beginSegmentResp.Limits[2].Limit.StorageNodeId).Addr():
				piece2Count++
			case planet.FindNode(exchangeSegmentPieceOrdersResp.Limits[2].Limit.StorageNodeId).Addr():
				piece2Count++
			}
		}
		require.Equal(t, 1, piece2Count)
	})
}

func createTestBucket(ctx context.Context, tb testing.TB, planet *testplanet.Planet) storj.Bucket {
	bucket, err := planet.Satellites[0].API.Buckets.Service.CreateBucket(ctx, storj.Bucket{
		Name: "test",