satellite/metabase/rangedloop: database abstraction (#5337)

Add an abstraction rangedloop.SegmentProvider to fetch chunks of
segments from the metainfo database in parallel.

Part of https://github.com/storj/storj/issues/5223

Change-Id: Ife26467ea0c3be550bde0b05464ef1db62dd4d2a
This commit is contained in:
Erik van Velzen 2022-12-09 03:01:07 +01:00 committed by GitHub
parent 9fedc21fea
commit 3cf7ebfad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 276 additions and 1 deletions

View File

@ -9,7 +9,8 @@ import (
"storj.io/storj/satellite/metabase/segmentloop"
)
// RangeSplitter gives a way to get non-overlapping ranges of segments concurrently.
// RangeSplitter splits a source of segments into ranges,
// so that multiple segments can be processed concurrently.
// It usually abstracts over a database.
// It is a subcomponent of the ranged segment loop.
type RangeSplitter interface {

View File

@ -0,0 +1,107 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedloop
import (
"context"
"time"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)
// MetabaseRangeSplitter implements RangeSplitter.
type MetabaseRangeSplitter struct {
db *metabase.DB
batchSize int
}
// MetabaseSegmentProvider implements SegmentProvider.
type MetabaseSegmentProvider struct {
db *metabase.DB
uuidRange UUIDRange
asOfSystemTime time.Time
batchSize int
}
// NewMetabaseRangeSplitter creates the segment provider.
func NewMetabaseRangeSplitter(db *metabase.DB, batchSize int) MetabaseRangeSplitter {
return MetabaseRangeSplitter{
db: db,
batchSize: batchSize,
}
}
// CreateRanges splits the segment table into chunks.
func (provider *MetabaseRangeSplitter) CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error) {
uuidRanges, err := CreateUUIDRanges(uint32(nRanges))
if err != nil {
return nil, err
}
asOfSystemTime := time.Now()
rangeProviders := []SegmentProvider{}
for _, uuidRange := range uuidRanges {
rangeProviders = append(rangeProviders, &MetabaseSegmentProvider{
db: provider.db,
uuidRange: uuidRange,
asOfSystemTime: asOfSystemTime,
batchSize: batchSize,
})
}
return rangeProviders, err
}
// Iterate loops over a part of the segment table.
func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
var startStreamID uuid.UUID
var endStreamID uuid.UUID
if provider.uuidRange.Start != nil {
startStreamID = *provider.uuidRange.Start
}
if provider.uuidRange.End != nil {
endStreamID = *provider.uuidRange.End
}
return provider.db.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
BatchSize: provider.batchSize,
AsOfSystemTime: provider.asOfSystemTime,
StartStreamID: startStreamID,
EndStreamID: endStreamID,
}, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {
segments := make([]segmentloop.Segment, 0, provider.batchSize)
segment := metabase.LoopSegmentEntry{}
for iterator.Next(ctx, &segment) {
err := ctx.Err()
if err != nil {
return err
}
segments = append(segments, segmentloop.Segment(segment))
if len(segments) >= provider.batchSize {
err = fn(segments)
if err != nil {
return err
}
// prepare for next batch
segments = segments[:0]
}
}
// send last batch
if len(segments) > 0 {
return fn(segments)
}
return nil
})
}

View File

@ -0,0 +1,167 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package rangedloop_test
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)
type in struct {
streamIDs []string
nRanges int
batchSize int
}
type expected struct {
nBatches int
nSegments int
}
func TestMetabaseSegementProvider(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
inouts := []struct {
in in
expected expected
}{
{
in: in{
streamIDs: []string{},
nRanges: 1,
batchSize: 2,
},
expected: expected{
nBatches: 0,
nSegments: 0,
},
},
{
in: in{
streamIDs: []string{
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
},
nRanges: 2,
batchSize: 2,
},
expected: expected{
nBatches: 1,
nSegments: 2,
},
},
{
in: in{
streamIDs: []string{
"00000000-0000-0000-0000-000000000001",
"f0000000-0000-0000-0000-000000000001",
},
nRanges: 2,
batchSize: 2,
},
expected: expected{
nBatches: 2,
nSegments: 2,
},
},
{
in: in{
streamIDs: []string{
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
"f0000000-0000-0000-0000-000000000001",
"f0000000-0000-0000-0000-000000000002",
},
nRanges: 2,
batchSize: 1,
},
expected: expected{
nBatches: 4,
nSegments: 4,
},
},
}
for _, inout := range inouts {
runTest(ctx, t, db, inout.in, inout.expected)
}
})
}
func runTest(ctx *testcontext.Context, t *testing.T, db *metabase.DB, in in, expected expected) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
for _, streamID := range in.streamIDs {
u, err := uuid.FromString(streamID)
require.NoError(t, err)
createSegment(ctx, t, db, u)
}
provider := rangedloop.NewMetabaseRangeSplitter(db, in.batchSize)
ranges, err := provider.CreateRanges(in.nRanges, in.batchSize)
require.NoError(t, err)
nBatches := 0
nSegments := 0
for _, r := range ranges {
err = r.Iterate(ctx, func(segments []segmentloop.Segment) error {
nBatches++
nSegments += len(segments)
return nil
})
require.NoError(t, err)
}
require.Equal(t, expected.nSegments, nSegments)
require.Equal(t, expected.nBatches, nBatches)
}
func createSegment(ctx *testcontext.Context, t testing.TB, db *metabase.DB, streamID uuid.UUID) {
obj := metabasetest.RandObjectStream()
obj.StreamID = streamID
pos := metabase.SegmentPosition{Part: 0, Index: 0}
data := testrand.Bytes(32)
encryptedKey := testrand.Bytes(32)
encryptedKeyNonce := testrand.Bytes(32)
metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
},
Version: 1,
}.Check(ctx, t, db)
metabasetest.CommitInlineSegment{
Opts: metabase.CommitInlineSegment{
ObjectStream: obj,
Position: pos,
EncryptedKey: encryptedKey,
EncryptedKeyNonce: encryptedKeyNonce,
PlainSize: 512,
PlainOffset: 0,
InlineData: data,
},
}.Check(ctx, t, db)
metabasetest.CommitObjectWithSegments{
Opts: metabase.CommitObjectWithSegments{
ObjectStream: obj,
Segments: []metabase.SegmentPosition{
{Part: 0, Index: 0},
},
},
}.Check(ctx, t, db)
}