satellite/metainfo/metabase: read created_at from DB

All SQL queries for reading segments are updated to read the created_at
column where appropriate.

Change-Id: Icd7c7672fa71e992673078598b28229bb898c728
This commit is contained in:
Kaloyan Raev 2021-03-12 19:43:30 +02:00
parent 887f3b04e1
commit 6e661da0a0
13 changed files with 91 additions and 38 deletions

2
go.mod
View File

@ -46,7 +46,7 @@ require (
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
google.golang.org/api v0.20.0 // indirect
google.golang.org/protobuf v1.25.0 // indirect
storj.io/common v0.0.0-20210217105242-970e119468ed
storj.io/common v0.0.0-20210311141746-133f4d716d1d
storj.io/drpc v0.0.16
storj.io/monkit-jaeger v0.0.0-20210205021559-85f08034688c
storj.io/private v0.0.0-20210203200143-9d2ec06f0d3c

4
go.sum
View File

@ -921,8 +921,8 @@ sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3
storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/common v0.0.0-20201026135900-1aaeec90670b/go.mod h1:GqdmNf3fLm2UZX/7Zr0BLFCJ4gFjgm6eHrk/fnmr5jQ=
storj.io/common v0.0.0-20210208122718-577b1f8a0a0f/go.mod h1:b8XP/TdW8OyTZ/J2BDFOIE9KojSUNZgImBFZI99zS04=
storj.io/common v0.0.0-20210217105242-970e119468ed h1:hL0mXcag3pydoaRiQ8kYrj+qTSbR2Dp3nrC0penj/f0=
storj.io/common v0.0.0-20210217105242-970e119468ed/go.mod h1:b8XP/TdW8OyTZ/J2BDFOIE9KojSUNZgImBFZI99zS04=
storj.io/common v0.0.0-20210311141746-133f4d716d1d h1:cq11lWaPt91EppJBnR7vrVyfGpb0Za68Dm1lTENLwF0=
storj.io/common v0.0.0-20210311141746-133f4d716d1d/go.mod h1:OAPn3OXJBq4omkIlWSrTsLa6hm4FnaLs12Odn/ksQL4=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.14/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
storj.io/drpc v0.0.16 h1:9sxypc5lKi/0D69cR21BR0S21+IvXfON8L5nXMVNTwQ=

View File

@ -298,8 +298,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: pos01,
StreamID: obj.StreamID,
Position: pos01,
CreatedAt: &now,
RootPieceID: rootPieceID01,
EncryptedKey: encryptedKey01,
@ -396,8 +397,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: pos01,
StreamID: obj.StreamID,
Position: pos01,
CreatedAt: &now,
EncryptedKey: encryptedKey01,
EncryptedKeyNonce: encryptedKeyNonce01,
@ -499,8 +501,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: pos00,
StreamID: obj.StreamID,
Position: pos00,
CreatedAt: &now,
RootPieceID: rootPieceID00,
EncryptedKey: encryptedKey00,
@ -515,8 +518,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
Pieces: pieces00,
},
{
StreamID: obj.StreamID,
Position: pos01,
StreamID: obj.StreamID,
Position: pos01,
CreatedAt: &now,
RootPieceID: rootPieceID01,
EncryptedKey: encryptedKey01,
@ -621,8 +625,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: pos00,
StreamID: obj.StreamID,
Position: pos00,
CreatedAt: &now,
RootPieceID: rootPieceID00,
EncryptedKey: encryptedKey00,
@ -637,8 +642,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
Pieces: pieces00,
},
{
StreamID: obj.StreamID,
Position: pos10,
StreamID: obj.StreamID,
Position: pos10,
CreatedAt: &now,
RootPieceID: rootPieceID10,
EncryptedKey: encryptedKey10,
@ -743,8 +749,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: pos00,
StreamID: obj.StreamID,
Position: pos00,
CreatedAt: &now,
RootPieceID: rootPieceID00,
EncryptedKey: encryptedKey00,
@ -759,8 +766,9 @@ func TestCommitObjectWithSegments(t *testing.T) {
Pieces: pieces00,
},
{
StreamID: obj.StreamID,
Position: pos02,
StreamID: obj.StreamID,
Position: pos02,
CreatedAt: &now,
RootPieceID: rootPieceID02,
EncryptedKey: encryptedKey02,

View File

@ -937,6 +937,7 @@ func TestBeginSegment(t *testing.T) {
func TestCommitSegment(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := randObjectStream()
now := time.Now()
for _, test := range invalidObjectStreams(obj) {
test := test
@ -1258,8 +1259,9 @@ func TestCommitSegment(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: &now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
@ -1405,7 +1407,8 @@ func TestCommitSegment(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
StreamID: obj.StreamID,
CreatedAt: &now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
@ -1427,6 +1430,7 @@ func TestCommitSegment(t *testing.T) {
func TestCommitInlineSegment(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := randObjectStream()
now := time.Now()
for _, test := range invalidObjectStreams(obj) {
test := test
@ -1574,8 +1578,9 @@ func TestCommitInlineSegment(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: &now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
@ -1688,7 +1693,8 @@ func TestCommitInlineSegment(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
StreamID: obj.StreamID,
CreatedAt: &now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
@ -1741,7 +1747,8 @@ func TestCommitInlineSegment(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
StreamID: obj.StreamID,
CreatedAt: &now,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
@ -1929,8 +1936,9 @@ func TestCommitObject(t *testing.T) {
Verify{
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 0},
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 0},
CreatedAt: &now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,
@ -1944,8 +1952,9 @@ func TestCommitObject(t *testing.T) {
Pieces: pieces,
},
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 1},
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Index: 1},
CreatedAt: &now,
RootPieceID: rootPieceID,
EncryptedKey: encryptedKey,

View File

@ -189,8 +189,9 @@ func TestDeleteBucketObjects(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: objX.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
StreamID: objX.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
@ -204,8 +205,9 @@ func TestDeleteBucketObjects(t *testing.T) {
Redundancy: defaultTestRedundancy,
},
{
StreamID: objY.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
StreamID: objY.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},

View File

@ -106,6 +106,7 @@ func TestDeleteExpiredObjects(t *testing.T) {
expectedObj1Segment := metabase.Segment{
StreamID: obj1.StreamID,
RootPieceID: storj.PieceID{1},
CreatedAt: &now,
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedSize: 1060,

View File

@ -712,8 +712,9 @@ func TestDeleteObjectLatestVersion(t *testing.T) {
},
Segments: []metabase.RawSegment{
{
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
StreamID: obj.StreamID,
Position: metabase.SegmentPosition{Part: 0, Index: 0},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},

View File

@ -171,6 +171,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
err = db.db.QueryRow(ctx, `
SELECT
stream_id,
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
@ -188,6 +189,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
`, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Position.Encode()).
Scan(
&segment.StreamID,
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
@ -234,6 +236,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
var aliasPieces AliasPieces
err = db.db.QueryRow(ctx, `
SELECT
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
@ -244,6 +247,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
position = $2
`, opts.StreamID, opts.Position.Encode()).
Scan(
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
@ -284,6 +288,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
err = db.db.QueryRow(ctx, `
SELECT
stream_id, position,
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
@ -303,6 +308,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
`, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)).
Scan(
&segment.StreamID, &segment.Position,
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
@ -345,6 +351,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
err = db.db.QueryRow(ctx, `
SELECT
stream_id, position,
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
@ -366,6 +373,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
`, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.PlainOffset).
Scan(
&segment.StreamID, &segment.Position,
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},

View File

@ -378,6 +378,7 @@ func TestGetSegmentByLocation(t *testing.T) {
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
@ -475,6 +476,7 @@ func TestGetSegmentByPosition(t *testing.T) {
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
@ -575,6 +577,7 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
Position: metabase.SegmentPosition{
Index: 1,
},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
@ -681,6 +684,7 @@ func TestGetSegmentByOffset(t *testing.T) {
Position: metabase.SegmentPosition{
Index: uint32(i),
},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},

View File

@ -7,6 +7,7 @@ import (
"context"
"database/sql"
"errors"
"time"
"storj.io/common/uuid"
"storj.io/storj/private/tagsql"
@ -44,6 +45,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
err = withRows(db.db.Query(ctx, `
SELECT
position,
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
@ -60,6 +62,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
var aliasPieces AliasPieces
err = rows.Scan(
&segment.Position,
&segment.CreatedAt,
&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
redundancyScheme{&segment.Redundancy},
@ -111,6 +114,7 @@ type ListStreamPositionsResult struct {
type SegmentPositionInfo struct {
Position SegmentPosition
PlainSize int32
CreatedAt *time.Time // TODO: make it non-nilable after we migrate all existing segments to have creation time
}
// ListStreamPositions lists specified stream segment positions.
@ -131,7 +135,7 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
err = withRows(db.db.Query(ctx, `
SELECT
position, plain_size
position, plain_size, created_at
FROM segments
WHERE
stream_id = $1 AND
@ -141,7 +145,7 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
`, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error {
for rows.Next() {
var segment SegmentPositionInfo
err = rows.Scan(&segment.Position, &segment.PlainSize)
err = rows.Scan(&segment.Position, &segment.PlainSize, &segment.CreatedAt)
if err != nil {
return Error.New("failed to scan segments: %w", err)
}

View File

@ -5,6 +5,7 @@ package metabase_test
import (
"testing"
"time"
"storj.io/common/storj"
"storj.io/common/testcontext"
@ -15,6 +16,7 @@ import (
func TestListSegments(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := randObjectStream()
now := time.Now()
t.Run("StreamID missing", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
@ -67,6 +69,7 @@ func TestListSegments(t *testing.T) {
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
@ -182,6 +185,7 @@ func TestListSegments(t *testing.T) {
expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
@ -263,6 +267,7 @@ func TestListSegments(t *testing.T) {
func TestListStreamPositions(t *testing.T) {
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := randObjectStream()
now := time.Now()
t.Run("StreamID missing", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
@ -315,6 +320,7 @@ func TestListStreamPositions(t *testing.T) {
Position: metabase.SegmentPosition{
Index: 0,
},
CreatedAt: &now,
RootPieceID: storj.PieceID{1},
EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
@ -332,6 +338,7 @@ func TestListStreamPositions(t *testing.T) {
expectedSegments[i] = metabase.SegmentPositionInfo{
Position: expectedSegment.Position,
PlainSize: expectedSegment.PlainSize,
CreatedAt: &now,
}
}
@ -497,6 +504,7 @@ func TestListStreamPositions(t *testing.T) {
expectedSegments[i] = metabase.SegmentPositionInfo{
Position: pos,
PlainSize: expectedSegment.PlainSize,
CreatedAt: &now,
}
}

View File

@ -335,6 +335,8 @@ func TestIterateLoopStreams(t *testing.T) {
t.Run("List objects segments", func(t *testing.T) {
defer DeleteAll{}.Check(ctx, t, db)
now := time.Now()
expectedObject00 := createObject(ctx, t, db, randObjectStream(), 0)
expectedObject01 := createObject(ctx, t, db, randObjectStream(), 1)
expectedObject02 := createObject(ctx, t, db, randObjectStream(), 5)
@ -372,6 +374,7 @@ func TestIterateLoopStreams(t *testing.T) {
expectedRawSegments = append(expectedRawSegments, metabase.RawSegment{
StreamID: segment.StreamID,
Position: segment.Position,
CreatedAt: &now,
EncryptedSize: segment.EncryptedSize,
Pieces: segment.Pieces,
Redundancy: segment.Redundancy,

View File

@ -44,6 +44,8 @@ type RawSegment struct {
StreamID uuid.UUID
Position SegmentPosition
CreatedAt *time.Time // TODO: make it non-nilable after we migrate all existing segments to have creation time
RootPieceID storj.PieceID
EncryptedKeyNonce []byte
EncryptedKey []byte
@ -161,6 +163,7 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
rows, err := db.db.Query(ctx, `
SELECT
stream_id, position,
created_at,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size,
plain_offset, plain_size,
@ -180,6 +183,8 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
&seg.StreamID,
&seg.Position,
&seg.CreatedAt,
&seg.RootPieceID,
&seg.EncryptedKeyNonce,
&seg.EncryptedKey,