// Copyright (C) 2023 Storj Labs, Inc. // See LICENSE for copying information. package metabase import ( "context" "database/sql" "go.uber.org/zap" "storj.io/common/uuid" ) type precommitConstraint struct { Location ObjectLocation Versioned bool DisallowDelete bool } type precommitConstraintResult struct { Deleted []Object // DeletedObjectCount returns how many objects were deleted. DeletedObjectCount int // DeletedSegmentCount returns how many segments were deleted. DeletedSegmentCount int // HighestVersion returns tha highest version that was present in the table. // It returns 0 if there was none. HighestVersion Version } func (r *precommitConstraintResult) submitMetrics() { mon.Meter("object_delete").Mark(r.DeletedObjectCount) mon.Meter("segment_delete").Mark(r.DeletedSegmentCount) } type stmtRow interface { QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row } // precommitConstraint ensures that only a single uncommitted object exists at the specified location. func (db *DB) precommitConstraint(ctx context.Context, opts precommitConstraint, tx stmtRow) (result precommitConstraintResult, err error) { defer mon.Task()(&ctx)(&err) if err := opts.Location.Verify(); err != nil { return result, Error.Wrap(err) } if opts.Versioned { highest, err := db.precommitQueryHighest(ctx, opts.Location, tx) if err != nil { return precommitConstraintResult{}, Error.Wrap(err) } result.HighestVersion = highest return result, nil } if opts.DisallowDelete { highest, unversionedExists, err := db.precommitQueryHighestAndUnversioned(ctx, opts.Location, tx) if err != nil { return precommitConstraintResult{}, Error.Wrap(err) } result.HighestVersion = highest if unversionedExists { return precommitConstraintResult{}, ErrPermissionDenied.New("no permissions to delete existing object") } return result, nil } return db.precommitDeleteUnversioned(ctx, opts.Location, tx) } // precommitQueryHighest queries the highest version for a given object. func (db *DB) precommitQueryHighest(ctx context.Context, loc ObjectLocation, tx stmtRow) (highest Version, err error) { defer mon.Task()(&ctx)(&err) if err := loc.Verify(); err != nil { return 0, Error.Wrap(err) } err = tx.QueryRowContext(ctx, ` SELECT COALESCE(MAX(version), 0) as version FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest) if err != nil { return 0, Error.Wrap(err) } return highest, nil } // precommitQueryHighestAndUnversioned queries the highest version for a given object and whether an unversioned object or delete marker exists. func (db *DB) precommitQueryHighestAndUnversioned(ctx context.Context, loc ObjectLocation, tx stmtRow) (highest Version, unversionedExists bool, err error) { defer mon.Task()(&ctx)(&err) if err := loc.Verify(); err != nil { return 0, false, Error.Wrap(err) } err = tx.QueryRowContext(ctx, ` SELECT ( SELECT COALESCE(MAX(version), 0) as version FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) ), ( SELECT EXISTS ( SELECT 1 FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) AND status IN `+statusesUnversioned+` ) ) `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest, &unversionedExists) if err != nil { return 0, false, Error.Wrap(err) } return highest, unversionedExists, nil } // precommitDeleteUnversioned deletes the unversioned object at loc and also returns the highest version. func (db *DB) precommitDeleteUnversioned(ctx context.Context, loc ObjectLocation, tx stmtRow) (result precommitConstraintResult, err error) { defer mon.Task()(&ctx)(&err) if err := loc.Verify(); err != nil { return precommitConstraintResult{}, Error.Wrap(err) } var deleted Object // TODO(ver): this scanning can probably simplified somehow. var version sql.NullInt64 var streamID uuid.NullUUID var createdAt sql.NullTime var segmentCount, fixedSegmentSize sql.NullInt32 var totalPlainSize, totalEncryptedSize sql.NullInt64 var status sql.NullByte var encryptionParams nullableValue[encryptionParameters] encryptionParams.value.EncryptionParameters = &deleted.Encryption err = tx.QueryRowContext(ctx, ` WITH highest_object AS ( SELECT MAX(version) as version FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) ), deleted_objects AS ( DELETE FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) AND status IN `+statusesUnversioned+` RETURNING version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, fixed_segment_size, encryption ), deleted_segments AS ( DELETE FROM segments WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) RETURNING segments.stream_id ) SELECT (SELECT version FROM deleted_objects), (SELECT stream_id FROM deleted_objects), (SELECT created_at FROM deleted_objects), (SELECT expires_at FROM deleted_objects), (SELECT status FROM deleted_objects), (SELECT segment_count FROM deleted_objects), (SELECT encrypted_metadata_nonce FROM deleted_objects), (SELECT encrypted_metadata FROM deleted_objects), (SELECT encrypted_metadata_encrypted_key FROM deleted_objects), (SELECT total_plain_size FROM deleted_objects), (SELECT total_encrypted_size FROM deleted_objects), (SELECT fixed_segment_size FROM deleted_objects), (SELECT encryption FROM deleted_objects), (SELECT count(*) FROM deleted_objects), (SELECT count(*) FROM deleted_segments), coalesce((SELECT version FROM highest_object), 0) `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey). Scan( &version, &streamID, &createdAt, &deleted.ExpiresAt, &status, &segmentCount, &deleted.EncryptedMetadataNonce, &deleted.EncryptedMetadata, &deleted.EncryptedMetadataEncryptedKey, &totalPlainSize, &totalEncryptedSize, &fixedSegmentSize, &encryptionParams, &result.DeletedObjectCount, &result.DeletedSegmentCount, &result.HighestVersion, ) if err != nil { return precommitConstraintResult{}, Error.Wrap(err) } deleted.ProjectID = loc.ProjectID deleted.BucketName = loc.BucketName deleted.ObjectKey = loc.ObjectKey deleted.Version = Version(version.Int64) deleted.Status = ObjectStatus(status.Byte) deleted.StreamID = streamID.UUID deleted.CreatedAt = createdAt.Time deleted.SegmentCount = segmentCount.Int32 deleted.TotalPlainSize = totalPlainSize.Int64 deleted.TotalEncryptedSize = totalEncryptedSize.Int64 deleted.FixedSegmentSize = fixedSegmentSize.Int32 if result.DeletedObjectCount > 1 { db.log.Error("object with multiple committed versions were found!", zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName), zap.ByteString("Object Key", []byte(loc.ObjectKey)), zap.Int("deleted", result.DeletedObjectCount)) mon.Meter("multiple_committed_versions").Mark(1) return result, Error.New("internal error: multiple committed unversioned objects") } if result.DeletedObjectCount > 0 { result.Deleted = append(result.Deleted, deleted) } return result, nil } type precommitConstraintWithNonPendingResult struct { Deleted []Object // DeletedObjectCount returns how many objects were deleted. DeletedObjectCount int // DeletedSegmentCount returns how many segments were deleted. DeletedSegmentCount int // HighestVersion returns tha highest version that was present in the table. // It returns 0 if there was none. HighestVersion Version // HighestNonPendingVersion returns tha highest non-pending version that was present in the table. // It returns 0 if there was none. HighestNonPendingVersion Version } func (r *precommitConstraintWithNonPendingResult) submitMetrics() { mon.Meter("object_delete").Mark(r.DeletedObjectCount) mon.Meter("segment_delete").Mark(r.DeletedSegmentCount) } // precommitDeleteUnversionedWithNonPending deletes the unversioned object at loc and also returns the highest version and highest committed version. func (db *DB) precommitDeleteUnversionedWithNonPending(ctx context.Context, loc ObjectLocation, tx stmtRow) (result precommitConstraintWithNonPendingResult, err error) { defer mon.Task()(&ctx)(&err) if err := loc.Verify(); err != nil { return precommitConstraintWithNonPendingResult{}, Error.Wrap(err) } var deleted Object // TODO(ver): this scanning can probably simplified somehow. var version sql.NullInt64 var streamID uuid.NullUUID var createdAt sql.NullTime var segmentCount, fixedSegmentSize sql.NullInt32 var totalPlainSize, totalEncryptedSize sql.NullInt64 var status sql.NullByte var encryptionParams nullableValue[encryptionParameters] encryptionParams.value.EncryptionParameters = &deleted.Encryption err = tx.QueryRowContext(ctx, ` WITH highest_object AS ( SELECT MAX(version) as version FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) ), highest_non_pending_object AS ( SELECT MAX(version) as version FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) AND status <> `+statusPending+` ), deleted_objects AS ( DELETE FROM objects WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) AND status IN `+statusesUnversioned+` RETURNING version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size, fixed_segment_size, encryption ), deleted_segments AS ( DELETE FROM segments WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) RETURNING segments.stream_id ) SELECT (SELECT version FROM deleted_objects), (SELECT stream_id FROM deleted_objects), (SELECT created_at FROM deleted_objects), (SELECT expires_at FROM deleted_objects), (SELECT status FROM deleted_objects), (SELECT segment_count FROM deleted_objects), (SELECT encrypted_metadata_nonce FROM deleted_objects), (SELECT encrypted_metadata FROM deleted_objects), (SELECT encrypted_metadata_encrypted_key FROM deleted_objects), (SELECT total_plain_size FROM deleted_objects), (SELECT total_encrypted_size FROM deleted_objects), (SELECT fixed_segment_size FROM deleted_objects), (SELECT encryption FROM deleted_objects), (SELECT count(*) FROM deleted_objects), (SELECT count(*) FROM deleted_segments), coalesce((SELECT version FROM highest_object), 0), coalesce((SELECT version FROM highest_non_pending_object), 0) `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey). Scan( &version, &streamID, &createdAt, &deleted.ExpiresAt, &status, &segmentCount, &deleted.EncryptedMetadataNonce, &deleted.EncryptedMetadata, &deleted.EncryptedMetadataEncryptedKey, &totalPlainSize, &totalEncryptedSize, &fixedSegmentSize, &encryptionParams, &result.DeletedObjectCount, &result.DeletedSegmentCount, &result.HighestVersion, &result.HighestNonPendingVersion, ) if err != nil { return precommitConstraintWithNonPendingResult{}, Error.Wrap(err) } deleted.ProjectID = loc.ProjectID deleted.BucketName = loc.BucketName deleted.ObjectKey = loc.ObjectKey deleted.Version = Version(version.Int64) deleted.Status = ObjectStatus(status.Byte) deleted.StreamID = streamID.UUID deleted.CreatedAt = createdAt.Time deleted.SegmentCount = segmentCount.Int32 deleted.TotalPlainSize = totalPlainSize.Int64 deleted.TotalEncryptedSize = totalEncryptedSize.Int64 deleted.FixedSegmentSize = fixedSegmentSize.Int32 if result.DeletedObjectCount > 1 { db.log.Error("object with multiple committed versions were found!", zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName), zap.ByteString("Object Key", []byte(loc.ObjectKey)), zap.Int("deleted", result.DeletedObjectCount)) mon.Meter("multiple_committed_versions").Mark(1) return result, Error.New("internal error: multiple committed unversioned objects") } if result.DeletedObjectCount > 0 { result.Deleted = append(result.Deleted, deleted) } return result, nil }