satellite/metabase: implement SetObjectMetadataLatestVersion

It replaces UpdateMetadataObject that hasn't been used anywhere yet.

Change-Id: I1b2131acd75924c487a1c3ca099674251c5c9b56
This commit is contained in:
Kaloyan Raev 2021-07-02 17:12:43 +03:00 committed by Sean Harvey
parent 68627e7d80
commit a6086d421f
5 changed files with 177 additions and 164 deletions

View File

@ -552,63 +552,3 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
return object, nil
}
// UpdateObjectMetadata contains arguments necessary for updating an object metadata.
type UpdateObjectMetadata struct {
ObjectStream
EncryptedMetadata []byte
EncryptedMetadataNonce []byte
EncryptedMetadataEncryptedKey []byte
}
// UpdateObjectMetadata updates an object metadata.
func (db *DB) UpdateObjectMetadata(ctx context.Context, opts UpdateObjectMetadata) (err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.ObjectStream.Verify(); err != nil {
return err
}
if opts.ObjectStream.Version <= 0 {
return ErrInvalidRequest.New("Version invalid: %v", opts.Version)
}
// TODO So the issue is that during a multipart upload of an object,
// uplink can update object metadata. If we add the arguments EncryptedMetadata
// to CommitObject, they will need to account for them being optional.
// Leading to scenarios where uplink calls update metadata, but wants to clear them
// during commit object.
result, err := db.db.ExecContext(ctx, `
UPDATE objects SET
encrypted_metadata_nonce = $6,
encrypted_metadata = $7,
encrypted_metadata_encrypted_key = $8
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4 AND
stream_id = $5 AND
status = `+committedStatus,
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey)
if err != nil {
return Error.New("unable to update object metadata: %w", err)
}
affected, err := result.RowsAffected()
if err != nil {
return Error.New("failed to get rows affected: %w", err)
}
if affected == 0 {
return storj.ErrObjectNotFound.Wrap(
Error.New("object with specified version and committed status is missing"),
)
}
mon.Meter("object_update_metadata").Mark(1)
return nil
}

View File

@ -2288,102 +2288,3 @@ func TestCommitObject(t *testing.T) {
})
})
}
func TestUpdateObjectMetadata(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()
now := time.Now()
for _, test := range metabasetest.InvalidObjectStreams(obj) {
test := test
t.Run(test.Name, func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.UpdateObjectMetadata{
Opts: metabase.UpdateObjectMetadata{
ObjectStream: test.ObjectStream,
},
ErrClass: test.ErrClass,
ErrText: test.ErrText,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
}
t.Run("Version invalid", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.UpdateObjectMetadata{
Opts: metabase.UpdateObjectMetadata{
ObjectStream: metabase.ObjectStream{
ProjectID: obj.ProjectID,
BucketName: obj.BucketName,
ObjectKey: obj.ObjectKey,
Version: 0,
StreamID: obj.StreamID,
},
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Version invalid: 0",
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("Object missing", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.UpdateObjectMetadata{
Opts: metabase.UpdateObjectMetadata{
ObjectStream: obj,
},
ErrClass: &storj.ErrObjectNotFound,
ErrText: "metabase: object with specified version and committed status is missing",
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("Update metadata", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 0)
encryptedMetadata := testrand.Bytes(1024)
encryptedMetadataNonce := testrand.Nonce()
encryptedMetadataKey := testrand.Bytes(265)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
metabasetest.UpdateObjectMetadata{
Opts: metabase.UpdateObjectMetadata{
ObjectStream: obj,
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
},
},
}.Check(ctx, t, db)
})
})
}

View File

@ -155,16 +155,16 @@ func (step DeleteBucketObjects) Check(ctx *testcontext.Context, t testing.TB, db
checkError(t, err, step.ErrClass, step.ErrText)
}
// UpdateObjectMetadata is for testing metabase.UpdateObjectMetadata.
type UpdateObjectMetadata struct {
Opts metabase.UpdateObjectMetadata
// SetObjectMetadataLatestVersion is for testing metabase.SetObjectMetadataLatestVersion.
type SetObjectMetadataLatestVersion struct {
Opts metabase.SetObjectMetadataLatestVersion
ErrClass *errs.Class
ErrText string
}
// Check runs the test.
func (step UpdateObjectMetadata) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
err := db.UpdateObjectMetadata(ctx, step.Opts)
func (step SetObjectMetadataLatestVersion) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
err := db.SetObjectMetadataLatestVersion(ctx, step.Opts)
checkError(t, err, step.ErrClass, step.ErrText)
}

View File

@ -0,0 +1,75 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"storj.io/common/storj"
)
// SetObjectMetadataLatestVersion contains arguments necessary for replacing an object metadata.
type SetObjectMetadataLatestVersion struct {
ObjectLocation
EncryptedMetadata []byte
EncryptedMetadataNonce []byte
EncryptedMetadataEncryptedKey []byte
}
// SetObjectMetadataLatestVersion replaces an object metadata.
func (db *DB) SetObjectMetadataLatestVersion(ctx context.Context, opts SetObjectMetadataLatestVersion) (err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.ObjectLocation.Verify(); err != nil {
return err
}
// TODO So the issue is that during a multipart upload of an object,
// uplink can update object metadata. If we add the arguments EncryptedMetadata
// to CommitObject, they will need to account for them being optional.
// Leading to scenarios where uplink calls update metadata, but wants to clear them
// during commit object.
result, err := db.db.ExecContext(ctx, `
UPDATE objects SET
encrypted_metadata_nonce = $4,
encrypted_metadata = $5,
encrypted_metadata_encrypted_key = $6
FROM (
SELECT version, stream_id FROM objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = `+committedStatus+`
ORDER BY version DESC
LIMIT 1
) AS latest_object
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
objects.version = latest_object.version AND
objects.stream_id = latest_object.stream_id AND
status = `+committedStatus,
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey),
opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey)
if err != nil {
return Error.New("unable to update object metadata: %w", err)
}
affected, err := result.RowsAffected()
if err != nil {
return Error.New("failed to get rows affected: %w", err)
}
if affected == 0 {
return storj.ErrObjectNotFound.Wrap(
Error.New("object with specified committed status is missing"),
)
}
mon.Meter("object_update_metadata").Mark(1)
return nil
}

View File

@ -0,0 +1,97 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase_test
import (
"testing"
"time"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
)
func TestSetObjectMetadataLatestVersion(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()
now := time.Now()
location := obj.Location()
for _, test := range metabasetest.InvalidObjectLocations(location) {
test := test
t.Run(test.Name, func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.SetObjectMetadataLatestVersion{
Opts: metabase.SetObjectMetadataLatestVersion{
ObjectLocation: test.ObjectLocation,
},
ErrClass: test.ErrClass,
ErrText: test.ErrText,
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
}
t.Run("Object missing", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.SetObjectMetadataLatestVersion{
Opts: metabase.SetObjectMetadataLatestVersion{
ObjectLocation: location,
},
ErrClass: &storj.ErrObjectNotFound,
ErrText: "metabase: object with specified committed status is missing",
}.Check(ctx, t, db)
metabasetest.Verify{}.Check(ctx, t, db)
})
t.Run("Update metadata", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 0)
encryptedMetadata := testrand.Bytes(1024)
encryptedMetadataNonce := testrand.Nonce()
encryptedMetadataKey := testrand.Bytes(265)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
},
},
}.Check(ctx, t, db)
metabasetest.SetObjectMetadataLatestVersion{
Opts: metabase.SetObjectMetadataLatestVersion{
ObjectLocation: location,
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
},
}.Check(ctx, t, db)
metabasetest.Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
Encryption: metabasetest.DefaultEncryption,
EncryptedMetadata: encryptedMetadata,
EncryptedMetadataNonce: encryptedMetadataNonce[:],
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
},
},
}.Check(ctx, t, db)
})
})
}