From 2abe709b6e8ec029b0a83235cd86b07d5766746b Mon Sep 17 00:00:00 2001 From: Qweder93 Date: Thu, 28 Jul 2022 15:41:00 +0300 Subject: [PATCH] satellite/metabase: add metabase.GetObjectLatestVersion back Restored GetObjectLatestVersion and renamed it to GetObjectLastCommitted Add test cases to cover server-side copy Closes https://github.com/storj/storj/issues/4866 Change-Id: I343b339a60152b8fb92fda97baf80bd8fe60d631 --- satellite/metabase/bench_test.go | 16 ++ satellite/metabase/get.go | 84 ++++++++++ satellite/metabase/get_test.go | 199 ++++++++++++++++++++++++ satellite/metabase/metabasetest/test.go | 16 ++ satellite/metainfo/endpoint_object.go | 10 +- 5 files changed, 320 insertions(+), 5 deletions(-) diff --git a/satellite/metabase/bench_test.go b/satellite/metabase/bench_test.go index 6235807de..c728ee2b2 100644 --- a/satellite/metabase/bench_test.go +++ b/satellite/metabase/bench_test.go @@ -278,6 +278,22 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB) } }) + b.Run("GetObjectLastCommitted", func(b *testing.B) { + m := make(Metrics, 0, b.N*len(s.objectStream)) + defer m.Report(b, "ns/obj") + + for i := 0; i < b.N; i++ { + for _, object := range s.objectStream { + m.Record(func() { + _, err := db.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{ + ObjectLocation: object.Location(), + }) + require.NoError(b, err) + }) + } + } + }) + b.Run("GetSegmentByPosition", func(b *testing.B) { m := make(Metrics, 0, b.N*len(s.objectStream)*s.parts*s.segments) defer m.Report(b, "ns/seg") diff --git a/satellite/metabase/get.go b/satellite/metabase/get.go index 691f71c65..4e076b03a 100644 --- a/satellite/metabase/get.go +++ b/satellite/metabase/get.go @@ -10,9 +10,11 @@ import ( "time" "github.com/zeebo/errs" + "go.uber.org/zap" "storj.io/common/storj" "storj.io/common/uuid" + "storj.io/private/tagsql" ) // ErrSegmentNotFound is an error class for non-existing segment. @@ -119,6 +121,88 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers return object, nil } +// GetObjectLastCommitted contains arguments necessary for fetching +// an object information for last committed version. +type GetObjectLastCommitted struct { + ObjectLocation +} + +// GetObjectLastCommitted returns object information for last committed version. +func (db *DB) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted) (_ Object, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return Object{}, err + } + + var object Object + + err = withRows(db.db.QueryContext(ctx, ` + SELECT + stream_id, version, + created_at, expires_at, + segment_count, + encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, + total_plain_size, total_encrypted_size, fixed_segment_size, + encryption + FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + status = `+committedStatus+` AND + (expires_at IS NULL OR expires_at > now()) + ORDER BY version desc + `, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error { + objectFound := false + for rows.Next() { + var scannedObject Object + if err = rows.Scan( + &scannedObject.StreamID, &scannedObject.Version, + &scannedObject.CreatedAt, &scannedObject.ExpiresAt, + &scannedObject.SegmentCount, + &scannedObject.EncryptedMetadataNonce, &scannedObject.EncryptedMetadata, &scannedObject.EncryptedMetadataEncryptedKey, + &scannedObject.TotalPlainSize, &scannedObject.TotalEncryptedSize, &scannedObject.FixedSegmentSize, + encryptionParameters{&scannedObject.Encryption}, + ); err != nil { + return Error.New("unable to query object status: %w", err) + } + + if objectFound { + db.log.Warn("object with multiple committed versions were found!", + zap.Stringer("Project ID", scannedObject.ProjectID), zap.String("Bucket Name", scannedObject.BucketName), + zap.String("Object Key", string(scannedObject.ObjectKey)), zap.Int("Version", int(scannedObject.Version)), + zap.Stringer("Stream ID", scannedObject.StreamID)) + mon.Meter("multiple_committed_versions").Mark(1) + continue + } + object = scannedObject + + objectFound = true + } + + if !objectFound { + return sql.ErrNoRows + } + + return nil + }) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err)) + } + + return Object{}, Error.New("unable to query object status: %w", err) + } + + object.ProjectID = opts.ProjectID + object.BucketName = opts.BucketName + object.ObjectKey = opts.ObjectKey + object.Status = Committed + + return object, nil +} + // GetSegmentByPosition contains arguments necessary for fetching a segment on specific position. type GetSegmentByPosition struct { StreamID uuid.UUID diff --git a/satellite/metabase/get_test.go b/satellite/metabase/get_test.go index 14f29a538..85aa9a4e8 100644 --- a/satellite/metabase/get_test.go +++ b/satellite/metabase/get_test.go @@ -187,6 +187,205 @@ func TestGetObjectExactVersion(t *testing.T) { }) } +func TestGetObjectLastCommitted(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + location := obj.Location() + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + for _, test := range metabasetest.InvalidObjectLocations(location) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + metabasetest.GetObjectLastCommitted{ + Opts: metabase.GetObjectLastCommitted{ + ObjectLocation: test.ObjectLocation, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + metabasetest.Verify{}.Check(ctx, t, db) + }) + } + + t.Run("Object missing", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + metabasetest.GetObjectLastCommitted{ + Opts: metabase.GetObjectLastCommitted{ + ObjectLocation: location, + }, + ErrClass: &storj.ErrObjectNotFound, + ErrText: "metabase: sql: no rows in result set", + }.Check(ctx, t, db) + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("Get pending object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: 1, + }.Check(ctx, t, db) + + metabasetest.GetObjectLastCommitted{ + Opts: metabase.GetObjectLastCommitted{ + ObjectLocation: location, + }, + ErrClass: &storj.ErrObjectNotFound, + ErrText: "metabase: sql: no rows in result set", + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Get object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + encryptedMetadata := testrand.Bytes(1024) + encryptedMetadataNonce := testrand.Nonce() + encryptedMetadataKey := testrand.Bytes(265) + + metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + OverrideEncryptedMetadata: true, + }, + }.Run(ctx, t, db, obj, 0) + + metabasetest.GetObjectLastCommitted{ + Opts: metabase.GetObjectLastCommitted{ + ObjectLocation: location, + }, + Result: metabase.Object{ + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + Encryption: metabasetest.DefaultEncryption, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + Encryption: metabasetest.DefaultEncryption, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + }, + }}.Check(ctx, t, db) + }) + + t.Run("Get object last committed version from multiple", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + firstVersion := obj + + metabasetest.CreateObject(ctx, t, db, firstVersion, 0) + secondVersion := metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 2, + StreamID: obj.StreamID, + } + + metabasetest.CreateObject(ctx, t, db, secondVersion, 0) + metabasetest.GetObjectLastCommitted{ + Opts: metabase.GetObjectLastCommitted{ + ObjectLocation: location, + }, + Result: metabase.Object{ + ObjectStream: secondVersion, + CreatedAt: now, + Status: metabase.Committed, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{Objects: []metabase.RawObject{ + { + ObjectStream: firstVersion, + CreatedAt: now, + Status: metabase.Committed, + Encryption: metabasetest.DefaultEncryption, + }, + { + ObjectStream: secondVersion, + CreatedAt: now, + Status: metabase.Committed, + Encryption: metabasetest.DefaultEncryption, + }, + }}.Check(ctx, t, db) + }) + + t.Run("Get latest copied object version", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + copyObjStream := metabasetest.RandObjectStream() + originalObject := metabasetest.CreateObject(ctx, t, db, obj, 0) + + copiedObj, _, _ := metabasetest.CreateObjectCopy{ + OriginalObject: originalObject, + CopyObjectStream: ©ObjStream, + }.Run(ctx, t, db) + + metabasetest.DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + Version: 1, + ObjectLocation: obj.Location(), + }, + Result: metabase.DeleteObjectResult{ + Objects: []metabase.Object{originalObject}, + }, + }.Check(ctx, t, db) + + metabasetest.GetObjectLastCommitted{ + Opts: metabase.GetObjectLastCommitted{ + ObjectLocation: copiedObj.Location(), + }, + Result: copiedObj, + }.Check(ctx, t, db) + + metabasetest.Verify{Objects: []metabase.RawObject{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: copiedObj.ProjectID, + BucketName: copiedObj.BucketName, + ObjectKey: copiedObj.ObjectKey, + Version: copiedObj.Version, + StreamID: copiedObj.StreamID, + }, + CreatedAt: now, + Status: metabase.Committed, + Encryption: metabasetest.DefaultEncryption, + EncryptedMetadata: copiedObj.EncryptedMetadata, + EncryptedMetadataNonce: copiedObj.EncryptedMetadataNonce, + EncryptedMetadataEncryptedKey: copiedObj.EncryptedMetadataEncryptedKey, + }, + }}.Check(ctx, t, db) + }) + }) +} + func TestGetSegmentByPosition(t *testing.T) { metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { obj := metabasetest.RandObjectStream() diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index b0de6edff..20b4e4a7a 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -197,6 +197,22 @@ func (step GetObjectExactVersion) Check(ctx *testcontext.Context, t testing.TB, require.Zero(t, diff) } +// GetObjectLastCommitted is for testing metabase.GetObjectLastCommitted. +type GetObjectLastCommitted struct { + Opts metabase.GetObjectLastCommitted + Result metabase.Object + ErrClass *errs.Class + ErrText string +} + +// Check runs the test. +func (step GetObjectLastCommitted) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { + result, err := db.GetObjectLastCommitted(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +} + // GetSegmentByPosition is for testing metabase.GetSegmentByPosition. type GetSegmentByPosition struct { Opts metabase.GetSegmentByPosition diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 0af2c0d33..c5f57a5d6 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -122,7 +122,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe return nil, rpcstatus.Error(rpcstatus.Internal, err.Error()) } - // TODO this will work only with newsest uplink + // TODO this will work only with newest uplink // figue out what to do with this encryptionParameters := storj.EncryptionParameters{ CipherSuite: storj.CipherSuite(req.EncryptionParameters.CipherSuite), @@ -1127,7 +1127,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj streamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{ Bucket: []byte(object.BucketName), EncryptedObjectKey: []byte(object.ObjectKey), - Version: int32(object.Version), // TODO incomatible types + Version: int32(object.Version), // TODO incompatible types CreationDate: object.CreatedAt, ExpirationDate: expires, StreamId: object.StreamID[:], @@ -1182,7 +1182,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj result := &pb.Object{ Bucket: []byte(object.BucketName), EncryptedPath: []byte(object.ObjectKey), - Version: int32(object.Version), // TODO incomatible types + Version: int32(object.Version), // TODO incompatible types StreamId: streamID, ExpiresAt: expires, CreatedAt: object.CreatedAt, @@ -1573,7 +1573,7 @@ func convertBeginMoveObjectResults(result metabase.BeginMoveObjectResult) (*pb.O } } - // TODO we need this becase of an uplink issue with how we are storing key and nonce + // TODO we need this because of an uplink issue with how we are storing key and nonce if result.EncryptedMetadataKey == nil { streamMeta := &pb.StreamMeta{} err := pb.Unmarshal(result.EncryptedMetadata, streamMeta) @@ -1802,7 +1802,7 @@ func convertBeginCopyObjectResults(result metabase.BeginCopyObjectResult) (*pb.O } } - // TODO we need this becase of an uplink issue with how we are storing key and nonce + // TODO we need this because of an uplink issue with how we are storing key and nonce if result.EncryptedMetadataKey == nil { streamMeta := &pb.StreamMeta{} err := pb.Unmarshal(result.EncryptedMetadata, streamMeta)