From f2eb6942c5b1be11e3d2683bf32dd7ad31013969 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 31 Oct 2023 17:24:16 +0200 Subject: [PATCH] satellite/metabase: make the precommit constraint code nicer This condenses the precommit constraint into a single function, which allows to cleanup a bunch of logic elsewhere. Also, avoid delete in the first place when we are not allowed to remove the uncommited object. This also fixes submitting the metrics and now it submits the correct metrics. Change-Id: If91dfa3b19bce5b24ff2a19d7c34b57a200db1dd --- satellite/metabase/commit.go | 36 ++---- satellite/metabase/commit_object.go | 35 ++--- satellite/metabase/commit_object_test.go | 4 +- satellite/metabase/common.go | 18 +-- satellite/metabase/copy_object.go | 50 +++----- satellite/metabase/delete.go | 16 ++- satellite/metabase/metabasetest/common.go | 2 +- satellite/metabase/move_object.go | 49 +++---- satellite/metabase/precommit.go | 148 ++++++++++++++++------ 9 files changed, 190 insertions(+), 168 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index 82182739e..74408b937 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -637,6 +637,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } + var precommit precommitConstraintResult err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID) if err != nil { @@ -689,30 +690,17 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec encryptionParameters{&opts.Encryption}, } - var highestVersion Version - if opts.Versioned { - // TODO(ver): fold this into the queries below using a subquery. - maxVersion, err := db.queryHighestVersion(ctx, opts.Location(), tx) - if err != nil { - return Error.Wrap(err) - } - highestVersion = maxVersion - } else { - // TODO(ver): fold this into the query below using a subquery using `ON CONFLICT` on the unique index. - // Note, we are prematurely deleting the object without permissions - // and then rolling the action back, if we were not allowed to. - deleteResult, err := db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx) - if err != nil { - return Error.Wrap(err) - } - if deleteResult.DeletedObjectCount > 0 && opts.DisallowDelete { - return ErrPermissionDenied.New("no permissions to delete existing object") - } - highestVersion = deleteResult.MaxVersion + precommit, err = db.precommitConstraint(ctx, precommitConstraint{ + Location: opts.Location(), + Versioned: opts.Versioned, + DisallowDelete: opts.DisallowDelete, + }, tx) + if err != nil { + return Error.Wrap(err) } if opts.UsePendingObjectsTable { - opts.Version = highestVersion + 1 + opts.Version = precommit.HighestVersion + 1 args[versionArgIndex] = opts.Version args = append(args, @@ -784,8 +772,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec } } else { nextVersion := opts.Version - if nextVersion < highestVersion { - nextVersion = highestVersion + 1 + if nextVersion < precommit.HighestVersion { + nextVersion = precommit.HighestVersion + 1 } args = append(args, nextVersion) opts.Version = nextVersion @@ -859,6 +847,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } + precommit.submitMetrics() + mon.Meter("object_commit").Mark(1) mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount)) mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize) diff --git a/satellite/metabase/commit_object.go b/satellite/metabase/commit_object.go index 67b8fa847..cd7784aad 100644 --- a/satellite/metabase/commit_object.go +++ b/satellite/metabase/commit_object.go @@ -51,31 +51,18 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit return Object{}, nil, err } - var deleted deleteObjectUnversionedCommittedResult - + var precommit precommitConstraintResult err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { // TODO: should we prevent this from executing when the object has been committed // currently this requires quite a lot of database communication, so invalid handling can be expensive. - var highestVersion Version - if !opts.Versioned { - var err error - // Note, we are prematurely deleting the object without permissions - // and then rolling the action back, if we were not allowed to. - deleted, err = db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx) - if err != nil { - return err - } - if deleted.DeletedObjectCount > 0 && opts.DisallowDelete { - return ErrPermissionDenied.New("no permissions to delete existing object") - } - highestVersion = deleted.MaxVersion - } else { - v, err := db.queryHighestVersion(ctx, opts.Location(), tx) - if err != nil { - return err - } - highestVersion = v + precommit, err = db.precommitConstraint(ctx, precommitConstraint{ + Location: opts.Location(), + Versioned: opts.Versioned, + DisallowDelete: opts.DisallowDelete, + }, tx) + if err != nil { + return Error.Wrap(err) } segmentsInDatabase, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID) @@ -122,8 +109,8 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit nextStatus := committedWhereVersioned(opts.Versioned) nextVersion := opts.Version - if nextVersion < highestVersion { - nextVersion = highestVersion + 1 + if nextVersion < precommit.HighestVersion { + nextVersion = precommit.HighestVersion + 1 } err = tx.QueryRowContext(ctx, ` @@ -183,6 +170,8 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit return Object{}, nil, err } + precommit.submitMetrics() + mon.Meter("object_commit").Mark(1) mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount)) mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize) diff --git a/satellite/metabase/commit_object_test.go b/satellite/metabase/commit_object_test.go index ffc0a27be..646af48fb 100644 --- a/satellite/metabase/commit_object_test.go +++ b/satellite/metabase/commit_object_test.go @@ -271,8 +271,8 @@ func TestCommitObjectWithSegments(t *testing.T) { EncryptedMetadataEncryptedKey: encryptedMetadataKey, DisallowDelete: true, }, - ErrClass: &metabase.ErrPermissionDenied, - ErrText: "no permissions to delete existing object", + ErrClass: &metabase.Error, + ErrText: "permission denied: no permissions to delete existing object", }.Check(ctx, t, db) metabasetest.Verify{ diff --git a/satellite/metabase/common.go b/satellite/metabase/common.go index e17fb38b8..295e873ac 100644 --- a/satellite/metabase/common.go +++ b/satellite/metabase/common.go @@ -423,11 +423,11 @@ const ( statusPending = "1" statusCommittedUnversioned = "3" statusCommittedVersioned = "4" - statusesCommitted = "(3,4)" + statusesCommitted = "(" + statusCommittedUnversioned + "," + statusCommittedVersioned + ")" statusDeleteMarkerUnversioned = "5" statusDeleteMarkerVersioned = "6" - statusesDeleteMarker = "(5,6)" - statusesUnversioned = "(3,5)" + statusesDeleteMarker = "(" + statusDeleteMarkerUnversioned + "," + statusDeleteMarkerVersioned + ")" + statusesUnversioned = "(" + statusCommittedUnversioned + "," + statusDeleteMarkerUnversioned + ")" ) func committedWhereVersioned(versioned bool) ObjectStatus { @@ -437,18 +437,6 @@ func committedWhereVersioned(versioned bool) ObjectStatus { return CommittedUnversioned } -// stub uses so the linter wouldn't complain. -var ( - _ = CommittedVersioned - _ = DeleteMarkerUnversioned - _ = DeleteMarkerVersioned - _ = statusCommittedVersioned - _ = statusesCommitted - _ = statusDeleteMarkerUnversioned - _ = statusDeleteMarkerVersioned - _ = statusesDeleteMarker -) - // IsDeleteMarker return whether the status is a delete marker. func (status ObjectStatus) IsDeleteMarker() bool { return status == DeleteMarkerUnversioned || status == DeleteMarkerVersioned diff --git a/satellite/metabase/copy_object.go b/satellite/metabase/copy_object.go index 780f7789c..e22e82e12 100644 --- a/satellite/metabase/copy_object.go +++ b/satellite/metabase/copy_object.go @@ -64,6 +64,15 @@ type FinishCopyObject struct { VerifyLimits func(encryptedObjectSize int64, nSegments int64) error } +// NewLocation returns the new object location. +func (finishCopy FinishCopyObject) NewLocation() ObjectLocation { + return ObjectLocation{ + ProjectID: finishCopy.ProjectID, + BucketName: finishCopy.NewBucket, + ObjectKey: finishCopy.NewEncryptedObjectKey, + } +} + // Verify verifies metabase.FinishCopyObject data. func (finishCopy FinishCopyObject) Verify() error { if err := finishCopy.ObjectStream.Verify(); err != nil { @@ -111,6 +120,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje newObject := Object{} var copyMetadata []byte + var precommit precommitConstraintResult err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { sourceObject, err := getObjectExactVersion(ctx, tx, opts) if err != nil { @@ -225,34 +235,13 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje copyMetadata = sourceObject.EncryptedMetadata } - var highestVersion Version - if !opts.NewVersioned { - // TODO(ver): this logic can probably merged into update query - // - // Note, we are prematurely deleting the object without permissions - // and then rolling the action back, if we were not allowed to. - deleted, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{ - ProjectID: opts.ProjectID, - BucketName: opts.NewBucket, - ObjectKey: opts.NewEncryptedObjectKey, - }, tx) - if err != nil { - return Error.New("unable to delete object at target location: %w", err) - } - if deleted.DeletedObjectCount > 0 && opts.NewDisallowDelete { - return ErrPermissionDenied.New("no permissions to delete existing object") - } - - highestVersion = deleted.MaxVersion - } else { - highestVersion, err = db.queryHighestVersion(ctx, ObjectLocation{ - ProjectID: opts.ProjectID, - BucketName: opts.NewBucket, - ObjectKey: opts.NewEncryptedObjectKey, - }, tx) - if err != nil { - return Error.New("unable to query highest version: %w", err) - } + precommit, err = db.precommitConstraint(ctx, precommitConstraint{ + Location: opts.NewLocation(), + Versioned: opts.NewVersioned, + DisallowDelete: opts.NewDisallowDelete, + }, tx) + if err != nil { + return Error.Wrap(err) } newStatus := committedWhereVersioned(opts.NewVersioned) @@ -275,7 +264,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje ) RETURNING created_at`, - opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, highestVersion+1, opts.NewStreamID, + opts.ProjectID, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, precommit.HighestVersion+1, opts.NewStreamID, newStatus, sourceObject.ExpiresAt, sourceObject.SegmentCount, encryptionParameters{&sourceObject.Encryption}, copyMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey, @@ -283,7 +272,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje ) newObject = sourceObject - newObject.Version = highestVersion + 1 + newObject.Version = precommit.HighestVersion + 1 newObject.Status = newStatus err = row.Scan(&newObject.CreatedAt) @@ -340,6 +329,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje newObject.EncryptedMetadataNonce = opts.NewEncryptedMetadataKeyNonce[:] } + precommit.submitMetrics() mon.Meter("finish_copy_object").Mark(1) return newObject, nil diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index c235cbab6..9823dc661 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -434,13 +434,15 @@ func (db *DB) DeleteObjectLastCommitted( return DeleteObjectResult{}, Error.Wrap(err) } + var precommit precommitConstraintResult err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { - // TODO(ver) fold deleteObjectUnversionedCommitted into query below using ON CONFLICT - deleted, err := db.deleteObjectUnversionedCommitted(ctx, opts.ObjectLocation, tx) + precommit, err = db.precommitDeleteUnversioned(ctx, opts.ObjectLocation, tx) if err != nil { return Error.Wrap(err) } - if deleted.MaxVersion == 0 { + // TODO(ver): currently it allows adding delete markers when pending objects are present. + if precommit.HighestVersion == 0 { + // an object didn't exist in the first place return ErrObjectNotFound.New("unable to delete object") } @@ -457,7 +459,7 @@ func (db *DB) DeleteObjectLastCommitted( RETURNING version, created_at - `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, deleted.MaxVersion+1, deleterMarkerStreamID) + `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, precommit.HighestVersion+1, deleterMarkerStreamID) var marker Object marker.ProjectID = opts.ProjectID @@ -472,9 +474,13 @@ func (db *DB) DeleteObjectLastCommitted( } result.Markers = append(result.Markers, marker) - result.Removed = deleted.Deleted + result.Removed = precommit.Deleted return nil }) + if err != nil { + return result, err + } + precommit.submitMetrics() return result, err } if opts.Versioned { diff --git a/satellite/metabase/metabasetest/common.go b/satellite/metabase/metabasetest/common.go index d482ab86e..c1825511b 100644 --- a/satellite/metabase/metabasetest/common.go +++ b/satellite/metabase/metabasetest/common.go @@ -85,7 +85,7 @@ func sortRawSegments(segments []metabase.RawSegment) { func checkError(t require.TestingT, err error, errClass *errs.Class, errText string) { if errClass != nil { - require.True(t, errClass.Has(err), "expected an error %v got %v", *errClass, err) + require.True(t, errClass.Has(err), "expected an error %q got %q", *errClass, err) } if errText != "" { require.EqualError(t, err, errClass.New(errText).Error()) diff --git a/satellite/metabase/move_object.go b/satellite/metabase/move_object.go index db5da1c5d..b6d3f645a 100644 --- a/satellite/metabase/move_object.go +++ b/satellite/metabase/move_object.go @@ -133,6 +133,15 @@ type FinishMoveObject struct { NewVersioned bool } +// NewLocation returns the new object location. +func (finishMove FinishMoveObject) NewLocation() ObjectLocation { + return ObjectLocation{ + ProjectID: finishMove.ProjectID, + BucketName: finishMove.NewBucket, + ObjectKey: finishMove.NewEncryptedObjectKey, + } +} + // Verify verifies metabase.FinishMoveObject data. func (finishMove FinishMoveObject) Verify() error { if err := finishMove.ObjectStream.Verify(); err != nil { @@ -157,36 +166,15 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err return err } + var precommit precommitConstraintResult err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { - var highestVersion Version - - if !opts.NewVersioned { - // TODO(ver): this logic can probably merged into update query - // - // Note, we are prematurely deleting the object without permissions - // and then rolling the action back, if we were not allowed to. - deleted, err := db.deleteObjectUnversionedCommitted(ctx, ObjectLocation{ - ProjectID: opts.ProjectID, - BucketName: opts.NewBucket, - ObjectKey: opts.NewEncryptedObjectKey, - }, tx) - if err != nil { - return Error.New("unable to delete object at target location: %w", err) - } - if deleted.DeletedObjectCount > 0 && opts.NewDisallowDelete { - return ErrPermissionDenied.New("no permissions to delete existing object") - } - - highestVersion = deleted.MaxVersion - } else { - highestVersion, err = db.queryHighestVersion(ctx, ObjectLocation{ - ProjectID: opts.ProjectID, - BucketName: opts.NewBucket, - ObjectKey: opts.NewEncryptedObjectKey, - }, tx) - if err != nil { - return Error.New("unable to query highest version: %w", err) - } + precommit, err = db.precommitConstraint(ctx, precommitConstraint{ + Location: opts.NewLocation(), + Versioned: opts.NewVersioned, + DisallowDelete: opts.NewDisallowDelete, + }, tx) + if err != nil { + return Error.Wrap(err) } var segmentsCount int @@ -219,7 +207,7 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err stream_id `, []byte(opts.NewBucket), opts.NewEncryptedObjectKey, opts.NewEncryptedMetadataKey, opts.NewEncryptedMetadataKeyNonce, opts.ProjectID, []byte(opts.BucketName), - opts.ObjectKey, opts.Version, newStatus, highestVersion+1). + opts.ObjectKey, opts.Version, newStatus, precommit.HighestVersion+1). Scan(&segmentsCount, &hasMetadata, &streamID) if err != nil { @@ -282,6 +270,7 @@ func (db *DB) FinishMoveObject(ctx context.Context, opts FinishMoveObject) (err return err } + precommit.submitMetrics() mon.Meter("finish_move_object").Mark(1) return nil diff --git a/satellite/metabase/precommit.go b/satellite/metabase/precommit.go index 408205cb4..01f7cc22e 100644 --- a/satellite/metabase/precommit.go +++ b/satellite/metabase/precommit.go @@ -12,28 +12,124 @@ import ( "storj.io/common/uuid" ) -type deleteObjectUnversionedCommittedResult struct { +type precommitConstraint struct { + Location ObjectLocation + + Versioned bool + DisallowDelete bool +} + +type precommitConstraintResult struct { Deleted []Object - // DeletedObjectCount and DeletedSegmentCount return how many elements were deleted. - DeletedObjectCount int + + // DeletedObjectCount returns how many objects were deleted. + DeletedObjectCount int + // DeletedSegmentCount returns how many segments were deleted. DeletedSegmentCount int - // MaxVersion returns tha highest version that was present in the table. + + // HighestVersion returns tha highest version that was present in the table. // It returns 0 if there was none. - MaxVersion Version + HighestVersion Version +} + +func (r *precommitConstraintResult) submitMetrics() { + mon.Meter("object_delete").Mark(r.DeletedObjectCount) + mon.Meter("segment_delete").Mark(r.DeletedSegmentCount) } type stmtRow interface { QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row } -// deleteObjectUnversionedCommitted deletes the unversioned object at the specified location inside a transaction. -// -// TODO(ver): this should have a better and clearer name. -func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLocation, stmt stmtRow) (result deleteObjectUnversionedCommittedResult, err error) { +// precommitConstraint ensures that only a single uncommitted object exists at the specified location. +func (db *DB) precommitConstraint(ctx context.Context, opts precommitConstraint, tx stmtRow) (result precommitConstraintResult, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Location.Verify(); err != nil { + return result, Error.Wrap(err) + } + + if opts.Versioned { + highest, err := db.precommitQueryHighest(ctx, opts.Location, tx) + if err != nil { + return precommitConstraintResult{}, Error.Wrap(err) + } + result.HighestVersion = highest + return result, nil + } + + if opts.DisallowDelete { + highest, unversionedExists, err := db.precommitQueryHighestAndUnversioned(ctx, opts.Location, tx) + if err != nil { + return precommitConstraintResult{}, Error.Wrap(err) + } + result.HighestVersion = highest + if unversionedExists { + return precommitConstraintResult{}, ErrPermissionDenied.New("no permissions to delete existing object") + } + return result, nil + } + + return db.precommitDeleteUnversioned(ctx, opts.Location, tx) +} + +// precommitQueryHighest queries the highest version for a given object. +func (db *DB) precommitQueryHighest(ctx context.Context, loc ObjectLocation, tx stmtRow) (highest Version, err error) { defer mon.Task()(&ctx)(&err) if err := loc.Verify(); err != nil { - return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err) + return 0, Error.Wrap(err) + } + + err = tx.QueryRowContext(ctx, ` + SELECT COALESCE(MAX(version), 0) as version + FROM objects + WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) + `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest) + if err != nil { + return 0, Error.Wrap(err) + } + + return highest, nil +} + +// precommitQueryHighestAndUnversioned queries the highest version for a given object and whether an unversioned object or delete marker exists. +func (db *DB) precommitQueryHighestAndUnversioned(ctx context.Context, loc ObjectLocation, tx stmtRow) (highest Version, unversionedExists bool, err error) { + defer mon.Task()(&ctx)(&err) + + if err := loc.Verify(); err != nil { + return 0, false, Error.Wrap(err) + } + + err = tx.QueryRowContext(ctx, ` + SELECT + ( + SELECT COALESCE(MAX(version), 0) as version + FROM objects + WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) + ), + ( + SELECT EXISTS ( + SELECT 1 + FROM objects + WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) AND + status IN `+statusesUnversioned+` + ) + ) + `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest, &unversionedExists) + if err != nil { + return 0, false, Error.Wrap(err) + } + + return highest, unversionedExists, nil +} + +// precommitDeleteUnversioned deletes the unversioned object at loc and also returns the highest version. +func (db *DB) precommitDeleteUnversioned(ctx context.Context, loc ObjectLocation, tx stmtRow) (result precommitConstraintResult, err error) { + defer mon.Task()(&ctx)(&err) + + if err := loc.Verify(); err != nil { + return precommitConstraintResult{}, Error.Wrap(err) } var deleted Object @@ -49,7 +145,7 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo var encryptionParams nullableValue[encryptionParameters] encryptionParams.value.EncryptionParameters = &deleted.Encryption - err = stmt.QueryRowContext(ctx, ` + err = tx.QueryRowContext(ctx, ` WITH highest_object AS ( SELECT MAX(version) as version FROM objects @@ -105,11 +201,11 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo &encryptionParams, &result.DeletedObjectCount, &result.DeletedSegmentCount, - &result.MaxVersion, + &result.HighestVersion, ) if err != nil { - return deleteObjectUnversionedCommittedResult{}, Error.Wrap(err) + return precommitConstraintResult{}, Error.Wrap(err) } deleted.ProjectID = loc.ProjectID @@ -126,10 +222,6 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo deleted.TotalEncryptedSize = totalEncryptedSize.Int64 deleted.FixedSegmentSize = fixedSegmentSize.Int32 - // TODO: this should happen outside of this func - mon.Meter("object_delete").Mark(result.DeletedObjectCount) - mon.Meter("segment_delete").Mark(result.DeletedObjectCount) - if result.DeletedObjectCount > 1 { db.log.Error("object with multiple committed versions were found!", zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName), @@ -146,25 +238,3 @@ func (db *DB) deleteObjectUnversionedCommitted(ctx context.Context, loc ObjectLo return result, nil } - -// queryHighestVersion queries the latest version of an object inside an transaction. -// -// TODO(ver): this should have a better and clearer name. -func (db *DB) queryHighestVersion(ctx context.Context, loc ObjectLocation, stmt stmtRow) (highest Version, err error) { - defer mon.Task()(&ctx)(&err) - - if err := loc.Verify(); err != nil { - return 0, Error.Wrap(err) - } - - err = stmt.QueryRowContext(ctx, ` - SELECT COALESCE(MAX(version), 0) as version - FROM objects - WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) - `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest) - if err != nil { - return 0, Error.Wrap(err) - } - - return highest, nil -}