satellite/metabase: add CommitObjectWithSegments.Versioned
Change-Id: I4e1fc82b45cca94ff0b48d5b8d9deb6e13d0957b
This commit is contained in:
parent
1d5ea2d35c
commit
d3429fafd0
@ -18,6 +18,8 @@ import (
|
||||
)
|
||||
|
||||
// CommitObjectWithSegments contains arguments necessary for committing an object.
|
||||
//
|
||||
// TODO: not ready for production.
|
||||
type CommitObjectWithSegments struct {
|
||||
ObjectStream
|
||||
|
||||
@ -27,9 +29,18 @@ type CommitObjectWithSegments struct {
|
||||
|
||||
// TODO: this probably should use segment ranges rather than individual items
|
||||
Segments []SegmentPosition
|
||||
|
||||
// DisallowDelete indicates whether the user is allowed to overwrite
|
||||
// the previous unversioned object.
|
||||
DisallowDelete bool
|
||||
|
||||
// Versioned indicates whether an object is allowed to have multiple versions.
|
||||
Versioned bool
|
||||
}
|
||||
|
||||
// CommitObjectWithSegments commits pending object to the database.
|
||||
//
|
||||
// TODO: not ready for production.
|
||||
func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWithSegments) (object Object, deletedSegments []DeletedSegmentInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
@ -40,10 +51,25 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
||||
return Object{}, nil, err
|
||||
}
|
||||
|
||||
var previousObject deleteObjectUnversionedCommittedResult
|
||||
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
// TODO: should we prevent this from executing when the object has been committed
|
||||
// currently this requires quite a lot of database communication, so invalid handling can be expensive.
|
||||
|
||||
if !opts.Versioned {
|
||||
var err error
|
||||
// Note, we are prematurely deleting the object without permissions
|
||||
// and then rolling the action back, if we were not allowed to.
|
||||
previousObject, err = db.deleteObjectUnversionedCommitted(ctx, opts.Location(), tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if previousObject.DeletedObjectCount > 0 && opts.DisallowDelete {
|
||||
return ErrPermissionDenied.New("no permissions to delete existing object")
|
||||
}
|
||||
}
|
||||
|
||||
segmentsInDatabase, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -86,18 +112,20 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
||||
totalEncryptedSize += int64(seg.EncryptedSize)
|
||||
}
|
||||
|
||||
nextStatus := committedWhereVersioned(opts.Versioned)
|
||||
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
UPDATE objects SET
|
||||
status =`+statusCommittedUnversioned+`,
|
||||
segment_count = $6,
|
||||
status = $6,
|
||||
segment_count = $7,
|
||||
|
||||
encrypted_metadata_nonce = $7,
|
||||
encrypted_metadata = $8,
|
||||
encrypted_metadata_encrypted_key = $9,
|
||||
encrypted_metadata_nonce = $8,
|
||||
encrypted_metadata = $9,
|
||||
encrypted_metadata_encrypted_key = $10,
|
||||
|
||||
total_plain_size = $10,
|
||||
total_encrypted_size = $11,
|
||||
fixed_segment_size = $12,
|
||||
total_plain_size = $11,
|
||||
total_encrypted_size = $12,
|
||||
fixed_segment_size = $13,
|
||||
zombie_deletion_deadline = NULL
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
@ -109,7 +137,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
||||
RETURNING
|
||||
created_at, expires_at,
|
||||
encryption;
|
||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
|
||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, nextStatus,
|
||||
len(finalSegments),
|
||||
opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey,
|
||||
totalPlainSize,
|
||||
@ -132,7 +160,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
||||
object.BucketName = opts.BucketName
|
||||
object.ObjectKey = opts.ObjectKey
|
||||
object.Version = opts.Version
|
||||
object.Status = CommittedUnversioned
|
||||
object.Status = nextStatus
|
||||
object.SegmentCount = int32(len(finalSegments))
|
||||
object.EncryptedMetadataNonce = opts.EncryptedMetadataNonce
|
||||
object.EncryptedMetadata = opts.EncryptedMetadata
|
||||
|
@ -171,6 +171,139 @@ func TestCommitObjectWithSegments(t *testing.T) {
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
// TODO(ver): add tests for DisallowDelete
|
||||
|
||||
t.Run("delete previous unversioned object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.CreateObject(ctx, t, db, obj, 1)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
metabasetest.CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now,
|
||||
Status: metabase.CommittedUnversioned,
|
||||
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("keep previous objects when committing versioned", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
unversionedStream := obj
|
||||
unversionedStream.Version = 3
|
||||
unversionedObject := metabasetest.CreateObject(ctx, t, db, unversionedStream, 0)
|
||||
versionedStream := obj
|
||||
versionedStream.Version = 4
|
||||
versionedObject := metabasetest.CreateObjectVersioned(ctx, t, db, versionedStream, 0)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
metabasetest.CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
|
||||
Versioned: true,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(unversionedObject),
|
||||
metabase.RawObject(versionedObject),
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now,
|
||||
Status: metabase.CommittedVersioned,
|
||||
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("segments missing in database", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user