satellite/metabase: make the precommit constraint code nicer

This condenses the precommit constraint into a single function, which
allows to cleanup a bunch of logic elsewhere. Also, avoid delete in the
first place when we are not allowed to remove the uncommited object.

This also fixes submitting the metrics and now it submits the correct
metrics.

Change-Id: If91dfa3b19bce5b24ff2a19d7c34b57a200db1dd
This commit is contained in:
Egon Elbre 2023-10-31 17:24:16 +02:00
parent e5e55ef266
commit f2eb6942c5
9 changed files with 190 additions and 168 deletions

View File

@ -637,6 +637,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
return Object{}, err
}
var precommit precommitConstraintResult
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
if err != nil {
@ -689,30 +690,17 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
encryptionParameters{&opts.Encryption},
}
var highestVersion Version
if opts.Versioned {
// TODO(ver): fold this into the queries below using a subquery.
maxVersion, err := db.queryHighestVersion(ctx, opts.Location(), tx)
if err != nil {
return Error.Wrap(err)
}
highestVersion = maxVersion
} else {
// TODO(ver): fold this into the query below using a subquery using `ON CONFLICT` on the unique index.
// Note, we are prematurely deleting the object without permissions
// and then rolling the action back, if we were not allowed to.
deleteResult, err := db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx)
if err != nil {
return Error.Wrap(err)
}
if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete {
return ErrPermissionDenied.New("no permissions to delete existing object")
}
highestVersion = deleteResult.MaxVersion
precommit, err = db.precommitConstraint(ctx, precommitConstraint{
Location: opts.Location(),
Versioned: opts.Versioned,
DisallowDelete: opts.DisallowDelete,
}, tx)
if err != nil {
return Error.Wrap(err)
}
if opts.UsePendingObjectsTable {
opts.Version = highestVersion + 1
opts.Version = precommit.HighestVersion + 1
args[versionArgIndex] = opts.Version
args = append(args,
@ -784,8 +772,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
}
} else {
nextVersion := opts.Version
if nextVersion < highestVersion {
nextVersion = highestVersion + 1
if nextVersion < precommit.HighestVersion {
nextVersion = precommit.HighestVersion + 1
}
args = append(args, nextVersion)
opts.Version = nextVersion
@ -859,6 +847,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
return Object{}, err
}
precommit.submitMetrics()
mon.Meter("object_commit").Mark(1)
mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)

View File

@ -51,31 +51,18 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
return Object{}, nil, err
}
var deleted deleteObjectUnversionedCommittedResult
var precommit precommitConstraintResult
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
// TODO: should we prevent this from executing when the object has been committed
// currently this requires quite a lot of database communication, so invalid handling can be expensive.
var highestVersion Version
if !opts.Versioned {
var err error
// Note, we are prematurely deleting the object without permissions
// and then rolling the action back, if we were not allowed to.
deleted, err = db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx)
if err != nil {
return err
}
if deleted.DeletedObjectCount > 0 && opts.DisallowDelete {
return ErrPermissionDenied.New("no permissions to delete existing object")
}
highestVersion = deleted.MaxVersion
} else {
v, err := db.queryHighestVersion(ctx, opts.Location(), tx)
if err != nil {
return err
}
highestVersion = v
precommit, err = db.precommitConstraint(ctx, precommitConstraint{
Location: opts.Location(),
Versioned: opts.Versioned,
DisallowDelete: opts.DisallowDelete,
}, tx)
if err != nil {
return Error.Wrap(err)
}
segmentsInDatabase, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
@ -122,8 +109,8 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
nextStatus := committedWhereVersioned(opts.Versioned)
nextVersion := opts.Version
if nextVersion < highestVersion {
nextVersion = highestVersion + 1
if nextVersion < precommit.HighestVersion {
nextVersion = precommit.HighestVersion + 1
}
err = tx.QueryRowContext(ctx, `
@ -183,6 +170,8 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
return Object{}, nil, err
}
precommit.submitMetrics()
mon.Meter("object_commit").Mark(1)
mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)

View File

@ -271,8 +271,8 @@ func TestCommitObjectWithSegments(t *testing.T) {
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
DisallowDelete: true,
},
ErrClass: &metabase.ErrPermissionDenied,
ErrText: "no permissions to delete existing object",
ErrClass: &metabase.Error,
ErrText: "permission denied: no permissions to delete existing object",
}.Check(ctx, t, db)
metabasetest.Verify{

View File

@ -423,11 +423,11 @@ const (
statusPending = "1"
statusCommittedUnversioned = "3"
statusCommittedVersioned = "4"
statusesCommitted = "(3,4)"
statusesCommitted = "(" + statusCommittedUnversioned + "," + statusCommittedVersioned + ")"
statusDeleteMarkerUnversioned = "5"
statusDeleteMarkerVersioned = "6"
statusesDeleteMarker = "(5,6)"
statusesUnversioned = "(3,5)"
statusesDeleteMarker = "(" + statusDeleteMarkerUnversioned + "," + statusDeleteMarkerVersioned + ")"
statusesUnversioned = "(" + statusCommittedUnversioned + "," + statusDeleteMarkerUnversioned + ")"
)
func committedWhereVersioned(versioned bool) ObjectStatus {
@ -437,18 +437,6 @@ func committedWhereVersioned(versioned bool) ObjectStatus {
return CommittedUnversioned
}
// stub uses so the linter wouldn't complain.
var (
_ = CommittedVersioned
_ = DeleteMarkerUnversioned
_ = DeleteMarkerVersioned
_ = statusCommittedVersioned
_ = statusesCommitted
_ = statusDeleteMarkerUnversioned
_ = statusDeleteMarkerVersioned
_ = statusesDeleteMarker
)
// IsDeleteMarker return whether the status is a delete marker.
func (status ObjectStatus) IsDeleteMarker() bool {
return status == DeleteMarkerUnversioned || status == DeleteMarkerVersioned

View File

@ -64,6 +64,15 @@ type FinishCopyObject struct {
VerifyLimits func(encryptedObjectSize int64, nSegments int64) error
}
// NewLocation returns the new object location.
func (finishCopy FinishCopyObject) NewLocation() ObjectLocation {
return ObjectLocation{
ProjectID: finishCopy.ProjectID,
BucketName: finishCopy.NewBucket,
ObjectKey: finishCopy.NewEncryptedObjectKey,
}
}
// Verify verifies metabase.FinishCopyObject data.
func (finishCopy FinishCopyObject) Verify() error {
if err := finishCopy.ObjectStream.Verify(); err != nil {
@ -111,6 +120,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
newObject := Object{}
var copyMetadata []byte
var precommit precommitConstraintResult
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
sourceObject, err := getObjectExactVersion(ctx, tx, opts)
if err != nil {
@ -225,34 +235,13 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
copyMetadata = sourceObject.EncryptedMetadata
}
var highestVersion Version
if !opts.NewVersioned {
// TODO(ver): this logic can probably merged into update query
//
// Note, we are prematurely deleting the object without permissions
// and then rolling the action back, if we were not allowed to.
deleted, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.NewBucket,
ObjectKey: opts.NewEncryptedObjectKey,
}, tx)
if err != nil {
return Error.New("unable to delete object at target location: %w", err)
}
if deleted.DeletedObjectCount > 0 && opts.NewDisallowDelete {
return ErrPermissionDenied.New("no permissions to delete existing object")
}
highestVersion = deleted.MaxVersion
} else {
highestVersion, err = db.queryHighestVersion(ctx, ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.NewBucket,
ObjectKey: opts.NewEncryptedObjectKey,
}, tx)
if err != nil {
return Error.New("unable to query highest version: %w", err)
}
precommit, err = db.precommitConstraint(ctx, precommitConstraint{
Location: opts.NewLocation(),
Versioned: opts.NewVersioned,
DisallowDelete: opts.NewDisallowDelete,
}, tx)
if err != nil {
return Error.Wrap(err)
}
newStatus := committedWhereVersioned(opts.NewVersioned)
@ -275,7 +264,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
)
RETURNING
created_at`,
opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, highestVersion+1, opts.NewStreamID,
opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, precommit.HighestVersion+1, opts.NewStreamID,
newStatus, sourceObject.ExpiresAt, sourceObject.SegmentCount,
encryptionParameters{&sourceObject.Encryption},
copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
@ -283,7 +272,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
)
newObject = sourceObject
newObject.Version = highestVersion + 1
newObject.Version = precommit.HighestVersion + 1
newObject.Status = newStatus
err = row.Scan(&newObject.CreatedAt)
@ -340,6 +329,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
newObject.EncryptedMetadataNonce = opts.NewEncryptedMetadataKeyNonce[:]
}
precommit.submitMetrics()
mon.Meter("finish_copy_object").Mark(1)
return newObject, nil

View File

@ -434,13 +434,15 @@ func (db *DB) DeleteObjectLastCommitted(
return DeleteObjectResult{}, Error.Wrap(err)
}
var precommit precommitConstraintResult
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
// TODO(ver) fold deleteObjectUnversionedCommitted into query below using ON CONFLICT
deleted, err := db.deleteObjectUnversionedCommitted(ctx, opts.ObjectLocation, tx)
precommit, err = db.precommitDeleteUnversioned(ctx, opts.ObjectLocation, tx)
if err != nil {
return Error.Wrap(err)
}
if deleted.MaxVersion == 0 {
// TODO(ver): currently it allows adding delete markers when pending objects are present.
if precommit.HighestVersion == 0 {
// an object didn't exist in the first place
return ErrObjectNotFound.New("unable to delete object")
}
@ -457,7 +459,7 @@ func (db *DB) DeleteObjectLastCommitted(
RETURNING
version,
created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, deleted.MaxVersion+1, deleterMarkerStreamID)
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, precommit.HighestVersion+1, deleterMarkerStreamID)
var marker Object
marker.ProjectID = opts.ProjectID
@ -472,9 +474,13 @@ func (db *DB) DeleteObjectLastCommitted(
}
result.Markers = append(result.Markers, marker)
result.Removed = deleted.Deleted
result.Removed = precommit.Deleted
return nil
})
if err != nil {
return result, err
}
precommit.submitMetrics()
return result, err
}
if opts.Versioned {

View File

@ -85,7 +85,7 @@ func sortRawSegments(segments []metabase.RawSegment) {
func checkError(t require.TestingT, err error, errClass *errs.Class, errText string) {
if errClass != nil {
require.True(t, errClass.Has(err), "expected an error %v got %v", *errClass, err)
require.True(t, errClass.Has(err), "expected an error %q got %q", *errClass, err)
}
if errText != "" {
require.EqualError(t, err, errClass.New(errText).Error())

View File

@ -133,6 +133,15 @@ type FinishMoveObject struct {
NewVersioned bool
}
// NewLocation returns the new object location.
func (finishMove FinishMoveObject) NewLocation() ObjectLocation {
return ObjectLocation{
ProjectID: finishMove.ProjectID,
BucketName: finishMove.NewBucket,
ObjectKey: finishMove.NewEncryptedObjectKey,
}
}
// Verify verifies metabase.FinishMoveObject data.
func (finishMove FinishMoveObject) Verify() error {
if err := finishMove.ObjectStream.Verify(); err != nil {
@ -157,36 +166,15 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
return err
}
var precommit precommitConstraintResult
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
var highestVersion Version
if !opts.NewVersioned {
// TODO(ver): this logic can probably merged into update query
//
// Note, we are prematurely deleting the object without permissions
// and then rolling the action back, if we were not allowed to.
deleted, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.NewBucket,
ObjectKey: opts.NewEncryptedObjectKey,
}, tx)
if err != nil {
return Error.New("unable to delete object at target location: %w", err)
}
if deleted.DeletedObjectCount > 0 && opts.NewDisallowDelete {
return ErrPermissionDenied.New("no permissions to delete existing object")
}
highestVersion = deleted.MaxVersion
} else {
highestVersion, err = db.queryHighestVersion(ctx, ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.NewBucket,
ObjectKey: opts.NewEncryptedObjectKey,
}, tx)
if err != nil {
return Error.New("unable to query highest version: %w", err)
}
precommit, err = db.precommitConstraint(ctx, precommitConstraint{
Location: opts.NewLocation(),
Versioned: opts.NewVersioned,
DisallowDelete: opts.NewDisallowDelete,
}, tx)
if err != nil {
return Error.Wrap(err)
}
var segmentsCount int
@ -219,7 +207,7 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
stream_id
`, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, opts.NewEncryptedMetadataKey,
opts.NewEncryptedMetadataKeyNonce, opts.ProjectID, []byte(opts.BucketName),
opts.ObjectKey, opts.Version, newStatus, highestVersion+1).
opts.ObjectKey, opts.Version, newStatus, precommit.HighestVersion+1).
Scan(&segmentsCount, &hasMetadata, &streamID)
if err != nil {
@ -282,6 +270,7 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err
return err
}
precommit.submitMetrics()
mon.Meter("finish_move_object").Mark(1)
return nil

View File

@ -12,28 +12,124 @@ import (
"storj.io/common/uuid"
)
type deleteObjectUnversionedCommittedResult struct {
type precommitConstraint struct {
Location ObjectLocation
Versioned bool
DisallowDelete bool
}
type precommitConstraintResult struct {
Deleted []Object
// DeletedObjectCount and DeletedSegmentCount return how many elements were deleted.
DeletedObjectCount int
// DeletedObjectCount returns how many objects were deleted.
DeletedObjectCount int
// DeletedSegmentCount returns how many segments were deleted.
DeletedSegmentCount int
// MaxVersion returns tha highest version that was present in the table.
// HighestVersion returns tha highest version that was present in the table.
// It returns 0 if there was none.
MaxVersion Version
HighestVersion Version
}
func (r *precommitConstraintResult) submitMetrics() {
mon.Meter("object_delete").Mark(r.DeletedObjectCount)
mon.Meter("segment_delete").Mark(r.DeletedSegmentCount)
}
type stmtRow interface {
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
// deleteObjectUnversionedCommitted deletes the unversioned object at the specified location inside a transaction.
//
// TODO(ver): this should have a better and clearer name.
func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLocation, stmt stmtRow) (result deleteObjectUnversionedCommittedResult, err error) {
// precommitConstraint ensures that only a single uncommitted object exists at the specified location.
func (db *DB) precommitConstraint(ctx context.Context, opts precommitConstraint, tx stmtRow) (result precommitConstraintResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Location.Verify(); err != nil {
return result, Error.Wrap(err)
}
if opts.Versioned {
highest, err := db.precommitQueryHighest(ctx, opts.Location, tx)
if err != nil {
return precommitConstraintResult{}, Error.Wrap(err)
}
result.HighestVersion = highest
return result, nil
}
if opts.DisallowDelete {
highest, unversionedExists, err := db.precommitQueryHighestAndUnversioned(ctx, opts.Location, tx)
if err != nil {
return precommitConstraintResult{}, Error.Wrap(err)
}
result.HighestVersion = highest
if unversionedExists {
return precommitConstraintResult{}, ErrPermissionDenied.New("no permissions to delete existing object")
}
return result, nil
}
return db.precommitDeleteUnversioned(ctx, opts.Location, tx)
}
// precommitQueryHighest queries the highest version for a given object.
func (db *DB) precommitQueryHighest(ctx context.Context, loc ObjectLocation, tx stmtRow) (highest Version, err error) {
defer mon.Task()(&ctx)(&err)
if err := loc.Verify(); err != nil {
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
return 0, Error.Wrap(err)
}
err = tx.QueryRowContext(ctx, `
SELECT COALESCE(MAX(version), 0) as version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest)
if err != nil {
return 0, Error.Wrap(err)
}
return highest, nil
}
// precommitQueryHighestAndUnversioned queries the highest version for a given object and whether an unversioned object or delete marker exists.
func (db *DB) precommitQueryHighestAndUnversioned(ctx context.Context, loc ObjectLocation, tx stmtRow) (highest Version, unversionedExists bool, err error) {
defer mon.Task()(&ctx)(&err)
if err := loc.Verify(); err != nil {
return 0, false, Error.Wrap(err)
}
err = tx.QueryRowContext(ctx, `
SELECT
(
SELECT COALESCE(MAX(version), 0) as version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
),
(
SELECT EXISTS (
SELECT 1
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) AND
status IN `+statusesUnversioned+`
)
)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest, &unversionedExists)
if err != nil {
return 0, false, Error.Wrap(err)
}
return highest, unversionedExists, nil
}
// precommitDeleteUnversioned deletes the unversioned object at loc and also returns the highest version.
func (db *DB) precommitDeleteUnversioned(ctx context.Context, loc ObjectLocation, tx stmtRow) (result precommitConstraintResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := loc.Verify(); err != nil {
return precommitConstraintResult{}, Error.Wrap(err)
}
var deleted Object
@ -49,7 +145,7 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo
var encryptionParams nullableValue[encryptionParameters]
encryptionParams.value.EncryptionParameters = &deleted.Encryption
err = stmt.QueryRowContext(ctx, `
err = tx.QueryRowContext(ctx, `
WITH highest_object AS (
SELECT MAX(version) as version
FROM objects
@ -105,11 +201,11 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo
&encryptionParams,
&result.DeletedObjectCount,
&result.DeletedSegmentCount,
&result.MaxVersion,
&result.HighestVersion,
)
if err != nil {
return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err)
return precommitConstraintResult{}, Error.Wrap(err)
}
deleted.ProjectID = loc.ProjectID
@ -126,10 +222,6 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo
deleted.TotalEncryptedSize = totalEncryptedSize.Int64
deleted.FixedSegmentSize = fixedSegmentSize.Int32
// TODO: this should happen outside of this func
mon.Meter("object_delete").Mark(result.DeletedObjectCount)
mon.Meter("segment_delete").Mark(result.DeletedObjectCount)
if result.DeletedObjectCount > 1 {
db.log.Error("object with multiple committed versions were found!",
zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName),
@ -146,25 +238,3 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo
return result, nil
}
// queryHighestVersion queries the latest version of an object inside an transaction.
//
// TODO(ver): this should have a better and clearer name.
func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt stmtRow) (highest Version, err error) {
defer mon.Task()(&ctx)(&err)
if err := loc.Verify(); err != nil {
return 0, Error.Wrap(err)
}
err = stmt.QueryRowContext(ctx, `
SELECT COALESCE(MAX(version), 0) as version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest)
if err != nil {
return 0, Error.Wrap(err)
}
return highest, nil
}