satellite/metainfo/metabase: GetSegmentByLocation
We need this method to fix repairing pending objects. In another PR, it will replace the GetObjectLatestVersion + GetSegmentByPosition calls that are currently executed. Change-Id: I4c5c2ab604edf898452b6fd21b86d4d3f970ce79
This commit is contained in:
parent
6f441960ec
commit
b519bb377d
@ -202,6 +202,19 @@ func (seg SegmentLocation) Encode() SegmentKey {
|
||||
))
|
||||
}
|
||||
|
||||
// Verify segment location fields.
|
||||
func (seg SegmentLocation) Verify() error {
|
||||
switch {
|
||||
case seg.ProjectID.IsZero():
|
||||
return ErrInvalidRequest.New("ProjectID missing")
|
||||
case seg.BucketName == "":
|
||||
return ErrInvalidRequest.New("BucketName missing")
|
||||
case len(seg.ObjectKey) == 0:
|
||||
return ErrInvalidRequest.New("ObjectKey missing")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ObjectStream uniquely defines an object and stream.
|
||||
//
|
||||
// TODO: figure out whether ther's a better name.
|
||||
|
@ -7,8 +7,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
@ -16,48 +14,6 @@ import (
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
)
|
||||
|
||||
type invalidObjectLocation struct {
|
||||
Name string
|
||||
ObjectLocation metabase.ObjectLocation
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
func invalidObjectLocations(base metabase.ObjectLocation) []invalidObjectLocation {
|
||||
var tests []invalidObjectLocation
|
||||
{
|
||||
location := base
|
||||
location.ProjectID = uuid.UUID{}
|
||||
tests = append(tests, invalidObjectLocation{
|
||||
Name: "ProjectID missing",
|
||||
ObjectLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ProjectID missing",
|
||||
})
|
||||
}
|
||||
{
|
||||
location := base
|
||||
location.BucketName = ""
|
||||
tests = append(tests, invalidObjectLocation{
|
||||
Name: "BucketName missing",
|
||||
ObjectLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "BucketName missing",
|
||||
})
|
||||
}
|
||||
{
|
||||
location := base
|
||||
location.ObjectKey = ""
|
||||
tests = append(tests, invalidObjectLocation{
|
||||
Name: "ObjectKey missing",
|
||||
ObjectLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ObjectKey missing",
|
||||
})
|
||||
}
|
||||
|
||||
return tests
|
||||
}
|
||||
func TestDeletePendingObject(t *testing.T) {
|
||||
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := randObjectStream()
|
||||
|
@ -155,6 +155,56 @@ func (db *DB) GetObjectLatestVersion(ctx context.Context, opts GetObjectLatestVe
|
||||
return object, nil
|
||||
}
|
||||
|
||||
// GetSegmentByLocation contains arguments necessary for fetching a segment on specific segment location.
|
||||
type GetSegmentByLocation struct {
|
||||
SegmentLocation
|
||||
}
|
||||
|
||||
// GetSegmentByLocation returns information about segment on the specified location.
|
||||
func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocation) (segment Segment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.Verify(); err != nil {
|
||||
return Segment{}, err
|
||||
}
|
||||
|
||||
err = db.db.QueryRow(ctx, `
|
||||
SELECT
|
||||
stream_id,
|
||||
root_piece_id, encrypted_key_nonce, encrypted_key,
|
||||
encrypted_size, plain_offset, plain_size,
|
||||
redundancy,
|
||||
inline_data, remote_pieces
|
||||
FROM segments
|
||||
WHERE
|
||||
stream_id IN (SELECT stream_id FROM objects WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3
|
||||
ORDER BY version DESC
|
||||
LIMIT 1
|
||||
) AND
|
||||
position = $4
|
||||
`, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Position.Encode()).
|
||||
Scan(
|
||||
&segment.StreamID,
|
||||
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
|
||||
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
|
||||
redundancyScheme{&segment.Redundancy},
|
||||
&segment.InlineData, &segment.Pieces,
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Segment{}, storj.ErrObjectNotFound.Wrap(Error.New("object or segment missing"))
|
||||
}
|
||||
return Segment{}, Error.New("unable to query segment: %w", err)
|
||||
}
|
||||
|
||||
segment.Position = opts.Position
|
||||
|
||||
return segment, nil
|
||||
}
|
||||
|
||||
// GetSegmentByPosition contains arguments necessary for fetching a segment on specific position.
|
||||
type GetSegmentByPosition struct {
|
||||
StreamID uuid.UUID
|
||||
@ -169,7 +219,7 @@ func (seg *GetSegmentByPosition) Verify() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSegmentByPosition returns a information about segment which covers specified offset.
|
||||
// GetSegmentByPosition returns information about segment on the specified position.
|
||||
func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPosition) (segment Segment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
|
@ -326,6 +326,113 @@ func TestGetObjectLatestVersion(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetSegmentByLocation(t *testing.T) {
|
||||
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := randObjectStream()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
location := metabase.SegmentLocation{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
}
|
||||
|
||||
for _, test := range invalidSegmentLocations(location) {
|
||||
test := test
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
GetSegmentByLocation{
|
||||
Opts: metabase.GetSegmentByLocation{
|
||||
SegmentLocation: test.SegmentLocation,
|
||||
},
|
||||
ErrClass: test.ErrClass,
|
||||
ErrText: test.ErrText,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("Object missing", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
GetSegmentByLocation{
|
||||
Opts: metabase.GetSegmentByLocation{
|
||||
SegmentLocation: location,
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: object or segment missing",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("Get segment", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
createObject(ctx, t, db, obj, 1)
|
||||
|
||||
expectedSegment := metabase.Segment{
|
||||
StreamID: obj.StreamID,
|
||||
Position: metabase.SegmentPosition{
|
||||
Index: 0,
|
||||
},
|
||||
RootPieceID: storj.PieceID{1},
|
||||
EncryptedKey: []byte{3},
|
||||
EncryptedKeyNonce: []byte{4},
|
||||
EncryptedSize: 1024,
|
||||
PlainSize: 512,
|
||||
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
|
||||
Redundancy: defaultTestRedundancy,
|
||||
}
|
||||
|
||||
GetSegmentByLocation{
|
||||
Opts: metabase.GetSegmentByLocation{
|
||||
SegmentLocation: location,
|
||||
},
|
||||
Result: expectedSegment,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// check non existing segment in existing object
|
||||
GetSegmentByLocation{
|
||||
Opts: metabase.GetSegmentByLocation{
|
||||
SegmentLocation: metabase.SegmentLocation{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Position: metabase.SegmentPosition{
|
||||
Index: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: object or segment missing",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
SegmentCount: 1,
|
||||
|
||||
TotalPlainSize: 512,
|
||||
TotalEncryptedSize: 1024,
|
||||
FixedSegmentSize: 512,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
},
|
||||
Segments: []metabase.RawSegment{
|
||||
metabase.RawSegment(expectedSegment),
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetSegmentByPosition(t *testing.T) {
|
||||
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := randObjectStream()
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
)
|
||||
|
||||
@ -184,6 +185,21 @@ func (step GetObjectLatestVersion) Check(ctx *testcontext.Context, t testing.TB,
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
type GetSegmentByLocation struct {
|
||||
Opts metabase.GetSegmentByLocation
|
||||
Result metabase.Segment
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
func (step GetSegmentByLocation) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
|
||||
result, err := db.GetSegmentByLocation(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
|
||||
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
|
||||
require.Zero(t, diff)
|
||||
}
|
||||
|
||||
type GetSegmentByPosition struct {
|
||||
Opts metabase.GetSegmentByPosition
|
||||
Result metabase.Segment
|
||||
@ -512,3 +528,89 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t testing.TB, db *metab
|
||||
Opts: coOpts,
|
||||
}.Check(ctx, t, db)
|
||||
}
|
||||
|
||||
type invalidObjectLocation struct {
|
||||
Name string
|
||||
ObjectLocation metabase.ObjectLocation
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
func invalidObjectLocations(base metabase.ObjectLocation) []invalidObjectLocation {
|
||||
var tests []invalidObjectLocation
|
||||
{
|
||||
location := base
|
||||
location.ProjectID = uuid.UUID{}
|
||||
tests = append(tests, invalidObjectLocation{
|
||||
Name: "ProjectID missing",
|
||||
ObjectLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ProjectID missing",
|
||||
})
|
||||
}
|
||||
{
|
||||
location := base
|
||||
location.BucketName = ""
|
||||
tests = append(tests, invalidObjectLocation{
|
||||
Name: "BucketName missing",
|
||||
ObjectLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "BucketName missing",
|
||||
})
|
||||
}
|
||||
{
|
||||
location := base
|
||||
location.ObjectKey = ""
|
||||
tests = append(tests, invalidObjectLocation{
|
||||
Name: "ObjectKey missing",
|
||||
ObjectLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ObjectKey missing",
|
||||
})
|
||||
}
|
||||
|
||||
return tests
|
||||
}
|
||||
|
||||
type invalidSegmentLocation struct {
|
||||
Name string
|
||||
SegmentLocation metabase.SegmentLocation
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
func invalidSegmentLocations(base metabase.SegmentLocation) []invalidSegmentLocation {
|
||||
var tests []invalidSegmentLocation
|
||||
{
|
||||
location := base
|
||||
location.ProjectID = uuid.UUID{}
|
||||
tests = append(tests, invalidSegmentLocation{
|
||||
Name: "ProjectID missing",
|
||||
SegmentLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ProjectID missing",
|
||||
})
|
||||
}
|
||||
{
|
||||
location := base
|
||||
location.BucketName = ""
|
||||
tests = append(tests, invalidSegmentLocation{
|
||||
Name: "BucketName missing",
|
||||
SegmentLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "BucketName missing",
|
||||
})
|
||||
}
|
||||
{
|
||||
location := base
|
||||
location.ObjectKey = ""
|
||||
tests = append(tests, invalidSegmentLocation{
|
||||
Name: "ObjectKey missing",
|
||||
SegmentLocation: location,
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "ObjectKey missing",
|
||||
})
|
||||
}
|
||||
|
||||
return tests
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user