storj/satellite/metabase/loop.go
Egon Elbre 347f5f87e0 satellite/metabase/metaloop: limit as of system time
Previously we did not limit the "as of system time" used when iterating over
the objects table. Using just an interval would cause problems with the
tests. That could be overcome by skipping the interval in tests altogether,
however, we should probably test those paths more to ensure that GC keeps
working as intended.

This code is safer, though perhaps not as straightforward as it could be.

Change-Id: I374f77783b2af42bb6da846735ceea20a7ce5e60
2021-06-07 13:01:06 +00:00


// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase

import (
	"bytes"
	"context"
	"sort"
	"time"

	"github.com/zeebo/errs"

	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/private/dbutil/pgutil"
	"storj.io/private/tagsql"
)
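// loopIteratorBatchSizeLimit caps how many rows a single batch query fetches while iterating.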
const loopIteratorBatchSizeLimit = 2500

// IterateLoopObjects contains arguments necessary for listing objects in metabase.
type IterateLoopObjects struct {
	BatchSize          int
	AsOfSystemTime     time.Time
	AsOfSystemInterval time.Duration
}

// Verify verifies request fields.
func (opts *IterateLoopObjects) Verify() error {
	if opts.BatchSize < 0 {
		return ErrInvalidRequest.New("BatchSize is negative")
	}
	return nil
}
// LoopObjectsIterator iterates over a sequence of LoopObjectEntry items.
type LoopObjectsIterator interface {
	Next(ctx context.Context, item *LoopObjectEntry) bool
}

// LoopObjectEntry contains information about an object needed by the metainfo loop.
type LoopObjectEntry struct {
	ObjectStream                       // metrics, repair, tally
	Status                ObjectStatus // verify
	CreatedAt             time.Time    // temp used by metabase-createdat-migration
	ExpiresAt             *time.Time   // tally
	SegmentCount          int32        // metrics
	EncryptedMetadataSize int          // tally
}

// IterateLoopObjects iterates through all objects in metabase.
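//
// A minimal usage sketch (illustrative only; the values below are hypothetical
// and not taken from the satellite configuration):
//
//	err := db.IterateLoopObjects(ctx, IterateLoopObjects{
//		BatchSize:          2500,
//		AsOfSystemTime:     time.Now(),        // e.g. captured when the loop started
//		AsOfSystemInterval: -10 * time.Second, // cap on how stale the read may be
//	}, func(ctx context.Context, it LoopObjectsIterator) error {
//		var entry LoopObjectEntry
//		for it.Next(ctx, &entry) {
//			// process entry
//		}
//		return nil
//	})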
func (db *DB) IterateLoopObjects(ctx context.Context, opts IterateLoopObjects, fn func(context.Context, LoopObjectsIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	if err := opts.Verify(); err != nil {
		return err
	}

	it := &loopIterator{
		db:                 db,
		batchSize:          opts.BatchSize,
		curIndex:           0,
		cursor:             loopIterateCursor{},
		asOfSystemTime:     opts.AsOfSystemTime,
		asOfSystemInterval: opts.AsOfSystemInterval,
	}

	// ensure batch size is reasonable
	if it.batchSize <= 0 || it.batchSize > loopIteratorBatchSizeLimit {
		it.batchSize = loopIteratorBatchSizeLimit
	}

	it.curRows, err = it.doNextQuery(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if rowsErr := it.curRows.Err(); rowsErr != nil {
			err = errs.Combine(err, rowsErr)
		}
		err = errs.Combine(err, it.failErr, it.curRows.Close())
	}()

	return fn(ctx, it)
}
// loopIterator enables iteration of all objects in metabase.
type loopIterator struct {
	db                 *DB
	batchSize          int
	asOfSystemTime     time.Time
	asOfSystemInterval time.Duration
	curIndex           int
	curRows            tagsql.Rows
	cursor             loopIterateCursor

	// failErr is set when either scan or next query fails during iteration.
	failErr error
}

type loopIterateCursor struct {
	ProjectID  uuid.UUID
	BucketName string
	ObjectKey  ObjectKey
	Version    Version
}
// Next returns true if there was another item and copies it into item.
func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool {
	next := it.curRows.Next()
	if !next {
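		// If the previous batch returned fewer rows than requested, it already
		// reached the end of the table, so there is nothing more to fetch.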
		if it.curIndex < it.batchSize {
			return false
		}

		if it.curRows.Err() != nil {
			return false
		}

		rows, err := it.doNextQuery(ctx)
		if err != nil {
			it.failErr = errs.Combine(it.failErr, err)
			return false
		}

		if closeErr := it.curRows.Close(); closeErr != nil {
			it.failErr = errs.Combine(it.failErr, closeErr, rows.Close())
			return false
		}

		it.curRows = rows
		it.curIndex = 0

		if !it.curRows.Next() {
			return false
		}
	}
	err := it.scanItem(item)
	if err != nil {
		it.failErr = errs.Combine(it.failErr, err)
		return false
	}

	it.curIndex++
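	// Remember the key of the item we just returned; the next batch query
	// resumes strictly after this cursor.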
	it.cursor.ProjectID = item.ProjectID
	it.cursor.BucketName = item.BucketName
	it.cursor.ObjectKey = item.ObjectKey
	it.cursor.Version = item.Version

	return true
}
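// doNextQuery fetches the next batch of objects strictly after the current
// cursor, using a tuple comparison for keyset pagination, with the clause
// produced by asOfTime applied to the read.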
func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	return it.db.db.Query(ctx, `
		SELECT
			project_id, bucket_name,
			object_key, stream_id, version,
			status,
			created_at, expires_at,
			segment_count,
			LENGTH(COALESCE(encrypted_metadata,''))
		FROM objects
		`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
		WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
		ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC
		LIMIT $5
	`, it.cursor.ProjectID, []byte(it.cursor.BucketName),
		[]byte(it.cursor.ObjectKey), int(it.cursor.Version),
		it.batchSize,
	)
}
// scanItem scans doNextQuery results into LoopObjectEntry.
func (it *loopIterator) scanItem(item *LoopObjectEntry) error {
	return it.curRows.Scan(
		&item.ProjectID, &item.BucketName,
		&item.ObjectKey, &item.StreamID, &item.Version,
		&item.Status,
		&item.CreatedAt, &item.ExpiresAt,
		&item.SegmentCount,
		&item.EncryptedMetadataSize,
	)
}
// IterateLoopStreams contains arguments necessary for listing segments of multiple streams.
type IterateLoopStreams struct {
	StreamIDs          []uuid.UUID
	AsOfSystemTime     time.Time
	AsOfSystemInterval time.Duration
}

// SegmentIterator returns the next segment.
type SegmentIterator func(segment *LoopSegmentEntry) bool

// LoopSegmentEntry contains information about segment metadata needed by the metainfo loop.
type LoopSegmentEntry struct {
	StreamID      uuid.UUID
	Position      SegmentPosition
	CreatedAt     *time.Time // repair
	RepairedAt    *time.Time // repair
	RootPieceID   storj.PieceID
	EncryptedSize int32 // size of the whole segment (not a piece)
	PlainOffset   int64 // verify
	PlainSize     int32 // verify
	Redundancy    storj.RedundancyScheme
	Pieces        Pieces
}

// Inline returns true if the segment is inline.
func (s LoopSegmentEntry) Inline() bool {
	return s.Redundancy.IsZero() && len(s.Pieces) == 0
}
// IterateLoopStreams lists the segments of multiple streams.
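//
// A minimal usage sketch (illustrative only; how the stream IDs are obtained
// is up to the caller):
//
//	err := db.IterateLoopStreams(ctx, IterateLoopStreams{StreamIDs: streamIDs},
//		func(ctx context.Context, streamID uuid.UUID, next SegmentIterator) error {
//			var segment LoopSegmentEntry
//			for next(&segment) {
//				// process segment belonging to streamID
//			}
//			return nil
//		})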
func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, handleStream func(ctx context.Context, streamID uuid.UUID, next SegmentIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(opts.StreamIDs) == 0 {
		return ErrInvalidRequest.New("StreamIDs list is empty")
	}

	sort.Slice(opts.StreamIDs, func(i, k int) bool {
		return bytes.Compare(opts.StreamIDs[i][:], opts.StreamIDs[k][:]) < 0
	})

	// TODO do something like pgutil.UUIDArray()
	bytesIDs := make([][]byte, len(opts.StreamIDs))
	for i, streamID := range opts.StreamIDs {
		if streamID.IsZero() {
			return ErrInvalidRequest.New("StreamID missing: index %d", i)
		}
		id := streamID
		bytesIDs[i] = id[:]
	}

	rows, err := db.db.Query(ctx, `
		SELECT
			stream_id, position,
			created_at, repaired_at,
			root_piece_id,
			encrypted_size,
			plain_offset, plain_size,
			redundancy,
			remote_alias_pieces
		FROM segments
		`+db.asOfTime(opts.AsOfSystemTime, opts.AsOfSystemInterval)+`
		WHERE
			-- this turns out to be a little bit faster than stream_id IN (SELECT unnest($1::BYTEA[]))
			stream_id = ANY ($1::BYTEA[])
		ORDER BY stream_id ASC, position ASC
	`, pgutil.ByteaArray(bytesIDs))
	if err != nil {
		return Error.Wrap(err)
	}
	defer func() { err = errs.Combine(err, rows.Err(), rows.Close()) }()
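	// Rows arrive ordered by stream_id, matching the sorted StreamIDs above.
	// nextSegment buffers a row that already belongs to the following stream,
	// so it can be handed to that stream's handler on the next loop iteration.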
	var noMoreData bool
	var nextSegment *LoopSegmentEntry
	for _, streamID := range opts.StreamIDs {
		streamID := streamID
		var internalError error
		err := handleStream(ctx, streamID, func(output *LoopSegmentEntry) bool {
			if nextSegment != nil {
				if nextSegment.StreamID != streamID {
					return false
				}
				*output = *nextSegment
				nextSegment = nil
				return true
			}

			if noMoreData {
				return false
			}
			if !rows.Next() {
				noMoreData = true
				return false
			}

			var segment LoopSegmentEntry
			var aliasPieces AliasPieces
			err = rows.Scan(
				&segment.StreamID, &segment.Position,
				&segment.CreatedAt, &segment.RepairedAt,
				&segment.RootPieceID,
				&segment.EncryptedSize,
				&segment.PlainOffset, &segment.PlainSize,
				redundancyScheme{&segment.Redundancy},
				&aliasPieces,
			)
			if err != nil {
				internalError = Error.New("failed to scan segments: %w", err)
				return false
			}

			segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
			if err != nil {
				internalError = Error.New("failed to convert aliases to pieces: %w", err)
				return false
			}

			if segment.StreamID != streamID {
				nextSegment = &segment
				return false
			}

			*output = segment
			return true
		})
		if internalError != nil || err != nil {
			return Error.Wrap(errs.Combine(internalError, err))
		}
	}
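	// Sanity check: every row returned by the query must have been consumed by
	// one of the stream handlers above; leftover rows mean a handler stopped early.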
	if !noMoreData {
		return Error.New("expected rows to be completely read")
	}

	return nil
}
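// asOfTime chooses between a fixed-timestamp and an interval-based
// "as of system time" clause: the fixed timestamp is used, unless a negative
// interval is configured and the timestamp is older than that interval allows,
// in which case the interval is used to limit how stale the read can be.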
func (db *DB) asOfTime(asOfSystemTime time.Time, asOfSystemInterval time.Duration) string {
	interval := time.Since(asOfSystemTime)
	if asOfSystemInterval < 0 && interval > -asOfSystemInterval {
		return db.impl.AsOfSystemInterval(asOfSystemInterval)
	}
	return db.impl.AsOfSystemTime(asOfSystemTime)
}
// LoopSegmentsIterator iterates over a sequence of LoopSegmentEntry items.
type LoopSegmentsIterator interface {
	Next(ctx context.Context, item *LoopSegmentEntry) bool
}

// IterateLoopSegments contains arguments necessary for listing segments in metabase.
type IterateLoopSegments struct {
	BatchSize      int
	AsOfSystemTime time.Time
}

// Verify verifies segments request fields.
func (opts *IterateLoopSegments) Verify() error {
	if opts.BatchSize < 0 {
		return ErrInvalidRequest.New("BatchSize is negative")
	}
	return nil
}
// IterateLoopSegments iterates through all segments in metabase.
func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	if err := opts.Verify(); err != nil {
		return err
	}

	it := &loopSegmentIterator{
		db:             db,
		asOfSystemTime: opts.AsOfSystemTime,
		batchSize:      opts.BatchSize,
		curIndex:       0,
		cursor:         loopSegmentIteratorCursor{},
	}

	// ensure batch size is reasonable
	if it.batchSize <= 0 || it.batchSize > loopIteratorBatchSizeLimit {
		it.batchSize = loopIteratorBatchSizeLimit
	}

	it.curRows, err = it.doNextQuery(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if rowsErr := it.curRows.Err(); rowsErr != nil {
			err = errs.Combine(err, rowsErr)
		}
		err = errs.Combine(err, it.failErr, it.curRows.Close())
	}()

	return fn(ctx, it)
}
// loopSegmentIterator enables iteration of all segments in metabase.
type loopSegmentIterator struct {
	db             *DB
	batchSize      int
	asOfSystemTime time.Time
	curIndex       int
	curRows        tagsql.Rows
	cursor         loopSegmentIteratorCursor

	// failErr is set when either scan or next query fails during iteration.
	failErr error
}

type loopSegmentIteratorCursor struct {
	StreamID uuid.UUID
	Position SegmentPosition
}
// Next returns true if there was another item and copies it into item.
func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
	next := it.curRows.Next()
	if !next {
		if it.curIndex < it.batchSize {
			return false
		}

		if it.curRows.Err() != nil {
			return false
		}

		rows, err := it.doNextQuery(ctx)
		if err != nil {
			it.failErr = errs.Combine(it.failErr, err)
			return false
		}

		if failErr := it.curRows.Close(); failErr != nil {
			it.failErr = errs.Combine(it.failErr, failErr, rows.Close())
			return false
		}

		it.curRows = rows
		it.curIndex = 0

		if !it.curRows.Next() {
			return false
		}
	}

	err := it.scanItem(ctx, item)
	if err != nil {
		it.failErr = errs.Combine(it.failErr, err)
		return false
	}

	it.curIndex++
	it.cursor.StreamID = item.StreamID
	it.cursor.Position = item.Position

	return true
}
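// doNextQuery fetches the next batch of segments after the current cursor.
// Note that, unlike the object iterator, it applies the fixed AS OF SYSTEM TIME
// directly: IterateLoopSegments does not expose an AsOfSystemInterval option.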
func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	return it.db.db.Query(ctx, `
		SELECT
			stream_id, position,
			created_at, repaired_at,
			root_piece_id,
			encrypted_size,
			plain_offset, plain_size,
			redundancy,
			remote_alias_pieces
		FROM segments
		`+it.db.impl.AsOfSystemTime(it.asOfSystemTime)+`
		WHERE
			(stream_id, position) > ($1, $2)
		ORDER BY (stream_id, position) ASC
		LIMIT $3
	`, it.cursor.StreamID, it.cursor.Position,
		it.batchSize,
	)
}
// scanItem scans doNextQuery results into LoopSegmentEntry.
func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) error {
	var aliasPieces AliasPieces
	err := it.curRows.Scan(
		&item.StreamID, &item.Position,
		&item.CreatedAt, &item.RepairedAt,
		&item.RootPieceID,
		&item.EncryptedSize,
		&item.PlainOffset, &item.PlainSize,
		redundancyScheme{&item.Redundancy},
		&aliasPieces,
	)
	if err != nil {
		return Error.New("failed to scan segments: %w", err)
	}

	item.Pieces, err = it.db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
	if err != nil {
		return Error.New("failed to convert aliases to pieces: %w", err)
	}

	return nil
}