satellite/metainfo: fix deletion tests

The main issue with these tests was in the cases where all objects
were uploaded at once ("some nodes down" and "all nodes down").
Because every object was uploaded under the same key, each new upload
overwrote the previous object, so instead of several objects for the
test to delete explicitly there was only one. The tests still passed
because overwriting an existing object also deletes it, but that is
not the behavior these tests are meant to exercise.

Change-Id: I602116f00be66589c7c0e68fe28c25e5c03e6b5d
Michal Niewrzal 2022-09-12 12:25:57 +02:00
parent 158eb2381e
commit 464ab43558
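
For context, the overwrite behavior described in the commit message can be reproduced with the same testplanet and metabase helpers that appear in the diff below. The following is a minimal illustrative sketch, not part of this commit: the test name, bucket name, node counts, and data size are assumptions, while Upload and TestingAllCommittedObjects are used exactly as in the changed test file.

// Illustrative sketch (not part of this commit): two uploads under the same
// key leave a single committed object, so a helper that deletes "all"
// objects in the bucket only ever performs one delete.
package metainfo_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"storj.io/common/memory"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
)

func TestSameKeyOverwrites(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		// Upload two objects under the same bucket/key pair; the second
		// upload overwrites (and implicitly deletes) the first one.
		for i := 0; i < 2; i++ {
			err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "a-bucket", "same-key", testrand.Bytes(memory.KiB))
			require.NoError(t, err)
		}

		projectID := planet.Uplinks[0].Projects[0].ID
		items, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, projectID, "a-bucket")
		require.NoError(t, err)
		// Only one committed object remains, which is why the old
		// deleteAllObjects helpers never saw more than a single object.
		require.Len(t, items, 1)
	})
}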


@@ -1115,146 +1115,111 @@ func createGeofencedBucket(t *testing.T, ctx *testcontext.Context, buckets *buck
}
func TestEndpoint_DeleteCommittedObject(t *testing.T) {
bucketName := "a-bucket"
createObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, data []byte) {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "object-filename", data)
createObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket, key, data)
require.NoError(t, err)
}
deleteAllObjects := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
deleteObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
projectID := planet.Uplinks[0].Projects[0].ID
items, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, projectID, bucketName)
require.NoError(t, err)
require.GreaterOrEqual(t, len(items), 1)
for _, item := range items {
_, err = planet.Satellites[0].Metainfo.Endpoint.DeleteCommittedObject(ctx, projectID, bucketName, item.ObjectKey)
require.NoError(t, err)
}
items, err = planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, projectID, bucketName)
_, err := planet.Satellites[0].Metainfo.Endpoint.DeleteCommittedObject(ctx, projectID, bucket, metabase.ObjectKey(encryptedKey))
require.NoError(t, err)
require.Len(t, items, 0)
}
testDeleteObject(t, createObject, deleteAllObjects)
testDeleteObject(t, createObject, deleteObject)
}
func TestEndpoint_DeletePendingObject(t *testing.T) {
bucketName := "a-bucket"
createObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, data []byte) {
createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
// TODO This should be replaced by a call to testplanet.Uplink.MultipartUpload when available.
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err, "failed to retrieve project")
_, err = project.EnsureBucket(ctx, bucketName)
_, err = project.EnsureBucket(ctx, bucket)
require.NoError(t, err, "failed to create bucket")
info, err := project.BeginUpload(ctx, bucketName, "object-filename", &uplink.UploadOptions{})
info, err := project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{})
require.NoError(t, err, "failed to start multipart upload")
upload, err := project.UploadPart(ctx, bucketName, bucketName, info.UploadID, 1)
upload, err := project.UploadPart(ctx, bucket, key, info.UploadID, 1)
require.NoError(t, err, "failed to put object part")
_, err = upload.Write(data)
require.NoError(t, err, "failed to put object part")
require.NoError(t, upload.Commit(), "failed to put object part")
}
deleteAllObjects := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
projectID := planet.Uplinks[0].Projects[0].ID
items, err := planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx, projectID, bucketName)
require.NoError(t, err)
require.GreaterOrEqual(t, len(items), 1)
for _, item := range items {
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeletePendingObject(ctx,
metabase.ObjectStream{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: item.ObjectKey,
Version: metabase.DefaultVersion,
StreamID: item.StreamID,
})
require.NoError(t, err)
require.Len(t, deletedObjects, 1)
}
items, err = planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx, projectID, bucketName)
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeletePendingObject(ctx,
metabase.ObjectStream{
ProjectID: projectID,
BucketName: bucket,
ObjectKey: metabase.ObjectKey(encryptedKey),
Version: metabase.DefaultVersion,
StreamID: streamID,
})
require.NoError(t, err)
require.Len(t, items, 0)
require.Len(t, deletedObjects, 1)
}
testDeleteObject(t, createObject, deleteAllObjects)
testDeleteObject(t, createPendingObject, deletePendingObject)
}
func TestEndpoint_DeleteObjectAnyStatus(t *testing.T) {
bucketName := "a-bucket"
createCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, data []byte) {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "object-filename", data)
createCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket, key, data)
require.NoError(t, err)
}
deleteAllCommittedObjects := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
deleteCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
projectID := planet.Uplinks[0].Projects[0].ID
items, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, projectID, bucketName)
require.NoError(t, err)
require.GreaterOrEqual(t, len(items), 1)
for _, item := range items {
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: item.ObjectKey,
})
require.NoError(t, err)
require.Len(t, deletedObjects, 1)
}
items, err = planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx, projectID, bucketName)
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
ProjectID: projectID,
BucketName: bucket,
ObjectKey: metabase.ObjectKey(encryptedKey),
})
require.NoError(t, err)
require.Len(t, items, 0)
require.Len(t, deletedObjects, 1)
}
testDeleteObject(t, createCommittedObject, deleteAllCommittedObjects)
testDeleteObject(t, createCommittedObject, deleteCommittedObject)
createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, data []byte) {
createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
// TODO This should be replaced by a call to testplanet.Uplink.MultipartUpload when available.
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err, "failed to retrieve project")
_, err = project.EnsureBucket(ctx, bucketName)
_, err = project.EnsureBucket(ctx, bucket)
require.NoError(t, err, "failed to create bucket")
info, err := project.BeginUpload(ctx, bucketName, "object-filename", &uplink.UploadOptions{})
info, err := project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{})
require.NoError(t, err, "failed to start multipart upload")
upload, err := project.UploadPart(ctx, bucketName, bucketName, info.UploadID, 1)
upload, err := project.UploadPart(ctx, bucket, key, info.UploadID, 1)
require.NoError(t, err, "failed to put object part")
_, err = upload.Write(data)
require.NoError(t, err, "failed to start multipart upload")
require.NoError(t, upload.Commit(), "failed to start multipart upload")
}
deleteAllPendingObjects := func(ctx context.Context, t *testing.T, planet *testplanet.Planet) {
deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
projectID := planet.Uplinks[0].Projects[0].ID
items, err := planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx, projectID, bucketName)
require.NoError(t, err)
require.GreaterOrEqual(t, len(items), 1)
for _, item := range items {
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
ProjectID: projectID,
BucketName: bucketName,
ObjectKey: item.ObjectKey,
})
require.NoError(t, err)
require.Len(t, deletedObjects, 1)
}
items, err = planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx, projectID, bucketName)
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
ProjectID: projectID,
BucketName: bucket,
ObjectKey: metabase.ObjectKey(encryptedKey),
})
require.NoError(t, err)
require.Len(t, items, 0)
require.Len(t, deletedObjects, 1)
}
testDeleteObject(t, createPendingObject, deleteAllPendingObjects)
testDeleteObject(t, createPendingObject, deletePendingObject)
}
func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet,
data []byte), deleteAllObjects func(ctx context.Context, t *testing.T, planet *testplanet.Planet)) {
func testDeleteObject(t *testing.T,
createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte),
deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID),
) {
bucketName := "deleteobjects"
t.Run("all nodes up", func(t *testing.T) {
t.Parallel()
@@ -1287,7 +1252,7 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
createObject(ctx, t, planet, tc.objData)
createObject(ctx, t, planet, bucketName, tc.caseDescription, tc.objData)
// calculate the SNs total used space after data upload
var totalUsedSpace int64
@@ -1297,7 +1262,11 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
totalUsedSpace += piecesTotal
}
deleteAllObjects(ctx, t, planet)
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
for _, object := range objects {
deleteObject(ctx, t, planet, bucketName, string(object.ObjectKey), object.StreamID)
}
planet.WaitForStorageNodeDeleters(ctx)
@@ -1347,7 +1316,7 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
numToShutdown := 2
for _, tc := range testCases {
createObject(ctx, t, planet, tc.objData)
createObject(ctx, t, planet, bucketName, tc.caseDescription, tc.objData)
}
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
@@ -1363,7 +1332,11 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
require.NoError(t, planet.StopPeer(planet.StorageNodes[i]))
}
deleteAllObjects(ctx, t, planet)
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
for _, object := range objects {
deleteObject(ctx, t, planet, bucketName, string(object.ObjectKey), object.StreamID)
}
planet.WaitForStorageNodeDeleters(ctx)
@@ -1406,7 +1379,7 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
for _, tc := range testCases {
createObject(ctx, t, planet, tc.objData)
createObject(ctx, t, planet, bucketName, tc.caseDescription, tc.objData)
}
// calculate the SNs total used space after data upload
@@ -1422,7 +1395,11 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
require.NoError(t, planet.StopPeer(sn))
}
deleteAllObjects(ctx, t, planet)
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
for _, object := range objects {
deleteObject(ctx, t, planet, bucketName, string(object.ObjectKey), object.StreamID)
}
// Check that storage nodes that were offline when deleting the pieces
// they are still holding data