satellite/metainfo/metabase: add GetSegmentByOffset request

Change-Id: Iea2ce439ec1f7285e447d590c1297294204edc2e
This commit is contained in:
Michal Niewrzal 2020-11-10 10:17:25 +01:00
parent 3ed4183e52
commit 92f9251074
3 changed files with 204 additions and 4 deletions

View File

@ -246,3 +246,60 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
return segment, nil return segment, nil
} }
// GetSegmentByOffset contains arguments necessary for fetching a segment information.
type GetSegmentByOffset struct {
ObjectLocation
PlainOffset int64
}
// GetSegmentByOffset returns an object segment information.
func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (segment Segment, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return Segment{}, err
}
if opts.PlainOffset < 0 {
return Segment{}, ErrInvalidRequest.New("Invalid PlainOffset: %d", opts.PlainOffset)
}
err = db.db.QueryRow(ctx, `
SELECT
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
inline_data, remote_pieces
FROM segments
WHERE
stream_id = (SELECT stream_id FROM objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = 1
ORDER BY version DESC
LIMIT 1
) AND
plain_offset <= $4 AND
(plain_size + plain_offset) > $4
ORDER BY plain_offset ASC
LIMIT 1
`, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.PlainOffset).
Scan(
&segment.StreamID, &segment.Position,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
&segment.InlineData, &segment.Pieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Segment{}, storj.ErrObjectNotFound.Wrap(Error.New("object or segment missing"))
}
return Segment{}, Error.New("unable to query segment: %w", err)
}
return segment, nil
}

View File

@ -500,3 +500,131 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
}) })
}) })
} }
func TestGetSegmentByOffset(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := randObjectStream()
location := obj.Location()
now := time.Now()
for _, test := range invalidObjectLocations(location) {
test := test
t.Run(test.Name, func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
GetSegmentByOffset{
Opts: metabase.GetSegmentByOffset{
ObjectLocation: test.ObjectLocation,
},
ErrClass: test.ErrClass,
ErrText: test.ErrText,
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
}
t.Run("Invalid PlainOffset", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
GetSegmentByOffset{
Opts: metabase.GetSegmentByOffset{
ObjectLocation: location,
PlainOffset: -1,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "Invalid PlainOffset: -1",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
t.Run("Object or segment missing", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
GetSegmentByOffset{
Opts: metabase.GetSegmentByOffset{
ObjectLocation: location,
},
ErrClass: &storj.ErrObjectNotFound,
ErrText: "metabase: object or segment missing",
}.Check(ctx, t, db)
Verify{}.Check(ctx, t, db)
})
t.Run("Get segment", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
CreateTestObject{}.Run(ctx, t, db, obj, 4)
segments := make([]metabase.Segment, 4)
for i := range segments {
segments[i] = metabase.Segment{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{
Index: uint32(i),
},
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedSize: 1060,
PlainSize: 512,
PlainOffset: int64(i * 512),
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
Redundancy: defaultTestRedundancy,
}
}
var testCases = []struct {
Offset int64
ExpectedSegment metabase.Segment
}{
{0, segments[0]},
{100, segments[0]},
{1023, segments[1]},
{1024, segments[2]},
}
for _, tc := range testCases {
GetSegmentByOffset{
Opts: metabase.GetSegmentByOffset{
ObjectLocation: location,
PlainOffset: tc.Offset,
},
Result: tc.ExpectedSegment,
}.Check(ctx, t, db)
}
GetSegmentByOffset{
Opts: metabase.GetSegmentByOffset{
ObjectLocation: location,
PlainOffset: 2048,
},
ErrClass: &storj.ErrObjectNotFound,
ErrText: "metabase: object or segment missing",
}.Check(ctx, t, db)
Verify{
Objects: []metabase.RawObject{
{
ObjectStream: obj,
CreatedAt: now,
Status: metabase.Committed,
SegmentCount: 4,
TotalEncryptedSize: 4240,
FixedSegmentSize: 1060,
Encryption: defaultTestEncryption,
},
},
Segments: []metabase.RawSegment{
metabase.RawSegment(segments[0]),
metabase.RawSegment(segments[1]),
metabase.RawSegment(segments[2]),
metabase.RawSegment(segments[3]),
},
}.Check(ctx, t, db)
})
})
}

View File

@ -164,6 +164,21 @@ func (step GetLatestObjectLastSegment) Check(ctx *testcontext.Context, t *testin
require.Zero(t, diff) require.Zero(t, diff)
} }
type GetSegmentByOffset struct {
Opts metabase.GetSegmentByOffset
Result metabase.Segment
ErrClass *errs.Class
ErrText string
}
func (step GetSegmentByOffset) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
result, err := db.GetSegmentByOffset(ctx, step.Opts)
checkError(t, err, step.ErrClass, step.ErrText)
diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
require.Zero(t, diff)
}
type DeleteObjectExactVersion struct { type DeleteObjectExactVersion struct {
Opts metabase.DeleteObjectExactVersion Opts metabase.DeleteObjectExactVersion
Result metabase.DeleteObjectResult Result metabase.DeleteObjectResult
@ -283,12 +298,12 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t *testing.T, db *metab
Version: obj.Version, Version: obj.Version,
}.Check(ctx, t, db) }.Check(ctx, t, db)
for i := byte(1); i <= numberOfSegments; i++ { for i := byte(0); i < numberOfSegments; i++ {
BeginSegment{ BeginSegment{
Opts: metabase.BeginSegment{ Opts: metabase.BeginSegment{
ObjectStream: obj, ObjectStream: obj,
Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)},
RootPieceID: storj.PieceID{i}, RootPieceID: storj.PieceID{i + 1},
Pieces: []metabase.Piece{{ Pieces: []metabase.Piece{{
Number: 1, Number: 1,
StorageNode: testrand.NodeID(), StorageNode: testrand.NodeID(),
@ -306,9 +321,9 @@ func (co CreateTestObject) Run(ctx *testcontext.Context, t *testing.T, db *metab
EncryptedKey: []byte{3}, EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4}, EncryptedKeyNonce: []byte{4},
EncryptedSize: 1024, EncryptedSize: 1060,
PlainSize: 512, PlainSize: 512,
PlainOffset: 0, PlainOffset: int64(i) * 512,
Redundancy: defaultTestRedundancy, Redundancy: defaultTestRedundancy,
}, },
}.Check(ctx, t, db) }.Check(ctx, t, db)