satellite/metabase: BeginObjectNextVersion returns object

We plan to replace metabase.BeginObjectExactVersion usage in
metainfo.BeginObject with metabase.BeginObjectNextVersion. To make this
switch as simple a possible would be nice to have the same results for
both methods. This change is extending return value for
BeginObjectNextVersion to whole object struct. Tests were also adjusted
to be more like metabase.BeginObjectExactVersion tests.

Part of https://github.com/storj/storj/issues/4871

Change-Id: I4db99d74af07e5a73757b55233e0bbdc7b99d565
This commit is contained in:
Michal Niewrzal 2022-09-07 17:43:17 +02:00 committed by Storj Robot
parent fc9bd515fd
commit eea3fac0d3
3 changed files with 48 additions and 18 deletions

View File

@ -64,11 +64,11 @@ func (opts *BeginObjectNextVersion) Verify() error {
} }
// BeginObjectNextVersion adds a pending object to the database, with automatically assigned version. // BeginObjectNextVersion adds a pending object to the database, with automatically assigned version.
func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion) (committed Version, err error) { func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion) (object Object, err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil { if err := opts.Verify(); err != nil {
return -1, err return Object{}, err
} }
if opts.ZombieDeletionDeadline == nil { if opts.ZombieDeletionDeadline == nil {
@ -76,7 +76,19 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
opts.ZombieDeletionDeadline = &deadline opts.ZombieDeletionDeadline = &deadline
} }
row := db.db.QueryRowContext(ctx, ` object = Object{
ObjectStream: ObjectStream{
ProjectID: opts.ProjectID,
BucketName: opts.BucketName,
ObjectKey: opts.ObjectKey,
StreamID: opts.StreamID,
},
ExpiresAt: opts.ExpiresAt,
Encryption: opts.Encryption,
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
}
if err := db.db.QueryRowContext(ctx, `
INSERT INTO objects ( INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id, project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption, expires_at, encryption,
@ -94,21 +106,18 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
$4, $5, $6, $4, $5, $6,
$7, $7,
$8, $9, $10) $8, $9, $10)
RETURNING version RETURNING status, version, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID, `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption}, opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline, opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey, opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
) ).Scan(&object.Status, &object.Version, &object.CreatedAt); err != nil {
return Object{}, Error.New("unable to insert object: %w", err)
var v int64
if err := row.Scan(&v); err != nil {
return -1, Error.New("unable to insert object: %w", err)
} }
mon.Meter("object_begin").Mark(1) mon.Meter("object_begin").Mark(1)
return Version(v), nil return object, nil
} }
// BeginObjectExactVersion contains arguments necessary for starting an object upload. // BeginObjectExactVersion contains arguments necessary for starting an object upload.

View File

@ -32,7 +32,28 @@ type BeginObjectNextVersion struct {
func (step BeginObjectNextVersion) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { func (step BeginObjectNextVersion) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
got, err := db.BeginObjectNextVersion(ctx, step.Opts) got, err := db.BeginObjectNextVersion(ctx, step.Opts)
checkError(t, err, step.ErrClass, step.ErrText) checkError(t, err, step.ErrClass, step.ErrText)
require.Equal(t, step.Version, got)
if step.ErrClass == nil {
require.Equal(t, step.Version, got.Version)
require.WithinDuration(t, time.Now(), got.CreatedAt, 5*time.Second)
require.Equal(t, step.Opts.ObjectStream.ProjectID, got.ObjectStream.ProjectID)
require.Equal(t, step.Opts.ObjectStream.BucketName, got.ObjectStream.BucketName)
require.Equal(t, step.Opts.ObjectStream.ObjectKey, got.ObjectStream.ObjectKey)
require.Equal(t, step.Opts.ObjectStream.StreamID, got.ObjectStream.StreamID)
require.Equal(t, metabase.Pending, got.Status)
require.Equal(t, step.Opts.ExpiresAt, got.ExpiresAt)
gotDeadline := got.ZombieDeletionDeadline
optsDeadline := step.Opts.ZombieDeletionDeadline
if optsDeadline == nil {
require.WithinDuration(t, time.Now().Add(24*time.Hour), *gotDeadline, 5*time.Second)
} else {
require.WithinDuration(t, *optsDeadline, *gotDeadline, 5*time.Second)
}
require.Equal(t, step.Opts.Encryption, got.Encryption)
}
} }
// BeginObjectExactVersion is for testing metabase.BeginObjectExactVersion. // BeginObjectExactVersion is for testing metabase.BeginObjectExactVersion.

View File

@ -47,7 +47,7 @@ func assertRPCStatusCode(t *testing.T, actualError error, expectedStatusCode rpc
require.Equal(t, expectedStatusCode, statusCode, "wrong %T, got %v", statusCode, actualError) require.Equal(t, expectedStatusCode, statusCode, "wrong %T, got %v", statusCode, actualError)
} }
func TestObject_NoStorageNodes(t *testing.T) { func TestEndpoint_Object_No_StorageNodes(t *testing.T) {
testplanet.Run(t, testplanet.Config{ testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, UplinkCount: 1, SatelliteCount: 1, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{ Reconfigure: testplanet.Reconfigure{
@ -509,7 +509,7 @@ func TestObject_NoStorageNodes(t *testing.T) {
committedObject := objects[0] committedObject := objects[0]
pendingObjectVersion, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ pendingObject, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{ ObjectStream: metabase.ObjectStream{
ProjectID: committedObject.ProjectID, ProjectID: committedObject.ProjectID,
BucketName: committedObject.BucketName, BucketName: committedObject.BucketName,
@ -518,7 +518,7 @@ func TestObject_NoStorageNodes(t *testing.T) {
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, committedObject.Version+1, pendingObjectVersion) require.Equal(t, committedObject.Version+1, pendingObject.Version)
getObjectResponse, err := satellite.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{ getObjectResponse, err := satellite.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()}, Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
@ -544,7 +544,7 @@ func TestObject_NoStorageNodes(t *testing.T) {
committedObject := objects[0] committedObject := objects[0]
pendingObjectVersion, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ pendingObject, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{ ObjectStream: metabase.ObjectStream{
ProjectID: committedObject.ProjectID, ProjectID: committedObject.ProjectID,
BucketName: committedObject.BucketName, BucketName: committedObject.BucketName,
@ -553,7 +553,7 @@ func TestObject_NoStorageNodes(t *testing.T) {
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, committedObject.Version+1, pendingObjectVersion) require.Equal(t, committedObject.Version+1, pendingObject.Version)
downloadObjectResponse, err := satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{ downloadObjectResponse, err := satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()}, Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
@ -799,7 +799,7 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
committedObject := objects[0] committedObject := objects[0]
pendingObjectVersion, err := sat.Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ pendingObject, err := sat.Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
ObjectStream: metabase.ObjectStream{ ObjectStream: metabase.ObjectStream{
ProjectID: committedObject.ProjectID, ProjectID: committedObject.ProjectID,
BucketName: committedObject.BucketName, BucketName: committedObject.BucketName,
@ -808,7 +808,7 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, committedObject.Version+1, pendingObjectVersion) require.Equal(t, committedObject.Version+1, pendingObject.Version)
newIps, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones") newIps, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
require.NoError(t, err) require.NoError(t, err)