satellite/metainfo: remove unused method
Change-Id: I08e307e6909cdc46951c5f3112d77a685e67fe2e
This commit is contained in:
parent
df9a6e968e
commit
0303920da7
@ -65,11 +65,6 @@ type deletedRemoteSegmentInfo struct {
|
|||||||
RepairedAt *time.Time
|
RepairedAt *time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjectAnyStatusAllVersions contains arguments necessary for deleting all object versions.
|
|
||||||
type DeleteObjectAnyStatusAllVersions struct {
|
|
||||||
ObjectLocation
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteObjectsAllVersions contains arguments necessary for deleting all versions of multiple objects from the same bucket.
|
// DeleteObjectsAllVersions contains arguments necessary for deleting all versions of multiple objects from the same bucket.
|
||||||
type DeleteObjectsAllVersions struct {
|
type DeleteObjectsAllVersions struct {
|
||||||
Locations []ObjectLocation
|
Locations []ObjectLocation
|
||||||
@ -566,66 +561,6 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjectAnyStatusAllVersions deletes all object versions.
|
|
||||||
func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteObjectAnyStatusAllVersions) (result DeleteObjectResult, err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
if db.config.ServerSideCopy {
|
|
||||||
return DeleteObjectResult{}, errs.New("method cannot be used when server-side copy is enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := opts.Verify(); err != nil {
|
|
||||||
return DeleteObjectResult{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = withRows(db.db.QueryContext(ctx, `
|
|
||||||
WITH deleted_objects AS (
|
|
||||||
DELETE FROM objects
|
|
||||||
WHERE
|
|
||||||
project_id = $1 AND
|
|
||||||
bucket_name = $2 AND
|
|
||||||
object_key = $3
|
|
||||||
RETURNING
|
|
||||||
version, stream_id,
|
|
||||||
created_at, expires_at,
|
|
||||||
status, segment_count,
|
|
||||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
|
||||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
|
||||||
encryption
|
|
||||||
), deleted_segments AS (
|
|
||||||
DELETE FROM segments
|
|
||||||
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
|
||||||
RETURNING segments.stream_id,segments.root_piece_id, segments.remote_alias_pieces
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
deleted_objects.version, deleted_objects.stream_id,
|
|
||||||
deleted_objects.created_at, deleted_objects.expires_at,
|
|
||||||
deleted_objects.status, deleted_objects.segment_count,
|
|
||||||
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
|
|
||||||
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
|
|
||||||
deleted_objects.encryption,
|
|
||||||
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
|
|
||||||
FROM deleted_objects
|
|
||||||
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
|
|
||||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
|
|
||||||
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return DeleteObjectResult{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(result.Objects) == 0 {
|
|
||||||
return DeleteObjectResult{}, ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
|
|
||||||
}
|
|
||||||
|
|
||||||
mon.Meter("object_delete").Mark(len(result.Objects))
|
|
||||||
mon.Meter("segment_delete").Mark(len(result.Segments))
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
||||||
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
@ -466,205 +466,6 @@ func TestDeleteObjectExactVersion(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteObjectAnyStatusAllVersions(t *testing.T) {
|
|
||||||
metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
|
||||||
obj := metabasetest.RandObjectStream()
|
|
||||||
|
|
||||||
location := obj.Location()
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
for _, test := range metabasetest.InvalidObjectLocations(location) {
|
|
||||||
test := test
|
|
||||||
t.Run(test.Name, func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: test.ObjectLocation},
|
|
||||||
ErrClass: test.ErrClass,
|
|
||||||
ErrText: test.ErrText,
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("Object missing", func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
|
||||||
ErrClass: &metabase.ErrObjectNotFound,
|
|
||||||
ErrText: "metabase: no rows deleted",
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Delete non existing object version", func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
|
||||||
ErrClass: &metabase.ErrObjectNotFound,
|
|
||||||
ErrText: "metabase: no rows deleted",
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Delete partial object", func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.BeginObjectExactVersion{
|
|
||||||
Opts: metabase.BeginObjectExactVersion{
|
|
||||||
ObjectStream: obj,
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
},
|
|
||||||
Version: 1,
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
|
||||||
Result: metabase.DeleteObjectResult{
|
|
||||||
Objects: []metabase.Object{{
|
|
||||||
ObjectStream: obj,
|
|
||||||
CreatedAt: now,
|
|
||||||
Status: metabase.Pending,
|
|
||||||
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
}},
|
|
||||||
},
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Delete object without segments", func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
encryptedMetadata := testrand.Bytes(1024)
|
|
||||||
encryptedMetadataNonce := testrand.Nonce()
|
|
||||||
encryptedMetadataKey := testrand.Bytes(265)
|
|
||||||
|
|
||||||
object, _ := metabasetest.CreateTestObject{
|
|
||||||
CommitObject: &metabase.CommitObject{
|
|
||||||
ObjectStream: obj,
|
|
||||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
|
||||||
EncryptedMetadata: encryptedMetadata,
|
|
||||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
|
||||||
},
|
|
||||||
}.Run(ctx, t, db, obj, 0)
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
|
||||||
Result: metabase.DeleteObjectResult{
|
|
||||||
Objects: []metabase.Object{object},
|
|
||||||
},
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Delete object with segments", func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
object := metabasetest.CreateObject(ctx, t, db, obj, 2)
|
|
||||||
|
|
||||||
expectedSegmentInfo := metabase.DeletedSegmentInfo{
|
|
||||||
RootPieceID: storj.PieceID{1},
|
|
||||||
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{
|
|
||||||
ObjectLocation: location,
|
|
||||||
},
|
|
||||||
Result: metabase.DeleteObjectResult{
|
|
||||||
Objects: []metabase.Object{object},
|
|
||||||
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo},
|
|
||||||
},
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Delete object with inline segment", func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.BeginObjectExactVersion{
|
|
||||||
Opts: metabase.BeginObjectExactVersion{
|
|
||||||
ObjectStream: obj,
|
|
||||||
Encryption: metabasetest.DefaultEncryption,
|
|
||||||
},
|
|
||||||
Version: obj.Version,
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.CommitInlineSegment{
|
|
||||||
Opts: metabase.CommitInlineSegment{
|
|
||||||
ObjectStream: obj,
|
|
||||||
Position: metabase.SegmentPosition{Part: 0, Index: 0},
|
|
||||||
|
|
||||||
EncryptedKey: testrand.Bytes(32),
|
|
||||||
EncryptedKeyNonce: testrand.Bytes(32),
|
|
||||||
|
|
||||||
InlineData: testrand.Bytes(1024),
|
|
||||||
|
|
||||||
PlainSize: 512,
|
|
||||||
PlainOffset: 0,
|
|
||||||
},
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
object := metabasetest.CommitObject{
|
|
||||||
Opts: metabase.CommitObject{
|
|
||||||
ObjectStream: obj,
|
|
||||||
},
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
|
||||||
Result: metabase.DeleteObjectResult{
|
|
||||||
Objects: []metabase.Object{object},
|
|
||||||
},
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("Delete multiple versions of the same object at once", func(t *testing.T) {
|
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
expected := metabase.DeleteObjectResult{}
|
|
||||||
|
|
||||||
// committed object
|
|
||||||
obj := metabasetest.RandObjectStream()
|
|
||||||
expected.Objects = append(expected.Objects, metabasetest.CreateObject(ctx, t, db, obj, 1))
|
|
||||||
expected.Segments = append(expected.Segments, metabase.DeletedSegmentInfo{
|
|
||||||
RootPieceID: storj.PieceID{1},
|
|
||||||
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
|
|
||||||
})
|
|
||||||
|
|
||||||
// pending objects
|
|
||||||
for i := 1; i <= 10; i++ {
|
|
||||||
obj.StreamID = testrand.UUID()
|
|
||||||
obj.Version = metabase.NextVersion
|
|
||||||
|
|
||||||
pendingObject, err := db.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
|
||||||
ObjectStream: obj,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// nil ZombieDeletionDeadline because while deleting we are not returning this value with object metadata
|
|
||||||
pendingObject.ZombieDeletionDeadline = nil
|
|
||||||
expected.Objects = append(expected.Objects, pendingObject)
|
|
||||||
}
|
|
||||||
|
|
||||||
metabasetest.DeleteObjectAnyStatusAllVersions{
|
|
||||||
Opts: metabase.DeleteObjectAnyStatusAllVersions{ObjectLocation: obj.Location()},
|
|
||||||
Result: expected,
|
|
||||||
}.Check(ctx, t, db)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDeleteObjectsAllVersions(t *testing.T) {
|
func TestDeleteObjectsAllVersions(t *testing.T) {
|
||||||
metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
metabasetest.RunWithConfig(t, noServerSideCopyConfig, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||||
obj := metabasetest.RandObjectStream()
|
obj := metabasetest.RandObjectStream()
|
||||||
|
@ -454,29 +454,6 @@ func (step DeletePendingObject) Check(ctx *testcontext.Context, t testing.TB, db
|
|||||||
require.Zero(t, diff)
|
require.Zero(t, diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjectAnyStatusAllVersions is for testing metabase.DeleteObjectAnyStatusAllVersions.
|
|
||||||
type DeleteObjectAnyStatusAllVersions struct {
|
|
||||||
Opts metabase.DeleteObjectAnyStatusAllVersions
|
|
||||||
Result metabase.DeleteObjectResult
|
|
||||||
ErrClass *errs.Class
|
|
||||||
ErrText string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check runs the test.
|
|
||||||
func (step DeleteObjectAnyStatusAllVersions) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
|
||||||
result, err := db.DeleteObjectAnyStatusAllVersions(ctx, step.Opts)
|
|
||||||
checkError(t, err, step.ErrClass, step.ErrText)
|
|
||||||
|
|
||||||
sortObjects(result.Objects)
|
|
||||||
sortObjects(step.Result.Objects)
|
|
||||||
|
|
||||||
sortDeletedSegments(result.Segments)
|
|
||||||
sortDeletedSegments(step.Result.Segments)
|
|
||||||
|
|
||||||
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
|
|
||||||
require.Zero(t, diff)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteObjectsAllVersions is for testing metabase.DeleteObjectsAllVersions.
|
// DeleteObjectsAllVersions is for testing metabase.DeleteObjectsAllVersions.
|
||||||
type DeleteObjectsAllVersions struct {
|
type DeleteObjectsAllVersions struct {
|
||||||
Opts metabase.DeleteObjectsAllVersions
|
Opts metabase.DeleteObjectsAllVersions
|
||||||
|
@ -1500,50 +1500,6 @@ func (endpoint *Endpoint) DeleteCommittedObject(
|
|||||||
return deletedObjects, nil
|
return deletedObjects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjectAnyStatus deletes all the pieces of the storage nodes that belongs
|
|
||||||
// to the specified object.
|
|
||||||
//
|
|
||||||
// NOTE: this method is exported for being able to individually test it without
|
|
||||||
// having import cycles.
|
|
||||||
// TODO regarding the above note: exporting for testing is fine, but we should name
|
|
||||||
// it something that will definitely never ever be added to the rpc set in DRPC
|
|
||||||
// definitions. If we ever decide to add an RPC method called "DeleteObjectAnyStatus",
|
|
||||||
// DRPC interface definitions is all that is standing in the way from someone
|
|
||||||
// remotely calling this. We should name this InternalDeleteObjectAnyStatus or
|
|
||||||
// something.
|
|
||||||
func (endpoint *Endpoint) DeleteObjectAnyStatus(ctx context.Context, location metabase.ObjectLocation,
|
|
||||||
) (deletedObjects []*pb.Object, err error) {
|
|
||||||
defer mon.Task()(&ctx, location.ProjectID.String(), location.BucketName, location.ObjectKey)(&err)
|
|
||||||
|
|
||||||
var result metabase.DeleteObjectResult
|
|
||||||
if endpoint.config.ServerSideCopy {
|
|
||||||
result, err = endpoint.metabase.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
|
|
||||||
ObjectLocation: location,
|
|
||||||
Version: metabase.DefaultVersion,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
result, err = endpoint.metabase.DeleteObjectAnyStatusAllVersions(ctx, metabase.DeleteObjectAnyStatusAllVersions{
|
|
||||||
ObjectLocation: location,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, Error.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
deletedObjects, err = endpoint.deleteObjectResultToProto(ctx, result)
|
|
||||||
if err != nil {
|
|
||||||
endpoint.log.Error("failed to convert delete object result",
|
|
||||||
zap.Stringer("project", location.ProjectID),
|
|
||||||
zap.String("bucket", location.BucketName),
|
|
||||||
zap.Binary("object", []byte(location.ObjectKey)),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return deletedObjects, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeletePendingObject deletes all the pieces of the storage nodes that belongs
|
// DeletePendingObject deletes all the pieces of the storage nodes that belongs
|
||||||
// to the specified pending object.
|
// to the specified pending object.
|
||||||
//
|
//
|
||||||
|
@ -1590,59 +1590,6 @@ func TestEndpoint_DeletePendingObject(t *testing.T) {
|
|||||||
testDeleteObject(t, createPendingObject, deletePendingObject)
|
testDeleteObject(t, createPendingObject, deletePendingObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEndpoint_DeleteObjectAnyStatus(t *testing.T) {
|
|
||||||
createCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
|
|
||||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket, key, data)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
deleteCommittedObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
|
|
||||||
projectID := planet.Uplinks[0].Projects[0].ID
|
|
||||||
|
|
||||||
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
|
||||||
ProjectID: projectID,
|
|
||||||
BucketName: bucket,
|
|
||||||
ObjectKey: metabase.ObjectKey(encryptedKey),
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, deletedObjects, 1)
|
|
||||||
|
|
||||||
}
|
|
||||||
testDeleteObject(t, createCommittedObject, deleteCommittedObject)
|
|
||||||
|
|
||||||
createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) {
|
|
||||||
// TODO This should be replaced by a call to testplanet.Uplink.MultipartUpload when available.
|
|
||||||
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
|
||||||
require.NoError(t, err, "failed to retrieve project")
|
|
||||||
defer func() { require.NoError(t, project.Close()) }()
|
|
||||||
|
|
||||||
_, err = project.EnsureBucket(ctx, bucket)
|
|
||||||
require.NoError(t, err, "failed to create bucket")
|
|
||||||
|
|
||||||
info, err := project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{})
|
|
||||||
require.NoError(t, err, "failed to start multipart upload")
|
|
||||||
|
|
||||||
upload, err := project.UploadPart(ctx, bucket, key, info.UploadID, 1)
|
|
||||||
require.NoError(t, err, "failed to put object part")
|
|
||||||
_, err = upload.Write(data)
|
|
||||||
require.NoError(t, err, "failed to start multipart upload")
|
|
||||||
require.NoError(t, upload.Commit(), "failed to start multipart upload")
|
|
||||||
}
|
|
||||||
|
|
||||||
deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) {
|
|
||||||
projectID := planet.Uplinks[0].Projects[0].ID
|
|
||||||
|
|
||||||
deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeleteObjectAnyStatus(ctx, metabase.ObjectLocation{
|
|
||||||
ProjectID: projectID,
|
|
||||||
BucketName: bucket,
|
|
||||||
ObjectKey: metabase.ObjectKey(encryptedKey),
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, deletedObjects, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
testDeleteObject(t, createPendingObject, deletePendingObject)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testDeleteObject(t *testing.T,
|
func testDeleteObject(t *testing.T,
|
||||||
createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte),
|
createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte),
|
||||||
deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID),
|
deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID),
|
||||||
|
Loading…
Reference in New Issue
Block a user