satellite/{metainfo,metabase}: delete from pending_objects table
We are deleting pending objects while aborting multipart upload. We are using metainfo BeginDeleteObject to do that. This change starts using DeletePendingObjectNew to delete entry from pending_objects table when request indicates that object is in this table. Part of https://github.com/storj/storj/issues/6048 Change-Id: I4478a9c13c8e3db48dc5de3087ef03d1b4c47a5c
This commit is contained in:
parent
33c0a82fb7
commit
5a8ef89824
@ -243,6 +243,50 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeletePendingObjectNew deletes a pending object.
|
||||||
|
// TODO DeletePendingObjectNew will replace DeletePendingObject when objects table will be free from pending objects.
|
||||||
|
func (db *DB) DeletePendingObjectNew(ctx context.Context, opts DeletePendingObject) (result DeleteObjectResult, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
if err := opts.Verify(); err != nil {
|
||||||
|
return DeleteObjectResult{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = withRows(db.db.QueryContext(ctx, `
|
||||||
|
WITH deleted_objects AS (
|
||||||
|
DELETE FROM pending_objects
|
||||||
|
WHERE
|
||||||
|
project_id = $1 AND
|
||||||
|
bucket_name = $2 AND
|
||||||
|
object_key = $3 AND
|
||||||
|
stream_id = $4
|
||||||
|
RETURNING
|
||||||
|
stream_id, created_at, expires_at,
|
||||||
|
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
||||||
|
encryption
|
||||||
|
), deleted_segments AS (
|
||||||
|
DELETE FROM segments
|
||||||
|
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||||
|
RETURNING segments.stream_id
|
||||||
|
)
|
||||||
|
SELECT * FROM deleted_objects
|
||||||
|
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID))(func(rows tagsql.Rows) error {
|
||||||
|
result.Objects, err = db.scanPendingObjectDeletion(ctx, opts.Location(), rows)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return DeleteObjectResult{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(result.Objects) == 0 {
|
||||||
|
return DeleteObjectResult{}, ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
|
||||||
|
}
|
||||||
|
|
||||||
|
mon.Meter("object_delete").Mark(len(result.Objects))
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
||||||
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
@ -317,7 +361,6 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
|
|||||||
|
|
||||||
func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, err error) {
|
func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
||||||
|
|
||||||
objects = make([]Object, 0, 10)
|
objects = make([]Object, 0, 10)
|
||||||
|
|
||||||
@ -341,16 +384,11 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r
|
|||||||
objects = append(objects, object)
|
objects = append(objects, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return nil, Error.New("unable to delete object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return objects, nil
|
return objects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows) (objects []Object, err error) {
|
func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows) (objects []Object, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
||||||
|
|
||||||
objects = make([]Object, 0, 10)
|
objects = make([]Object, 0, 10)
|
||||||
|
|
||||||
@ -370,10 +408,6 @@ func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows)
|
|||||||
objects = append(objects, object)
|
objects = append(objects, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return nil, Error.New("unable to delete object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(objects) == 0 {
|
if len(objects) == 0 {
|
||||||
objects = nil
|
objects = nil
|
||||||
}
|
}
|
||||||
@ -381,6 +415,32 @@ func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows)
|
|||||||
return objects, nil
|
return objects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) scanPendingObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
objects = make([]Object, 0, 10)
|
||||||
|
|
||||||
|
var object Object
|
||||||
|
for rows.Next() {
|
||||||
|
object.ProjectID = location.ProjectID
|
||||||
|
object.BucketName = location.BucketName
|
||||||
|
object.ObjectKey = location.ObjectKey
|
||||||
|
|
||||||
|
err = rows.Scan(&object.StreamID,
|
||||||
|
&object.CreatedAt, &object.ExpiresAt,
|
||||||
|
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
|
||||||
|
encryptionParameters{&object.Encryption},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, Error.New("unable to delete pending object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
object.Status = Pending
|
||||||
|
objects = append(objects, object)
|
||||||
|
}
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object.
|
// DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object.
|
||||||
type DeleteObjectLastCommitted struct {
|
type DeleteObjectLastCommitted struct {
|
||||||
ObjectLocation
|
ObjectLocation
|
||||||
|
@ -263,6 +263,281 @@ func TestDeletePendingObject(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeletePendingObjectNew(t *testing.T) {
|
||||||
|
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||||
|
obj := metabasetest.RandObjectStream()
|
||||||
|
now := time.Now()
|
||||||
|
zombieDeadline := now.Add(24 * time.Hour)
|
||||||
|
|
||||||
|
for _, test := range metabasetest.InvalidObjectStreams(obj) {
|
||||||
|
test := test
|
||||||
|
t.Run(test.Name, func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: test.ObjectStream,
|
||||||
|
},
|
||||||
|
ErrClass: test.ErrClass,
|
||||||
|
ErrText: test.ErrText,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("object missing", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: obj,
|
||||||
|
},
|
||||||
|
ErrClass: &metabase.ErrObjectNotFound,
|
||||||
|
ErrText: "metabase: no rows deleted",
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("non existing object version", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.BeginObjectExactVersion{
|
||||||
|
Opts: metabase.BeginObjectExactVersion{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
},
|
||||||
|
Version: 1,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: metabase.ObjectStream{
|
||||||
|
ProjectID: obj.ProjectID,
|
||||||
|
BucketName: obj.BucketName,
|
||||||
|
ObjectKey: obj.ObjectKey,
|
||||||
|
Version: 33,
|
||||||
|
StreamID: obj.StreamID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ErrClass: &metabase.ErrObjectNotFound,
|
||||||
|
ErrText: "metabase: no rows deleted",
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
metabasetest.Verify{
|
||||||
|
Objects: []metabase.RawObject{
|
||||||
|
{
|
||||||
|
ObjectStream: obj,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Pending,
|
||||||
|
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
ZombieDeletionDeadline: &zombieDeadline,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("delete committed object", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
object := metabasetest.CreateObject(ctx, t, db, obj, 0)
|
||||||
|
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: object.ObjectStream,
|
||||||
|
},
|
||||||
|
ErrClass: &metabase.ErrObjectNotFound,
|
||||||
|
ErrText: "metabase: no rows deleted",
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.Verify{
|
||||||
|
Objects: []metabase.RawObject{
|
||||||
|
{
|
||||||
|
ObjectStream: obj,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Committed,
|
||||||
|
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("without segments with wrong StreamID", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.BeginObjectExactVersion{
|
||||||
|
Opts: metabase.BeginObjectExactVersion{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
},
|
||||||
|
Version: 1,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: metabase.ObjectStream{
|
||||||
|
ProjectID: obj.ProjectID,
|
||||||
|
BucketName: obj.BucketName,
|
||||||
|
ObjectKey: obj.ObjectKey,
|
||||||
|
Version: obj.Version,
|
||||||
|
StreamID: uuid.UUID{33},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Result: metabase.DeleteObjectResult{},
|
||||||
|
ErrClass: &metabase.ErrObjectNotFound,
|
||||||
|
ErrText: "metabase: no rows deleted",
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.Verify{
|
||||||
|
Objects: []metabase.RawObject{
|
||||||
|
{
|
||||||
|
ObjectStream: obj,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Pending,
|
||||||
|
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
ZombieDeletionDeadline: &zombieDeadline,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("without segments", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
obj := obj
|
||||||
|
obj.Version = metabase.NextVersion
|
||||||
|
metabasetest.BeginObjectNextVersion{
|
||||||
|
Opts: metabase.BeginObjectNextVersion{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
UsePendingObjectsTable: true,
|
||||||
|
},
|
||||||
|
Version: 1,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
object := metabase.RawObject{
|
||||||
|
ObjectStream: obj,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Pending,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
}
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: obj,
|
||||||
|
},
|
||||||
|
Result: metabase.DeleteObjectResult{
|
||||||
|
Objects: []metabase.Object{metabase.Object(object)},
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with segments", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
obj := obj
|
||||||
|
obj.Version = metabase.NextVersion
|
||||||
|
metabasetest.BeginObjectNextVersion{
|
||||||
|
Opts: metabase.BeginObjectNextVersion{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
UsePendingObjectsTable: true,
|
||||||
|
},
|
||||||
|
Version: 1,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.CommitSegment{
|
||||||
|
Opts: metabase.CommitSegment{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Position: metabase.SegmentPosition{Part: 0, Index: 0},
|
||||||
|
RootPieceID: testrand.PieceID(),
|
||||||
|
Pieces: metabase.Pieces{{
|
||||||
|
Number: 1,
|
||||||
|
StorageNode: testrand.NodeID(),
|
||||||
|
}},
|
||||||
|
|
||||||
|
EncryptedKey: testrand.Bytes(32),
|
||||||
|
EncryptedKeyNonce: testrand.Bytes(32),
|
||||||
|
|
||||||
|
EncryptedSize: 1024,
|
||||||
|
PlainSize: 512,
|
||||||
|
PlainOffset: 0,
|
||||||
|
Redundancy: metabasetest.DefaultRedundancy,
|
||||||
|
UsePendingObjectsTable: true,
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: obj,
|
||||||
|
},
|
||||||
|
Result: metabase.DeleteObjectResult{
|
||||||
|
Objects: []metabase.Object{
|
||||||
|
{
|
||||||
|
ObjectStream: obj,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Pending,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with inline segment", func(t *testing.T) {
|
||||||
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
obj := obj
|
||||||
|
obj.Version = metabase.NextVersion
|
||||||
|
metabasetest.BeginObjectNextVersion{
|
||||||
|
Opts: metabase.BeginObjectNextVersion{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
UsePendingObjectsTable: true,
|
||||||
|
},
|
||||||
|
Version: 1,
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.CommitInlineSegment{
|
||||||
|
Opts: metabase.CommitInlineSegment{
|
||||||
|
ObjectStream: obj,
|
||||||
|
Position: metabase.SegmentPosition{Part: 0, Index: 0},
|
||||||
|
|
||||||
|
EncryptedKey: testrand.Bytes(32),
|
||||||
|
EncryptedKeyNonce: testrand.Bytes(32),
|
||||||
|
|
||||||
|
InlineData: testrand.Bytes(1024),
|
||||||
|
|
||||||
|
PlainSize: 512,
|
||||||
|
PlainOffset: 0,
|
||||||
|
UsePendingObjectsTable: true,
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.DeletePendingObjectNew{
|
||||||
|
Opts: metabase.DeletePendingObject{
|
||||||
|
ObjectStream: obj,
|
||||||
|
},
|
||||||
|
Result: metabase.DeleteObjectResult{
|
||||||
|
Objects: []metabase.Object{
|
||||||
|
{
|
||||||
|
ObjectStream: obj,
|
||||||
|
CreatedAt: now,
|
||||||
|
Status: metabase.Pending,
|
||||||
|
Encryption: metabasetest.DefaultEncryption,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeleteObjectExactVersion(t *testing.T) {
|
func TestDeleteObjectExactVersion(t *testing.T) {
|
||||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||||
obj := metabasetest.RandObjectStream()
|
obj := metabasetest.RandObjectStream()
|
||||||
|
@ -32,6 +32,9 @@ func (obj *Object) IsMigrated() bool {
|
|||||||
return obj.TotalPlainSize <= 0
|
return obj.TotalPlainSize <= 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PendingObject pending object metadata.
|
||||||
|
type PendingObject RawPendingObject
|
||||||
|
|
||||||
// Segment segment metadata.
|
// Segment segment metadata.
|
||||||
// TODO define separated struct.
|
// TODO define separated struct.
|
||||||
type Segment RawSegment
|
type Segment RawSegment
|
||||||
@ -378,14 +381,6 @@ func (db *DB) TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUI
|
|||||||
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, Committed)
|
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, Committed)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestingAllPendingObjects gets all objects from bucket.
|
|
||||||
// Use only for testing purposes.
|
|
||||||
func (db *DB) TestingAllPendingObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []ObjectEntry, err error) {
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, Pending)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, status ObjectStatus) (objects []ObjectEntry, err error) {
|
func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, status ObjectStatus) (objects []ObjectEntry, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
@ -451,6 +446,23 @@ func (db *DB) TestingAllObjects(ctx context.Context) (objects []Object, err erro
|
|||||||
return objects, nil
|
return objects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestingAllPendingObjects gets all pending objects.
|
||||||
|
// Use only for testing purposes.
|
||||||
|
func (db *DB) TestingAllPendingObjects(ctx context.Context) (objects []PendingObject, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
rawObjects, err := db.testingGetAllPendingObjects(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, o := range rawObjects {
|
||||||
|
objects = append(objects, PendingObject(o))
|
||||||
|
}
|
||||||
|
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TestingAllSegments gets all segments.
|
// TestingAllSegments gets all segments.
|
||||||
// Use only for testing purposes.
|
// Use only for testing purposes.
|
||||||
func (db *DB) TestingAllSegments(ctx context.Context) (segments []Segment, err error) {
|
func (db *DB) TestingAllSegments(ctx context.Context) (segments []Segment, err error) {
|
||||||
|
@ -448,6 +448,26 @@ func (step DeletePendingObject) Check(ctx *testcontext.Context, t testing.TB, db
|
|||||||
require.Zero(t, diff)
|
require.Zero(t, diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeletePendingObjectNew is for testing metabase.DeletePendingObjectNew.
|
||||||
|
type DeletePendingObjectNew struct {
|
||||||
|
Opts metabase.DeletePendingObject
|
||||||
|
Result metabase.DeleteObjectResult
|
||||||
|
ErrClass *errs.Class
|
||||||
|
ErrText string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check runs the test.
|
||||||
|
func (step DeletePendingObjectNew) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||||
|
result, err := db.DeletePendingObjectNew(ctx, step.Opts)
|
||||||
|
checkError(t, err, step.ErrClass, step.ErrText)
|
||||||
|
|
||||||
|
sortObjects(result.Objects)
|
||||||
|
sortObjects(step.Result.Objects)
|
||||||
|
|
||||||
|
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
|
||||||
|
require.Zero(t, diff)
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteObjectsAllVersions is for testing metabase.DeleteObjectsAllVersions.
|
// DeleteObjectsAllVersions is for testing metabase.DeleteObjectsAllVersions.
|
||||||
type DeleteObjectsAllVersions struct {
|
type DeleteObjectsAllVersions struct {
|
||||||
Opts metabase.DeleteObjectsAllVersions
|
Opts metabase.DeleteObjectsAllVersions
|
||||||
|
@ -1086,7 +1086,7 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
|
|||||||
ObjectKey: metabase.ObjectKey(pbStreamID.EncryptedObjectKey),
|
ObjectKey: metabase.ObjectKey(pbStreamID.EncryptedObjectKey),
|
||||||
Version: metabase.Version(pbStreamID.Version),
|
Version: metabase.Version(pbStreamID.Version),
|
||||||
StreamID: streamID,
|
StreamID: streamID,
|
||||||
})
|
}, pbStreamID.UsePendingObjectsTable)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1510,11 +1510,17 @@ func (endpoint *Endpoint) DeleteCommittedObject(
|
|||||||
// NOTE: this method is exported for being able to individually test it without
|
// NOTE: this method is exported for being able to individually test it without
|
||||||
// having import cycles.
|
// having import cycles.
|
||||||
// TODO: see note on DeleteObjectAnyStatus.
|
// TODO: see note on DeleteObjectAnyStatus.
|
||||||
func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metabase.ObjectStream) (deletedObjects []*pb.Object, err error) {
|
func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metabase.ObjectStream, usePendingObjectTable bool) (deletedObjects []*pb.Object, err error) {
|
||||||
req := metabase.DeletePendingObject{
|
req := metabase.DeletePendingObject{
|
||||||
ObjectStream: stream,
|
ObjectStream: stream,
|
||||||
}
|
}
|
||||||
result, err := endpoint.metabase.DeletePendingObject(ctx, req)
|
|
||||||
|
var result metabase.DeleteObjectResult
|
||||||
|
if usePendingObjectTable {
|
||||||
|
result, err = endpoint.metabase.DeletePendingObjectNew(ctx, req)
|
||||||
|
} else {
|
||||||
|
result, err = endpoint.metabase.DeletePendingObject(ctx, req)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1583,13 +1583,45 @@ func TestEndpoint_DeletePendingObject(t *testing.T) {
|
|||||||
ObjectKey: metabase.ObjectKey(encryptedKey),
|
ObjectKey: metabase.ObjectKey(encryptedKey),
|
||||||
Version: metabase.DefaultVersion,
|
Version: metabase.DefaultVersion,
|
||||||
StreamID: streamID,
|
StreamID: streamID,
|
||||||
})
|
}, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, deletedObjects, 1)
|
require.Len(t, deletedObjects, 1)
|
||||||
}
|
}
|
||||||
testDeleteObject(t, createPendingObject, deletePendingObject)
|
testDeleteObject(t, createPendingObject, deletePendingObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEndpoint_AbortMultipartUpload_UsePendingObjectsTable(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
|
Reconfigure: testplanet.Reconfigure{
|
||||||
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||||
|
config.Metainfo.UsePendingObjectsTable = true
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer ctx.Check(project.Close)
|
||||||
|
|
||||||
|
_, err = project.CreateBucket(ctx, "testbucket")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
uploadInfo, err := project.BeginUpload(ctx, "testbucket", "key", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, objects, 1)
|
||||||
|
|
||||||
|
err = project.AbortUpload(ctx, "testbucket", "key", uploadInfo.UploadID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objects, err = planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, objects, 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func testDeleteObject(t *testing.T,
|
func testDeleteObject(t *testing.T,
|
||||||
createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte),
|
createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte),
|
||||||
deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID),
|
deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID),
|
||||||
|
Loading…
Reference in New Issue
Block a user