satellite/metainfo/metabase: allow to set Encryption while committing object

In some cases we need to set encryption parameters later, with CommitObject method. This change makes Encryption optional with BeginObject* methods and mandatory with CommitObject if not set earlier.

Change-Id: I812c9b0e8fc213ca32d4758e0e68227e0e9bdd32
This commit is contained in:
Michał Niewrzał 2021-01-14 11:24:29 +01:00
parent 2d087c54b1
commit 292caa9043
2 changed files with 130 additions and 127 deletions

View File

@ -46,12 +46,7 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
return -1, err
}
switch {
case opts.Encryption.IsZero() || opts.Encryption.CipherSuite == storj.EncUnspecified:
return -1, ErrInvalidRequest.New("Encryption is missing")
case opts.Encryption.BlockSize <= 0:
return -1, ErrInvalidRequest.New("Encryption.BlockSize is negative or zero")
case opts.Version != NextVersion:
if opts.Version != NextVersion {
return -1, ErrInvalidRequest.New("Version should be metabase.NextVersion")
}
@ -102,12 +97,7 @@ func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExact
return Object{}, err
}
switch {
case opts.Encryption.IsZero() || opts.Encryption.CipherSuite == storj.EncUnspecified:
return Object{}, ErrInvalidRequest.New("Encryption is missing")
case opts.Encryption.BlockSize <= 0:
return Object{}, ErrInvalidRequest.New("Encryption.BlockSize is negative or zero")
case opts.Version == NextVersion:
if opts.Version == NextVersion {
return Object{}, ErrInvalidRequest.New("Version should not be metabase.NextVersion")
}
@ -410,6 +400,8 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment)
type CommitObject struct {
ObjectStream
Encryption storj.EncryptionParameters
EncryptedMetadata []byte
EncryptedMetadataNonce []byte
EncryptedMetadataEncryptedKey []byte
@ -423,6 +415,10 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
return Object{}, err
}
if opts.Encryption.CipherSuite != storj.EncUnspecified && opts.Encryption.BlockSize <= 0 {
return Object{}, ErrInvalidRequest.New("Encryption.BlockSize is negative or zero")
}
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
type Segment struct {
Position SegmentPosition
@ -506,7 +502,14 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
total_plain_size = $10,
total_encrypted_size = $11,
fixed_segment_size = $12,
zombie_deletion_deadline = NULL
zombie_deletion_deadline = NULL,
-- TODO should we allow to override existing encryption parameters or return error if don't match with opts?
encryption = CASE
WHEN objects.encryption = 0 AND $13 <> 0 THEN $13
WHEN objects.encryption = 0 AND $13 = 0 THEN NULL
ELSE objects.encryption
END
WHERE
project_id = $1 AND
bucket_name = $2 AND
@ -523,6 +526,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
totalPlainSize,
totalEncryptedSize,
fixedSegmentSize,
encryptionParameters{&opts.Encryption},
).
Scan(
&object.CreatedAt, &object.ExpiresAt,
@ -531,6 +535,9 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return storj.ErrObjectNotFound.Wrap(Error.New("object with specified version and pending status is missing"))
} else if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
// TODO maybe we should check message if 'encryption' label is there
return ErrInvalidRequest.New("Encryption is missing")
}
return Error.New("failed to update object: %w", err)
}

View File

@ -145,63 +145,6 @@ func TestBeginObjectNextVersion(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("Encryption invalid", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: 5,
StreamID: obj.StreamID,
},
Encryption: storj.EncryptionParameters{},
},
Version: -1,
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption is missing",
}.Check(ctx, t, db)
BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: 5,
StreamID: obj.StreamID,
},
Encryption: storj.EncryptionParameters{
BlockSize: 123,
},
},
Version: -1,
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption is missing",
}.Check(ctx, t, db)
BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: 5,
StreamID: obj.StreamID,
},
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: -123,
},
},
Version: -1,
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption.BlockSize is negative or zero",
}.Check(ctx, t, db)
})
t.Run("NextVersion", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
@ -316,63 +259,6 @@ func TestBeginObjectExactVersion(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("Encryption invalid", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: metabase.NextVersion,
StreamID: obj.StreamID,
},
Encryption: storj.EncryptionParameters{},
},
Version: -1,
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption is missing",
}.Check(ctx, t, db)
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: metabase.NextVersion,
StreamID: obj.StreamID,
},
Encryption: storj.EncryptionParameters{
BlockSize: 123,
},
},
Version: -1,
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption is missing",
}.Check(ctx, t, db)
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: metabase.NextVersion,
StreamID: obj.StreamID,
},
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: -123,
},
},
Version: -1,
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption.BlockSize is negative or zero",
}.Check(ctx, t, db)
})
t.Run("Specific version", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
@ -1749,6 +1635,116 @@ func TestCommitObject(t *testing.T) {
},
}.Check(ctx, t, db)
})
t.Run("commit with encryption", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
},
Version: 1,
}.Check(ctx, t, db)
now := time.Now()
CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: storj.EncryptionParameters{},
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption is missing",
}.Check(ctx, t, db)
CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
},
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption.BlockSize is negative or zero",
}.Check(ctx, t, db)
CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: -1,
},
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Encryption.BlockSize is negative or zero",
}.Check(ctx, t, db)
CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 512,
},
},
}.Check(ctx, t, db)
Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 0,
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncAESGCM,
BlockSize: 512,
},
},
},
}.Check(ctx, t, db)
})
t.Run("commit with encryption (no override)", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: defaultTestEncryption,
},
Version: 1,
}.Check(ctx, t, db)
now := time.Now()
CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
// set different encryption than with BeginObjectExactVersion
Encryption: storj.EncryptionParameters{
CipherSuite: storj.EncNull,
BlockSize: 512,
},
},
}.Check(ctx, t, db)
Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 0,
Encryption: defaultTestEncryption,
},
},
}.Check(ctx, t, db)
})
})
}