satellite/metabase: add metabase.GetObjectLatestVersion back
Restored GetObjectLatestVersion and renamed it to GetObjectLastCommitted Add test cases to cover server-side copy Closes https://github.com/storj/storj/issues/4866 Change-Id: I343b339a60152b8fb92fda97baf80bd8fe60d631
This commit is contained in:
parent
c552c343f9
commit
2abe709b6e
@ -278,6 +278,22 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("GetObjectLastCommitted", func(b *testing.B) {
|
||||
m := make(Metrics, 0, b.N*len(s.objectStream))
|
||||
defer m.Report(b, "ns/obj")
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, object := range s.objectStream {
|
||||
m.Record(func() {
|
||||
_, err := db.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: object.Location(),
|
||||
})
|
||||
require.NoError(b, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("GetSegmentByPosition", func(b *testing.B) {
|
||||
m := make(Metrics, 0, b.N*len(s.objectStream)*s.parts*s.segments)
|
||||
defer m.Report(b, "ns/seg")
|
||||
|
@ -10,9 +10,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/tagsql"
|
||||
)
|
||||
|
||||
// ErrSegmentNotFound is an error class for non-existing segment.
|
||||
@ -119,6 +121,88 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers
|
||||
return object, nil
|
||||
}
|
||||
|
||||
// GetObjectLastCommitted contains arguments necessary for fetching
|
||||
// an object information for last committed version.
|
||||
type GetObjectLastCommitted struct {
|
||||
ObjectLocation
|
||||
}
|
||||
|
||||
// GetObjectLastCommitted returns object information for last committed version.
|
||||
func (db *DB) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted) (_ Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
return Object{}, err
|
||||
}
|
||||
|
||||
var object Object
|
||||
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
stream_id, version,
|
||||
created_at, expires_at,
|
||||
segment_count,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
status = `+committedStatus+` AND
|
||||
(expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY version desc
|
||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
|
||||
objectFound := false
|
||||
for rows.Next() {
|
||||
var scannedObject Object
|
||||
if err = rows.Scan(
|
||||
&scannedObject.StreamID, &scannedObject.Version,
|
||||
&scannedObject.CreatedAt, &scannedObject.ExpiresAt,
|
||||
&scannedObject.SegmentCount,
|
||||
&scannedObject.EncryptedMetadataNonce, &scannedObject.EncryptedMetadata, &scannedObject.EncryptedMetadataEncryptedKey,
|
||||
&scannedObject.TotalPlainSize, &scannedObject.TotalEncryptedSize, &scannedObject.FixedSegmentSize,
|
||||
encryptionParameters{&scannedObject.Encryption},
|
||||
); err != nil {
|
||||
return Error.New("unable to query object status: %w", err)
|
||||
}
|
||||
|
||||
if objectFound {
|
||||
db.log.Warn("object with multiple committed versions were found!",
|
||||
zap.Stringer("Project ID", scannedObject.ProjectID), zap.String("Bucket Name", scannedObject.BucketName),
|
||||
zap.String("Object Key", string(scannedObject.ObjectKey)), zap.Int("Version", int(scannedObject.Version)),
|
||||
zap.Stringer("Stream ID", scannedObject.StreamID))
|
||||
mon.Meter("multiple_committed_versions").Mark(1)
|
||||
continue
|
||||
}
|
||||
object = scannedObject
|
||||
|
||||
objectFound = true
|
||||
}
|
||||
|
||||
if !objectFound {
|
||||
return sql.ErrNoRows
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
|
||||
}
|
||||
|
||||
return Object{}, Error.New("unable to query object status: %w", err)
|
||||
}
|
||||
|
||||
object.ProjectID = opts.ProjectID
|
||||
object.BucketName = opts.BucketName
|
||||
object.ObjectKey = opts.ObjectKey
|
||||
object.Status = Committed
|
||||
|
||||
return object, nil
|
||||
}
|
||||
|
||||
// GetSegmentByPosition contains arguments necessary for fetching a segment on specific position.
|
||||
type GetSegmentByPosition struct {
|
||||
StreamID uuid.UUID
|
||||
|
@ -187,6 +187,205 @@ func TestGetObjectExactVersion(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetObjectLastCommitted(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
location := obj.Location()
|
||||
now := time.Now()
|
||||
zombieDeadline := now.Add(24 * time.Hour)
|
||||
|
||||
for _, test := range metabasetest.InvalidObjectLocations(location) {
|
||||
test := test
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
metabasetest.GetObjectLastCommitted{
|
||||
Opts: metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: test.ObjectLocation,
|
||||
},
|
||||
ErrClass: test.ErrClass,
|
||||
ErrText: test.ErrText,
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("Object missing", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
metabasetest.GetObjectLastCommitted{
|
||||
Opts: metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: sql: no rows in result set",
|
||||
}.Check(ctx, t, db)
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get pending object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.GetObjectLastCommitted{
|
||||
Opts: metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: sql: no rows in result set",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
ZombieDeletionDeadline: &zombieDeadline,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
OverrideEncryptedMetadata: true,
|
||||
},
|
||||
}.Run(ctx, t, db, obj, 0)
|
||||
|
||||
metabasetest.GetObjectLastCommitted{
|
||||
Opts: metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.Object{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get object last committed version from multiple", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
firstVersion := obj
|
||||
|
||||
metabasetest.CreateObject(ctx, t, db, firstVersion, 0)
|
||||
secondVersion := metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 2,
|
||||
StreamID: obj.StreamID,
|
||||
}
|
||||
|
||||
metabasetest.CreateObject(ctx, t, db, secondVersion, 0)
|
||||
metabasetest.GetObjectLastCommitted{
|
||||
Opts: metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.Object{
|
||||
ObjectStream: secondVersion,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: firstVersion,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
{
|
||||
ObjectStream: secondVersion,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get latest copied object version", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
copyObjStream := metabasetest.RandObjectStream()
|
||||
originalObject := metabasetest.CreateObject(ctx, t, db, obj, 0)
|
||||
|
||||
copiedObj, _, _ := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObject,
|
||||
CopyObjectStream: ©ObjStream,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
metabasetest.DeleteObjectExactVersion{
|
||||
Opts: metabase.DeleteObjectExactVersion{
|
||||
Version: 1,
|
||||
ObjectLocation: obj.Location(),
|
||||
},
|
||||
Result: metabase.DeleteObjectResult{
|
||||
Objects: []metabase.Object{originalObject},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.GetObjectLastCommitted{
|
||||
Opts: metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: copiedObj.Location(),
|
||||
},
|
||||
Result: copiedObj,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: copiedObj.ProjectID,
|
||||
BucketName: copiedObj.BucketName,
|
||||
ObjectKey: copiedObj.ObjectKey,
|
||||
Version: copiedObj.Version,
|
||||
StreamID: copiedObj.StreamID,
|
||||
},
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
EncryptedMetadata: copiedObj.EncryptedMetadata,
|
||||
EncryptedMetadataNonce: copiedObj.EncryptedMetadataNonce,
|
||||
EncryptedMetadataEncryptedKey: copiedObj.EncryptedMetadataEncryptedKey,
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetSegmentByPosition(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
@ -197,6 +197,22 @@ func (step GetObjectExactVersion) Check(ctx *testcontext.Context, t testing.TB,
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// GetObjectLastCommitted is for testing metabase.GetObjectLastCommitted.
|
||||
type GetObjectLastCommitted struct {
|
||||
Opts metabase.GetObjectLastCommitted
|
||||
Result metabase.Object
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
// Check runs the test.
|
||||
func (step GetObjectLastCommitted) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
result, err := db.GetObjectLastCommitted(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// GetSegmentByPosition is for testing metabase.GetSegmentByPosition.
|
||||
type GetSegmentByPosition struct {
|
||||
Opts metabase.GetSegmentByPosition
|
||||
|
@ -122,7 +122,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
||||
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
|
||||
}
|
||||
|
||||
// TODO this will work only with newsest uplink
|
||||
// TODO this will work only with newest uplink
|
||||
// figue out what to do with this
|
||||
encryptionParameters := storj.EncryptionParameters{
|
||||
CipherSuite: storj.CipherSuite(req.EncryptionParameters.CipherSuite),
|
||||
@ -1127,7 +1127,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj
|
||||
streamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
|
||||
Bucket: []byte(object.BucketName),
|
||||
EncryptedObjectKey: []byte(object.ObjectKey),
|
||||
Version: int32(object.Version), // TODO incomatible types
|
||||
Version: int32(object.Version), // TODO incompatible types
|
||||
CreationDate: object.CreatedAt,
|
||||
ExpirationDate: expires,
|
||||
StreamId: object.StreamID[:],
|
||||
@ -1182,7 +1182,7 @@ func (endpoint *Endpoint) objectToProto(ctx context.Context, object metabase.Obj
|
||||
result := &pb.Object{
|
||||
Bucket: []byte(object.BucketName),
|
||||
EncryptedPath: []byte(object.ObjectKey),
|
||||
Version: int32(object.Version), // TODO incomatible types
|
||||
Version: int32(object.Version), // TODO incompatible types
|
||||
StreamId: streamID,
|
||||
ExpiresAt: expires,
|
||||
CreatedAt: object.CreatedAt,
|
||||
@ -1573,7 +1573,7 @@ func convertBeginMoveObjectResults(result metabase.BeginMoveObjectResult) (*pb.O
|
||||
}
|
||||
}
|
||||
|
||||
// TODO we need this becase of an uplink issue with how we are storing key and nonce
|
||||
// TODO we need this because of an uplink issue with how we are storing key and nonce
|
||||
if result.EncryptedMetadataKey == nil {
|
||||
streamMeta := &pb.StreamMeta{}
|
||||
err := pb.Unmarshal(result.EncryptedMetadata, streamMeta)
|
||||
@ -1802,7 +1802,7 @@ func convertBeginCopyObjectResults(result metabase.BeginCopyObjectResult) (*pb.O
|
||||
}
|
||||
}
|
||||
|
||||
// TODO we need this becase of an uplink issue with how we are storing key and nonce
|
||||
// TODO we need this because of an uplink issue with how we are storing key and nonce
|
||||
if result.EncryptedMetadataKey == nil {
|
||||
streamMeta := &pb.StreamMeta{}
|
||||
err := pb.Unmarshal(result.EncryptedMetadata, streamMeta)
|
||||
|
Loading…
Reference in New Issue
Block a user