satellite/metabase: drop GetObjectLatestVersion method
We don't have different version of object than 1 so at the moment this method is not needed. Also using GetObjectExactVersion should be slightly more performant. Change-Id: I78235d8ae22594cc1d6345dabcc915f41cd7797b
This commit is contained in:
parent
dffa7845f6
commit
5041ee0abc
@ -73,8 +73,9 @@ func (endpoint *Endpoint) ObjectHealth(ctx context.Context, in *internalpb.Objec
|
||||
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
|
||||
}
|
||||
// TODO add version field to ObjectHealthRequest?
|
||||
object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
ObjectLocation: objectLocation,
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
@ -121,8 +122,9 @@ func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *internalpb.Segm
|
||||
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
|
||||
}
|
||||
|
||||
object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
ObjectLocation: objectLocation,
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
|
@ -276,22 +276,6 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("GetObjectLatestVersion", func(b *testing.B) {
|
||||
m := make(Metrics, 0, b.N*len(s.objectStream))
|
||||
defer m.Report(b, "ns/obj")
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, object := range s.objectStream {
|
||||
m.Record(func() {
|
||||
_, err := db.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
ObjectLocation: object.Location(),
|
||||
})
|
||||
require.NoError(b, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("GetSegmentByPosition", func(b *testing.B) {
|
||||
m := make(Metrics, 0, b.N*len(s.objectStream)*s.parts*s.segments)
|
||||
defer m.Report(b, "ns/seg")
|
||||
|
@ -113,62 +113,6 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers
|
||||
return object, nil
|
||||
}
|
||||
|
||||
// GetObjectLatestVersion contains arguments necessary for fetching
|
||||
// an object information for latest version.
|
||||
type GetObjectLatestVersion struct {
|
||||
ObjectLocation
|
||||
}
|
||||
|
||||
// GetObjectLatestVersion returns object information for latest version.
|
||||
func (db *DB) GetObjectLatestVersion(ctx context.Context, opts GetObjectLatestVersion) (_ Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
return Object{}, err
|
||||
}
|
||||
|
||||
object := Object{}
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
stream_id, version,
|
||||
created_at, expires_at,
|
||||
segment_count,
|
||||
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption
|
||||
FROM objects
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
status = `+committedStatus+`
|
||||
ORDER BY version desc
|
||||
LIMIT 1
|
||||
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey).
|
||||
Scan(
|
||||
&object.StreamID, &object.Version,
|
||||
&object.CreatedAt, &object.ExpiresAt,
|
||||
&object.SegmentCount,
|
||||
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
|
||||
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
|
||||
encryptionParameters{&object.Encryption},
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
|
||||
}
|
||||
return Object{}, Error.New("unable to query object status: %w", err)
|
||||
}
|
||||
|
||||
object.ProjectID = opts.ProjectID
|
||||
object.BucketName = opts.BucketName
|
||||
object.ObjectKey = opts.ObjectKey
|
||||
|
||||
object.Status = Committed
|
||||
|
||||
return object, nil
|
||||
}
|
||||
|
||||
// GetSegmentByPosition contains arguments necessary for fetching a segment on specific position.
|
||||
type GetSegmentByPosition struct {
|
||||
StreamID uuid.UUID
|
||||
@ -447,8 +391,9 @@ func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID
|
||||
func (db *DB) TestingAllObjectSegments(ctx context.Context, objectLocation ObjectLocation) (segments []Segment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
object, err := db.GetObjectLatestVersion(ctx, GetObjectLatestVersion{
|
||||
object, err := db.GetObjectExactVersion(ctx, GetObjectExactVersion{
|
||||
ObjectLocation: objectLocation,
|
||||
Version: DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/metabasetest"
|
||||
)
|
||||
@ -163,174 +162,6 @@ func TestGetObjectExactVersion(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetObjectLatestVersion(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
||||
location := obj.Location()
|
||||
|
||||
now := time.Now()
|
||||
zombieDeadline := now.Add(24 * time.Hour)
|
||||
|
||||
for _, test := range metabasetest.InvalidObjectLocations(location) {
|
||||
test := test
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
metabasetest.GetObjectLatestVersion{
|
||||
Opts: metabase.GetObjectLatestVersion{
|
||||
ObjectLocation: test.ObjectLocation,
|
||||
},
|
||||
ErrClass: test.ErrClass,
|
||||
ErrText: test.ErrText,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("Object missing", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.GetObjectLatestVersion{
|
||||
Opts: metabase.GetObjectLatestVersion{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: sql: no rows in result set",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get pending object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.GetObjectLatestVersion{
|
||||
Opts: metabase.GetObjectLatestVersion{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: sql: no rows in result set",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
ZombieDeletionDeadline: &zombieDeadline,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get object", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}.Run(ctx, t, db, obj, 0)
|
||||
|
||||
metabasetest.GetObjectLatestVersion{
|
||||
Opts: metabase.GetObjectLatestVersion{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.Object{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get latest object version from multiple", func(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
firstVersion := obj
|
||||
metabasetest.CreateObject(ctx, t, db, firstVersion, 0)
|
||||
secondVersion := metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 2,
|
||||
StreamID: obj.StreamID,
|
||||
}
|
||||
metabasetest.CreateObject(ctx, t, db, secondVersion, 0)
|
||||
|
||||
metabasetest.GetObjectLatestVersion{
|
||||
Opts: metabase.GetObjectLatestVersion{
|
||||
ObjectLocation: location,
|
||||
},
|
||||
Result: metabase.Object{
|
||||
ObjectStream: secondVersion,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
metabasetest.Verify{Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: firstVersion,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
{
|
||||
ObjectStream: secondVersion,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: metabasetest.DefaultEncryption,
|
||||
},
|
||||
}}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetSegmentByPosition(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := metabasetest.RandObjectStream()
|
||||
|
@ -197,23 +197,6 @@ func (step GetObjectExactVersion) Check(ctx *testcontext.Context, t testing.TB,
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// GetObjectLatestVersion is for testing metabase.GetObjectLatestVersion.
|
||||
type GetObjectLatestVersion struct {
|
||||
Opts metabase.GetObjectLatestVersion
|
||||
Result metabase.Object
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
// Check runs the test.
|
||||
func (step GetObjectLatestVersion) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
result, err := db.GetObjectLatestVersion(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
// GetSegmentByPosition is for testing metabase.GetSegmentByPosition.
|
||||
type GetSegmentByPosition struct {
|
||||
Opts metabase.GetSegmentByPosition
|
||||
|
@ -102,12 +102,13 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
_, err = endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
_, err = endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err == nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
|
||||
@ -144,7 +145,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
StreamID: streamID,
|
||||
Version: metabase.Version(1),
|
||||
Version: metabase.DefaultVersion,
|
||||
},
|
||||
ExpiresAt: expiresAt,
|
||||
Encryption: encryptionParameters,
|
||||
@ -231,7 +232,7 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
|
||||
BucketName: string(streamID.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(streamID.EncryptedObjectKey),
|
||||
StreamID: id,
|
||||
Version: metabase.Version(1),
|
||||
Version: metabase.DefaultVersion,
|
||||
},
|
||||
Encryption: encryption,
|
||||
}
|
||||
@ -284,12 +285,13 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
mbObject, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
mbObject, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
@ -384,12 +386,13 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
|
||||
// get the object information
|
||||
|
||||
object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
@ -744,7 +747,7 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
|
||||
Prefix: prefix,
|
||||
Cursor: metabase.IterateCursor{
|
||||
Key: metabase.ObjectKey(cursor),
|
||||
Version: 1, // TODO: set to a the version from the protobuf request when it supports this
|
||||
Version: metabase.DefaultVersion, // TODO: set to a the version from the protobuf request when it supports this
|
||||
},
|
||||
Recursive: req.Recursive,
|
||||
BatchSize: limit + 1,
|
||||
@ -996,12 +999,13 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
|
||||
}
|
||||
|
||||
// TODO we may need custom metabase request to avoid two DB calls
|
||||
object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
|
Loading…
Reference in New Issue
Block a user