From a767aed5919f561e858371565e7e2cf75e8aa724 Mon Sep 17 00:00:00 2001 From: Kaloyan Raev Date: Thu, 8 Jul 2021 17:50:37 +0300 Subject: [PATCH] satellite/{metabase,metainfo}: require StreamID for UpdateObjectMetadata This avoids corrupting objects if reuploads and metadata updates happen concurrently. --- go.mod | 2 +- go.sum | 4 +-- satellite/metabase/metabasetest/test.go | 10 +++--- satellite/metabase/metadata.go | 47 +++++++++++-------------- satellite/metabase/metadata_test.go | 45 +++++++++++++++-------- satellite/metainfo/endpoint_test.go | 20 +++++++---- satellite/metainfo/metainfo.go | 16 +++++++-- 7 files changed, 87 insertions(+), 57 deletions(-) diff --git a/go.mod b/go.mod index 66c2835e7..89b9a2345 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( google.golang.org/api v0.20.0 // indirect gopkg.in/segmentio/analytics-go.v3 v3.1.0 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c - storj.io/common v0.0.0-20210702123130-0f973652e4bb + storj.io/common v0.0.0-20210708125041-4882a3ae3eda storj.io/drpc v0.0.23 storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 storj.io/private v0.0.0-20210625132526-af46b647eda5 diff --git a/go.sum b/go.sum index 9ab502be7..a6fb41c43 100644 --- a/go.sum +++ b/go.sum @@ -871,8 +871,8 @@ sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3 storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0= storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ= storj.io/common v0.0.0-20210504141454-bcb03a80052f/go.mod h1:PdP3eTld9RqSV3E4K44JSlw7Z/zNsymj9rnKuHFKhJE= -storj.io/common v0.0.0-20210702123130-0f973652e4bb h1:p6cKlUcTJAgOnsW5ILRJ/WsxylRq6ys74ZDkxI+PHXE= -storj.io/common v0.0.0-20210702123130-0f973652e4bb/go.mod h1:SN7WwEhNC2PTSuIJDj8184eeBmq+VEDwGqJzrlPera4= +storj.io/common v0.0.0-20210708125041-4882a3ae3eda h1:UtmPvs92TUEwCWal1LYfXS7lg+X34tQI5giMqwu0dVo= +storj.io/common v0.0.0-20210708125041-4882a3ae3eda/go.mod h1:SN7WwEhNC2PTSuIJDj8184eeBmq+VEDwGqJzrlPera4= storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw= storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA= storj.io/drpc v0.0.20/go.mod h1:eAxUDk8HWvGl9iqznpuphtZ+WIjIGPJFqNXuKHgRiMM= diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index 4438dbfb5..305f71d82 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -155,16 +155,16 @@ func (step DeleteBucketObjects) Check(ctx *testcontext.Context, t testing.TB, db checkError(t, err, step.ErrClass, step.ErrText) } -// SetObjectMetadataLatestVersion is for testing metabase.SetObjectMetadataLatestVersion. -type SetObjectMetadataLatestVersion struct { - Opts metabase.SetObjectMetadataLatestVersion +// UpdateObjectMetadata is for testing metabase.UpdateObjectMetadata. +type UpdateObjectMetadata struct { + Opts metabase.UpdateObjectMetadata ErrClass *errs.Class ErrText string } // Check runs the test. -func (step SetObjectMetadataLatestVersion) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { - err := db.SetObjectMetadataLatestVersion(ctx, step.Opts) +func (step UpdateObjectMetadata) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { + err := db.UpdateObjectMetadata(ctx, step.Opts) checkError(t, err, step.ErrClass, step.ErrText) } diff --git a/satellite/metabase/metadata.go b/satellite/metabase/metadata.go index 46282fee5..7c4f9c1d0 100644 --- a/satellite/metabase/metadata.go +++ b/satellite/metabase/metadata.go @@ -9,23 +9,27 @@ import ( "storj.io/common/storj" ) -// SetObjectMetadataLatestVersion contains arguments necessary for replacing an object metadata. -type SetObjectMetadataLatestVersion struct { - ObjectLocation +// UpdateObjectMetadata contains arguments necessary for replacing an object metadata. +type UpdateObjectMetadata struct { + ObjectStream EncryptedMetadata []byte EncryptedMetadataNonce []byte EncryptedMetadataEncryptedKey []byte } -// SetObjectMetadataLatestVersion replaces an object metadata. -func (db *DB) SetObjectMetadataLatestVersion(ctx context.Context, opts SetObjectMetadataLatestVersion) (err error) { +// UpdateObjectMetadata updates an object metadata. +func (db *DB) UpdateObjectMetadata(ctx context.Context, opts UpdateObjectMetadata) (err error) { defer mon.Task()(&ctx)(&err) - if err := opts.ObjectLocation.Verify(); err != nil { + if err := opts.ObjectStream.Verify(); err != nil { return err } + if opts.ObjectStream.Version <= 0 { + return ErrInvalidRequest.New("Version invalid: %v", opts.Version) + } + // TODO So the issue is that during a multipart upload of an object, // uplink can update object metadata. If we add the arguments EncryptedMetadata // to CommitObject, they will need to account for them being optional. @@ -33,26 +37,17 @@ func (db *DB) SetObjectMetadataLatestVersion(ctx context.Context, opts SetObject // during commit object. result, err := db.db.ExecContext(ctx, ` UPDATE objects SET - encrypted_metadata_nonce = $4, - encrypted_metadata = $5, - encrypted_metadata_encrypted_key = $6 - FROM ( - SELECT version, stream_id FROM objects WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - status = `+committedStatus+` - ORDER BY version DESC - LIMIT 1 - ) AS latest_object + encrypted_metadata_nonce = $6, + encrypted_metadata = $7, + encrypted_metadata_encrypted_key = $8 WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - objects.version = latest_object.version AND - objects.stream_id = latest_object.stream_id AND - status = `+committedStatus, - opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + stream_id = $5 AND + status = `+committedStatus, + opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID, opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey) if err != nil { return Error.New("unable to update object metadata: %w", err) @@ -65,7 +60,7 @@ func (db *DB) SetObjectMetadataLatestVersion(ctx context.Context, opts SetObject if affected == 0 { return storj.ErrObjectNotFound.Wrap( - Error.New("object with specified committed status is missing"), + Error.New("object with specified version and committed status is missing"), ) } diff --git a/satellite/metabase/metadata_test.go b/satellite/metabase/metadata_test.go index 8ba892c76..5d5c4fd9d 100644 --- a/satellite/metabase/metadata_test.go +++ b/satellite/metabase/metadata_test.go @@ -14,20 +14,18 @@ import ( "storj.io/storj/satellite/metabase/metabasetest" ) -func TestSetObjectMetadataLatestVersion(t *testing.T) { +func TestUpdateObjectMetadata(t *testing.T) { metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { obj := metabasetest.RandObjectStream() now := time.Now() - location := obj.Location() - - for _, test := range metabasetest.InvalidObjectLocations(location) { + for _, test := range metabasetest.InvalidObjectStreams(obj) { test := test t.Run(test.Name, func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) - metabasetest.SetObjectMetadataLatestVersion{ - Opts: metabase.SetObjectMetadataLatestVersion{ - ObjectLocation: test.ObjectLocation, + metabasetest.UpdateObjectMetadata{ + Opts: metabase.UpdateObjectMetadata{ + ObjectStream: test.ObjectStream, }, ErrClass: test.ErrClass, ErrText: test.ErrText, @@ -36,15 +34,34 @@ func TestSetObjectMetadataLatestVersion(t *testing.T) { }) } + t.Run("Version invalid", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.UpdateObjectMetadata{ + Opts: metabase.UpdateObjectMetadata{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 0, + StreamID: obj.StreamID, + }, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Version invalid: 0", + }.Check(ctx, t, db) + metabasetest.Verify{}.Check(ctx, t, db) + }) + t.Run("Object missing", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) - metabasetest.SetObjectMetadataLatestVersion{ - Opts: metabase.SetObjectMetadataLatestVersion{ - ObjectLocation: location, + metabasetest.UpdateObjectMetadata{ + Opts: metabase.UpdateObjectMetadata{ + ObjectStream: obj, }, ErrClass: &storj.ErrObjectNotFound, - ErrText: "metabase: object with specified committed status is missing", + ErrText: "metabase: object with specified version and committed status is missing", }.Check(ctx, t, db) metabasetest.Verify{}.Check(ctx, t, db) }) @@ -69,9 +86,9 @@ func TestSetObjectMetadataLatestVersion(t *testing.T) { }, }.Check(ctx, t, db) - metabasetest.SetObjectMetadataLatestVersion{ - Opts: metabase.SetObjectMetadataLatestVersion{ - ObjectLocation: location, + metabasetest.UpdateObjectMetadata{ + Opts: metabase.UpdateObjectMetadata{ + ObjectStream: obj, EncryptedMetadata: encryptedMetadata, EncryptedMetadataNonce: encryptedMetadataNonce[:], EncryptedMetadataEncryptedKey: encryptedMetadataKey, diff --git a/satellite/metainfo/endpoint_test.go b/satellite/metainfo/endpoint_test.go index fc0789825..049aa7ef3 100644 --- a/satellite/metainfo/endpoint_test.go +++ b/satellite/metainfo/endpoint_test.go @@ -585,11 +585,15 @@ func TestEndpoint_UpdateObjectMetadata(t *testing.T) { objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx) require.NoError(t, err) require.Len(t, objects, 1) - // TODO: we expect no encrypted metadata at this point, but for some reason there are 111 bytes of metadata. - // assert.Nil(t, objects[0].EncryptedMetadata, len(objects[0].EncryptedMetadata)) - assert.Nil(t, objects[0].EncryptedMetadataEncryptedKey) - zeroNonce := storj.Nonce{} - assert.Equal(t, zeroNonce[:], objects[0].EncryptedMetadataNonce) + + getResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{ + Header: &pb.RequestHeader{ + ApiKey: apiKey.SerializeRaw(), + }, + Bucket: []byte("testbucket"), + EncryptedPath: []byte(objects[0].ObjectKey), + }) + require.NoError(t, err) testEncryptedMetadata := testrand.BytesInt(64) testEncryptedMetadataEncryptedKey := testrand.BytesInt(32) @@ -600,8 +604,10 @@ func TestEndpoint_UpdateObjectMetadata(t *testing.T) { Header: &pb.RequestHeader{ ApiKey: apiKey.SerializeRaw(), }, - Bucket: []byte("testbucket"), - EncryptedObjectKey: []byte(objects[0].ObjectKey), + Bucket: getResp.Object.Bucket, + EncryptedObjectKey: getResp.Object.EncryptedPath, + Version: getResp.Object.Version, + StreamId: getResp.Object.StreamId, EncryptedMetadataNonce: testEncryptedMetadataNonce, EncryptedMetadata: testEncryptedMetadata, EncryptedMetadataEncryptedKey: testEncryptedMetadataEncryptedKey, diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index f085edb36..f7b9c7819 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -1580,11 +1580,23 @@ func (endpoint *Endpoint) UpdateObjectMetadata(ctx context.Context, req *pb.Obje return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) } - err = endpoint.metainfo.metabaseDB.SetObjectMetadataLatestVersion(ctx, metabase.SetObjectMetadataLatestVersion{ - ObjectLocation: metabase.ObjectLocation{ + streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId) + if err != nil { + return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error()) + } + + id, err := uuid.FromBytes(streamID.StreamId) + if err != nil { + return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) + } + + err = endpoint.metainfo.metabaseDB.UpdateObjectMetadata(ctx, metabase.UpdateObjectMetadata{ + ObjectStream: metabase.ObjectStream{ ProjectID: keyInfo.ProjectID, BucketName: string(req.Bucket), ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey), + Version: metabase.Version(req.Version), + StreamID: id, }, EncryptedMetadata: req.EncryptedMetadata, EncryptedMetadataNonce: req.EncryptedMetadataNonce[:],