{satellite/metabase, satellite/metainfo, satellite/inspector} : Use metabase.GetObjectLastCommitted instead metabase.GetObjectExactVersion
adjusted metainfo, metabase and inspector methods to use GetObjectLastCommitted, added test cases to confirm that this method will work correctly with version higher then 1 Closes https://github.com/storj/storj/issues/4869 Closes https://github.com/storj/storj/issues/4873 Change-Id: I8c338dcd1db82c141383f41339a295d54e2fa039
This commit is contained in:
parent
ad7c5b1483
commit
2a1bcfed81
@ -72,10 +72,9 @@ func (endpoint *Endpoint) ObjectHealth(ctx context.Context, in *internalpb.Objec
|
||||
BucketName: string(in.GetBucket()),
|
||||
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
|
||||
}
|
||||
// TODO add version field to ObjectHealthRequest?
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
|
||||
object, err := endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: objectLocation,
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
@ -122,9 +121,8 @@ func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *internalpb.Segm
|
||||
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
|
||||
}
|
||||
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
object, err := endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: objectLocation,
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
|
@ -468,9 +468,8 @@ func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID
|
||||
func (db *DB) TestingAllObjectSegments(ctx context.Context, objectLocation ObjectLocation) (segments []Segment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
object, err := db.GetObjectExactVersion(ctx, GetObjectExactVersion{
|
||||
object, err := db.GetObjectLastCommitted(ctx, GetObjectLastCommitted{
|
||||
ObjectLocation: objectLocation,
|
||||
Version: DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
|
@ -98,13 +98,12 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
_, err = endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
_, err = endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err == nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
|
||||
@ -284,13 +283,12 @@ func (endpoint *Endpoint) GetObject(ctx context.Context, req *pb.ObjectGetReques
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
mbObject, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
mbObject, err := endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
@ -381,21 +379,18 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
|
||||
}
|
||||
|
||||
// get the object information
|
||||
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
object, err := endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
}
|
||||
|
||||
// get the range segments
|
||||
|
||||
streamRange, err := calculateStreamRange(object, req.Range)
|
||||
if err != nil {
|
||||
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
|
||||
@ -1005,13 +1000,12 @@ func (endpoint *Endpoint) GetObjectIPs(ctx context.Context, req *pb.ObjectGetIPs
|
||||
}
|
||||
|
||||
// TODO we may need custom metabase request to avoid two DB calls
|
||||
object, err := endpoint.metabase.GetObjectExactVersion(ctx, metabase.GetObjectExactVersion{
|
||||
object, err := endpoint.metabase.GetObjectLastCommitted(ctx, metabase.GetObjectLastCommitted{
|
||||
ObjectLocation: metabase.ObjectLocation{
|
||||
ProjectID: keyInfo.ProjectID,
|
||||
BucketName: string(req.Bucket),
|
||||
ObjectKey: metabase.ObjectKey(req.EncryptedPath),
|
||||
},
|
||||
Version: metabase.DefaultVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, endpoint.convertMetabaseErr(err)
|
||||
|
@ -4,6 +4,7 @@
|
||||
package metainfo_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
@ -54,6 +55,7 @@ func TestObject_NoStorageNodes(t *testing.T) {
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||
satellite := planet.Satellites[0]
|
||||
|
||||
metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
|
||||
require.NoError(t, err)
|
||||
@ -494,6 +496,75 @@ func TestObject_NoStorageNodes(t *testing.T) {
|
||||
})
|
||||
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
|
||||
})
|
||||
|
||||
t.Run("get object", func(t *testing.T) {
|
||||
defer ctx.Check(deleteBucket)
|
||||
|
||||
err := planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "object", testrand.Bytes(256))
|
||||
require.NoError(t, err)
|
||||
|
||||
objects, err := satellite.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, objects, 1)
|
||||
|
||||
committedObject := objects[0]
|
||||
|
||||
pendingObjectVersion, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: committedObject.ProjectID,
|
||||
BucketName: committedObject.BucketName,
|
||||
ObjectKey: committedObject.ObjectKey,
|
||||
StreamID: committedObject.StreamID,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, committedObject.Version+1, pendingObjectVersion)
|
||||
|
||||
getObjectResponse, err := satellite.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
|
||||
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
|
||||
Bucket: []byte("testbucket"),
|
||||
EncryptedPath: []byte(committedObject.ObjectKey),
|
||||
Version: int32(committedObject.Version),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, committedObject.BucketName, getObjectResponse.Object.Bucket)
|
||||
require.EqualValues(t, committedObject.ObjectKey, getObjectResponse.Object.EncryptedPath)
|
||||
require.EqualValues(t, committedObject.Version, getObjectResponse.Object.Version)
|
||||
})
|
||||
|
||||
t.Run("download object", func(t *testing.T) {
|
||||
defer ctx.Check(deleteBucket)
|
||||
|
||||
err := planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "object", testrand.Bytes(256))
|
||||
require.NoError(t, err)
|
||||
|
||||
objects, err := satellite.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, objects, 1)
|
||||
|
||||
committedObject := objects[0]
|
||||
|
||||
pendingObjectVersion, err := satellite.API.Metainfo.Metabase.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: committedObject.ProjectID,
|
||||
BucketName: committedObject.BucketName,
|
||||
ObjectKey: committedObject.ObjectKey,
|
||||
StreamID: committedObject.StreamID,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, committedObject.Version+1, pendingObjectVersion)
|
||||
|
||||
downloadObjectResponse, err := satellite.API.Metainfo.Endpoint.DownloadObject(ctx, &pb.ObjectDownloadRequest{
|
||||
Header: &pb.RequestHeader{ApiKey: apiKey.SerializeRaw()},
|
||||
Bucket: []byte("testbucket"),
|
||||
EncryptedObjectKey: []byte(committedObject.ObjectKey),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, committedObject.BucketName, downloadObjectResponse.Object.Bucket)
|
||||
require.EqualValues(t, committedObject.ObjectKey, downloadObjectResponse.Object.EncryptedPath)
|
||||
require.EqualValues(t, committedObject.Version, downloadObjectResponse.Object.Version)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@ -669,6 +740,99 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("get object IP with same location committed and pending status", func(t *testing.T) {
|
||||
defer ctx.Check(deleteBucket(bucketName))
|
||||
|
||||
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
||||
uplnk := planet.Uplinks[0]
|
||||
sat := planet.Satellites[0]
|
||||
|
||||
require.NoError(t, uplnk.Upload(ctx, sat, bucketName, "jones", testrand.Bytes(20*memory.KB)))
|
||||
|
||||
ips, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
|
||||
require.NoError(t, err)
|
||||
require.True(t, len(ips) > 0)
|
||||
|
||||
// verify it's a real IP with valid host and port
|
||||
for _, ip := range ips {
|
||||
host, port, err := net.SplitHostPort(string(ip))
|
||||
require.NoError(t, err)
|
||||
netIP := net.ParseIP(host)
|
||||
require.NotNil(t, netIP)
|
||||
_, err = strconv.Atoi(port)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
objects, err := sat.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, objects, 1)
|
||||
|
||||
committedObject := objects[0]
|
||||
|
||||
pendingObjectVersion, err := sat.Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: committedObject.ProjectID,
|
||||
BucketName: committedObject.BucketName,
|
||||
ObjectKey: committedObject.ObjectKey,
|
||||
StreamID: committedObject.StreamID,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, committedObject.Version+1, pendingObjectVersion)
|
||||
|
||||
newIps, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
|
||||
require.NoError(t, err)
|
||||
|
||||
sort.Slice(ips, func(i, j int) bool {
|
||||
return bytes.Compare(ips[i], ips[j]) < 0
|
||||
})
|
||||
sort.Slice(newIps, func(i, j int) bool {
|
||||
return bytes.Compare(newIps[i], newIps[j]) < 0
|
||||
})
|
||||
require.Equal(t, ips, newIps)
|
||||
})
|
||||
|
||||
t.Run("get object IP with version != 1", func(t *testing.T) {
|
||||
defer ctx.Check(deleteBucket(bucketName))
|
||||
|
||||
access := planet.Uplinks[0].Access[planet.Satellites[0].ID()]
|
||||
uplnk := planet.Uplinks[0]
|
||||
sat := planet.Satellites[0]
|
||||
|
||||
require.NoError(t, uplnk.Upload(ctx, sat, bucketName, "jones", testrand.Bytes(20*memory.KB)))
|
||||
|
||||
objects, err := sat.API.Metainfo.Metabase.TestingAllObjects(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
committedObject := objects[0]
|
||||
randomVersion := metabase.Version(2 + testrand.Intn(9))
|
||||
|
||||
// atm there's no better way to change object's version
|
||||
res, err := planet.Satellites[0].Metabase.DB.UnderlyingTagSQL().Exec(ctx,
|
||||
"UPDATE objects SET version = $1 WHERE project_id = $2 AND bucket_name = $3 AND object_key = $4 AND stream_id = $5",
|
||||
randomVersion, committedObject.ProjectID, committedObject.BucketName, committedObject.ObjectKey, committedObject.StreamID,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
affected, err := res.RowsAffected()
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
|
||||
ips, err := object.GetObjectIPs(ctx, uplink.Config{}, access, bucketName, "jones")
|
||||
require.NoError(t, err)
|
||||
require.True(t, len(ips) > 0)
|
||||
|
||||
// verify it's a real IP with valid host and port
|
||||
for _, ip := range ips {
|
||||
host, port, err := net.SplitHostPort(string(ip))
|
||||
require.NoError(t, err)
|
||||
netIP := net.ParseIP(host)
|
||||
require.NotNil(t, netIP)
|
||||
_, err = strconv.Atoi(port)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("multipart object download rejection", func(t *testing.T) {
|
||||
defer ctx.Check(deleteBucket("pip-first"))
|
||||
defer ctx.Check(deleteBucket("pip-second"))
|
||||
|
Loading…
Reference in New Issue
Block a user