satellite/metabase: add deleteObjectUnversionedCommitted
By using a separate function for deleting the latest object and fetching the latest version we can simplify some of the code. However, there can be more performant approaches, such as using ON CONFLICT for updating the existing object or using select and delete in the same query in databases that support it. Change-Id: I52bc3f9fa025f44d05ee010723ffb81f5bd2a2d7
This commit is contained in:
parent
2578580e21
commit
7ba8a627bc
@ -11,7 +11,6 @@ import (
|
|||||||
|
|
||||||
pgxerrcode "github.com/jackc/pgerrcode"
|
pgxerrcode "github.com/jackc/pgerrcode"
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"storj.io/common/memory"
|
"storj.io/common/memory"
|
||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
@ -695,61 +694,21 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
|||||||
encryptionParameters{&opts.Encryption},
|
encryptionParameters{&opts.Encryption},
|
||||||
}
|
}
|
||||||
|
|
||||||
versionsToDelete := []Version{}
|
deleteResult, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{
|
||||||
if err := withRows(tx.QueryContext(ctx, `
|
ProjectID: opts.ProjectID,
|
||||||
SELECT version
|
BucketName: opts.BucketName,
|
||||||
FROM objects
|
ObjectKey: opts.ObjectKey,
|
||||||
WHERE
|
}, tx)
|
||||||
project_id = $1 AND
|
if err != nil {
|
||||||
bucket_name = $2 AND
|
return Error.Wrap(err)
|
||||||
object_key = $3 AND
|
|
||||||
status = `+statusCommittedUnversioned,
|
|
||||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
|
|
||||||
for rows.Next() {
|
|
||||||
var version Version
|
|
||||||
if err := rows.Scan(&version); err != nil {
|
|
||||||
return Error.New("failed to scan previous object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
versionsToDelete = append(versionsToDelete, version)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
return Error.New("failed to find previous objects: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(versionsToDelete) > 1 {
|
if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete {
|
||||||
db.log.Warn("object with multiple committed versions were found!",
|
|
||||||
zap.Stringer("Project ID", opts.ProjectID), zap.String("Bucket Name", opts.BucketName),
|
|
||||||
zap.ByteString("Object Key", []byte(opts.ObjectKey)), zap.Int("deleted", len(versionsToDelete)))
|
|
||||||
|
|
||||||
mon.Meter("multiple_committed_versions").Mark(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(versionsToDelete) != 0 && opts.DisallowDelete {
|
|
||||||
return ErrPermissionDenied.New("no permissions to delete existing object")
|
return ErrPermissionDenied.New("no permissions to delete existing object")
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.UsePendingObjectsTable {
|
if opts.UsePendingObjectsTable {
|
||||||
// remove existing object(s) before inserting new one
|
opts.Version = deleteResult.MaxVersion + 1
|
||||||
// TODO after switching to pending_objects table completely we should
|
|
||||||
// be able to just delete all objects under this key and avoid
|
|
||||||
// selecting versions from above
|
|
||||||
for _, version := range versionsToDelete {
|
|
||||||
_, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{
|
|
||||||
ObjectLocation: ObjectLocation{
|
|
||||||
ProjectID: opts.ProjectID,
|
|
||||||
BucketName: opts.BucketName,
|
|
||||||
ObjectKey: opts.ObjectKey,
|
|
||||||
},
|
|
||||||
Version: version,
|
|
||||||
}, tx)
|
|
||||||
if err != nil {
|
|
||||||
return Error.New("failed to delete existing object: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
opts.Version = DefaultVersion
|
|
||||||
args[versionArgIndex] = opts.Version
|
args[versionArgIndex] = opts.Version
|
||||||
|
|
||||||
args = append(args,
|
args = append(args,
|
||||||
@ -878,20 +837,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
|||||||
}
|
}
|
||||||
return Error.New("failed to update object: %w", err)
|
return Error.New("failed to update object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, version := range versionsToDelete {
|
|
||||||
_, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{
|
|
||||||
ObjectLocation: ObjectLocation{
|
|
||||||
ProjectID: opts.ProjectID,
|
|
||||||
BucketName: opts.BucketName,
|
|
||||||
ObjectKey: opts.ObjectKey,
|
|
||||||
},
|
|
||||||
Version: version,
|
|
||||||
}, tx)
|
|
||||||
if err != nil {
|
|
||||||
return Error.New("failed to delete existing object: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object.StreamID = opts.StreamID
|
object.StreamID = opts.StreamID
|
||||||
|
@ -3262,6 +3262,8 @@ func TestCommitObject(t *testing.T) {
|
|||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// TODO(ver): tests for DisallowDelete = false/true and ErrPermissionDenied
|
||||||
|
|
||||||
t.Run("assign plain_offset", func(t *testing.T) {
|
t.Run("assign plain_offset", func(t *testing.T) {
|
||||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||||
|
|
||||||
@ -4246,7 +4248,7 @@ func TestCommitObject(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Version: metabase.PendingVersion,
|
Version: metabase.PendingVersion,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
obj.Version++
|
obj.Version = metabase.Version(i + 1)
|
||||||
|
|
||||||
expectedInlineData := testrand.Bytes(8)
|
expectedInlineData := testrand.Bytes(8)
|
||||||
expectedEncryptedKey := testrand.Bytes(32)
|
expectedEncryptedKey := testrand.Bytes(32)
|
||||||
|
@ -371,6 +371,8 @@ const (
|
|||||||
// Pending means that the object is being uploaded or that the client failed during upload.
|
// Pending means that the object is being uploaded or that the client failed during upload.
|
||||||
// The failed upload may be continued in the future.
|
// The failed upload may be continued in the future.
|
||||||
Pending = ObjectStatus(1)
|
Pending = ObjectStatus(1)
|
||||||
|
// Deleting used to one of the stages, which is not in use anymore.
|
||||||
|
_ = ObjectStatus(2)
|
||||||
// CommittedUnversioned means that the object is finished and should be visible for general listing.
|
// CommittedUnversioned means that the object is finished and should be visible for general listing.
|
||||||
CommittedUnversioned = ObjectStatus(3)
|
CommittedUnversioned = ObjectStatus(3)
|
||||||
// CommittedVersioned means that the object is finished and should be visible for general listing.
|
// CommittedVersioned means that the object is finished and should be visible for general listing.
|
||||||
@ -388,6 +390,7 @@ const (
|
|||||||
statusDeleteMarkerUnversioned = "5"
|
statusDeleteMarkerUnversioned = "5"
|
||||||
statusDeleteMarkerVersioned = "6"
|
statusDeleteMarkerVersioned = "6"
|
||||||
statusesDeleteMarker = "(5,6)"
|
statusesDeleteMarker = "(5,6)"
|
||||||
|
statusesUnversioned = "(3,5)"
|
||||||
)
|
)
|
||||||
|
|
||||||
// stub uses so the linter wouldn't complain.
|
// stub uses so the linter wouldn't complain.
|
||||||
|
@ -6,9 +6,11 @@ package metabase
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
"storj.io/private/dbutil/pgutil"
|
"storj.io/private/dbutil/pgutil"
|
||||||
@ -149,6 +151,9 @@ func (db *DB) DeleteObjectExactVersion(
|
|||||||
type stmt interface {
|
type stmt interface {
|
||||||
QueryContext(ctx context.Context, query string, args ...interface{}) (tagsql.Rows, error)
|
QueryContext(ctx context.Context, query string, args ...interface{}) (tagsql.Rows, error)
|
||||||
}
|
}
|
||||||
|
type stmtRow interface {
|
||||||
|
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||||
|
}
|
||||||
|
|
||||||
// implementation of DB.DeleteObjectExactVersion for re-use internally in metabase package.
|
// implementation of DB.DeleteObjectExactVersion for re-use internally in metabase package.
|
||||||
func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, stmt stmt) (result DeleteObjectResult, err error) {
|
func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, stmt stmt) (result DeleteObjectResult, err error) {
|
||||||
@ -287,6 +292,69 @@ func (db *DB) DeletePendingObjectNew(ctx context.Context, opts DeletePendingObje
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type deleteObjectUnversionedCommittedResult struct {
|
||||||
|
// DeletedObjectCount and DeletedSegmentCount return how many elements were deleted.
|
||||||
|
DeletedObjectCount int
|
||||||
|
DeletedSegmentCount int
|
||||||
|
// MaxVersion returns tha highest version that was present in the table.
|
||||||
|
// It returns 0 if there was none.
|
||||||
|
MaxVersion Version
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteObjectUnversionedCommitted deletes the unversioned object at the specified location inside a transaction.
|
||||||
|
//
|
||||||
|
// TODO(ver): this should have a better and clearer name.
|
||||||
|
func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLocation, stmt stmtRow) (result deleteObjectUnversionedCommittedResult, err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
|
||||||
|
if err := loc.Verify(); err != nil {
|
||||||
|
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = stmt.QueryRowContext(ctx, `
|
||||||
|
WITH highest_object AS (
|
||||||
|
SELECT MAX(version) as version
|
||||||
|
FROM objects
|
||||||
|
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||||
|
), deleted_objects AS (
|
||||||
|
DELETE FROM objects
|
||||||
|
WHERE
|
||||||
|
(project_id, bucket_name, object_key) = ($1, $2, $3)
|
||||||
|
AND status IN `+statusesUnversioned+`
|
||||||
|
RETURNING stream_id
|
||||||
|
), deleted_segments AS (
|
||||||
|
DELETE FROM segments
|
||||||
|
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
|
||||||
|
RETURNING segments.stream_id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
(SELECT count(*) FROM deleted_objects),
|
||||||
|
(SELECT count(*) FROM deleted_segments),
|
||||||
|
coalesce((SELECT version FROM highest_object), 0)
|
||||||
|
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).
|
||||||
|
Scan(&result.DeletedObjectCount, &result.DeletedSegmentCount, &result.MaxVersion)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: this should happen outside of this func
|
||||||
|
mon.Meter("object_delete").Mark(result.DeletedObjectCount)
|
||||||
|
mon.Meter("segment_delete").Mark(result.DeletedObjectCount)
|
||||||
|
|
||||||
|
if result.DeletedObjectCount > 1 {
|
||||||
|
db.log.Error("object with multiple committed versions were found!",
|
||||||
|
zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName),
|
||||||
|
zap.ByteString("Object Key", []byte(loc.ObjectKey)), zap.Int("deleted", result.DeletedObjectCount))
|
||||||
|
|
||||||
|
mon.Meter("multiple_committed_versions").Mark(1)
|
||||||
|
|
||||||
|
return result, Error.New("internal error: multiple committed unversioned objects")
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
|
||||||
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
Loading…
Reference in New Issue
Block a user