diff --git a/satellite/metabase/rangedloop/provider.go b/satellite/metabase/rangedloop/provider.go
index 417780ab0..75eafa06e 100644
--- a/satellite/metabase/rangedloop/provider.go
+++ b/satellite/metabase/rangedloop/provider.go
@@ -9,7 +9,8 @@ import (
 	"storj.io/storj/satellite/metabase/segmentloop"
 )
 
-// RangeSplitter gives a way to get non-overlapping ranges of segments concurrently.
+// RangeSplitter splits a source of segments into ranges,
+// so that multiple segments can be processed concurrently.
 // It usually abstracts over a database.
 // It is a subcomponent of the ranged segment loop.
 type RangeSplitter interface {
diff --git a/satellite/metabase/rangedloop/providerdb.go b/satellite/metabase/rangedloop/providerdb.go
new file mode 100644
index 000000000..d5f82c475
--- /dev/null
+++ b/satellite/metabase/rangedloop/providerdb.go
@@ -0,0 +1,126 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package rangedloop
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"storj.io/common/uuid"
+	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/metabase/segmentloop"
+)
+
+// MetabaseRangeSplitter implements RangeSplitter.
+type MetabaseRangeSplitter struct {
+	db *metabase.DB
+
+	// batchSize is set by NewMetabaseRangeSplitter. CreateRanges does not
+	// read it and uses its own batchSize argument instead.
+	batchSize int
+}
+
+// MetabaseSegmentProvider implements SegmentProvider.
+type MetabaseSegmentProvider struct {
+	db *metabase.DB
+
+	uuidRange      UUIDRange
+	asOfSystemTime time.Time
+	batchSize      int
+}
+
+// NewMetabaseRangeSplitter creates the range splitter.
+func NewMetabaseRangeSplitter(db *metabase.DB, batchSize int) MetabaseRangeSplitter {
+	return MetabaseRangeSplitter{
+		db:        db,
+		batchSize: batchSize,
+	}
+}
+
+// CreateRanges splits the segment table into chunks.
+//
+// It returns one MetabaseSegmentProvider per UUID range. All of them share
+// the splitter's database, the same AsOfSystemTime and the given batchSize.
+// An nRanges value that does not fit into uint32 is rejected with an error.
+func (provider *MetabaseRangeSplitter) CreateRanges(nRanges int, batchSize int) ([]SegmentProvider, error) {
+	// uint32(nRanges) would silently wrap around for out-of-range values.
+	if nRanges < 0 || int64(nRanges) > int64(^uint32(0)) {
+		return nil, fmt.Errorf("invalid number of ranges: %d", nRanges)
+	}
+
+	uuidRanges, err := CreateUUIDRanges(uint32(nRanges))
+	if err != nil {
+		return nil, err
+	}
+
+	// Taken once so that every range uses the same timestamp.
+	asOfSystemTime := time.Now()
+
+	rangeProviders := make([]SegmentProvider, 0, len(uuidRanges))
+	for _, uuidRange := range uuidRanges {
+		rangeProviders = append(rangeProviders, &MetabaseSegmentProvider{
+			db:             provider.db,
+			uuidRange:      uuidRange,
+			asOfSystemTime: asOfSystemTime,
+			batchSize:      batchSize,
+		})
+	}
+
+	return rangeProviders, nil
+}
+
+// Iterate loops over a part of the segment table.
+//
+// Segments are collected and passed to fn once batchSize of them have
+// accumulated; any remainder is passed in a final call. The slice handed
+// to fn is reused for the following batch, so fn must not retain it after
+// returning. The loop ends early when fn or ctx.Err() reports an error.
+func (provider *MetabaseSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
+	// A nil bound leaves the corresponding stream ID at its zero value.
+	var startStreamID uuid.UUID
+	var endStreamID uuid.UUID
+
+	if provider.uuidRange.Start != nil {
+		startStreamID = *provider.uuidRange.Start
+	}
+	if provider.uuidRange.End != nil {
+		endStreamID = *provider.uuidRange.End
+	}
+
+	return provider.db.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
+		BatchSize:      provider.batchSize,
+		AsOfSystemTime: provider.asOfSystemTime,
+		StartStreamID:  startStreamID,
+		EndStreamID:    endStreamID,
+	}, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {
+		segments := make([]segmentloop.Segment, 0, provider.batchSize)
+
+		segment := metabase.LoopSegmentEntry{}
+		for iterator.Next(ctx, &segment) {
+			err := ctx.Err()
+			if err != nil {
+				return err
+			}
+
+			segments = append(segments, segmentloop.Segment(segment))
+
+			if len(segments) >= provider.batchSize {
+				err = fn(segments)
+				if err != nil {
+					return err
+				}
+				// prepare for next batch
+				segments = segments[:0]
+			}
+		}
+
+		// send last batch
+		if len(segments) > 0 {
+			return fn(segments)
+		}
+
+		return nil
+	})
+}
diff --git a/satellite/metabase/rangedloop/providerdb_test.go b/satellite/metabase/rangedloop/providerdb_test.go
new file mode 100644
index 000000000..1b9ff3486
--- /dev/null
+++ b/satellite/metabase/rangedloop/providerdb_test.go
@@ -0,0 +1,174 @@
+// Copyright (C) 2022 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package rangedloop_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/common/uuid"
+	"storj.io/storj/satellite/metabase"
+	"storj.io/storj/satellite/metabase/metabasetest"
+	"storj.io/storj/satellite/metabase/rangedloop"
+	"storj.io/storj/satellite/metabase/segmentloop"
+)
+
+// in describes the input of a single test case.
+type in struct {
+	streamIDs []string
+	nRanges   int
+	batchSize int
+}
+
+// expected holds the totals a test case must observe.
+type expected struct {
+	nBatches  int
+	nSegments int
+}
+
+// TestMetabaseSegementProvider runs a table of cases against the metabase
+// backed range splitter and checks how many batches and segments are seen.
+func TestMetabaseSegementProvider(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+		inouts := []struct {
+			in       in
+			expected expected
+		}{
+			{
+				in: in{
+					streamIDs: []string{},
+					nRanges:   1,
+					batchSize: 2,
+				},
+				expected: expected{
+					nBatches:  0,
+					nSegments: 0,
+				},
+			},
+			{
+				in: in{
+					streamIDs: []string{
+						"00000000-0000-0000-0000-000000000001",
+						"00000000-0000-0000-0000-000000000002",
+					},
+					nRanges:   2,
+					batchSize: 2,
+				},
+				expected: expected{
+					nBatches:  1,
+					nSegments: 2,
+				},
+			},
+			{
+				in: in{
+					streamIDs: []string{
+						"00000000-0000-0000-0000-000000000001",
+						"f0000000-0000-0000-0000-000000000001",
+					},
+					nRanges:   2,
+					batchSize: 2,
+				},
+				expected: expected{
+					nBatches:  2,
+					nSegments: 2,
+				},
+			},
+			{
+				in: in{
+					streamIDs: []string{
+						"00000000-0000-0000-0000-000000000001",
+						"00000000-0000-0000-0000-000000000002",
+						"f0000000-0000-0000-0000-000000000001",
+						"f0000000-0000-0000-0000-000000000002",
+					},
+					nRanges:   2,
+					batchSize: 1,
+				},
+				expected: expected{
+					nBatches:  4,
+					nSegments: 4,
+				},
+			},
+		}
+
+		for _, inout := range inouts {
+			runTest(ctx, t, db, inout.in, inout.expected)
+		}
+	})
+}
+
+// runTest creates one segment per stream ID, iterates all ranges returned by
+// CreateRanges one after another and compares the observed number of batches
+// and segments with the expected values.
+func runTest(ctx *testcontext.Context, t *testing.T, db *metabase.DB, in in, expected expected) {
+	defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+	for _, streamID := range in.streamIDs {
+		u, err := uuid.FromString(streamID)
+		require.NoError(t, err)
+		createSegment(ctx, t, db, u)
+	}
+
+	provider := rangedloop.NewMetabaseRangeSplitter(db, in.batchSize)
+	ranges, err := provider.CreateRanges(in.nRanges, in.batchSize)
+	require.NoError(t, err)
+
+	nBatches := 0
+	nSegments := 0
+	for _, r := range ranges {
+		err = r.Iterate(ctx, func(segments []segmentloop.Segment) error {
+			nBatches++
+			nSegments += len(segments)
+			return nil
+		})
+		require.NoError(t, err)
+	}
+
+	require.Equal(t, expected.nSegments, nSegments)
+	require.Equal(t, expected.nBatches, nBatches)
+}
+
+// createSegment begins an object for the given stream ID, commits one inline
+// segment at position 0/0 and then commits the object.
+func createSegment(ctx *testcontext.Context, t testing.TB, db *metabase.DB, streamID uuid.UUID) {
+	obj := metabasetest.RandObjectStream()
+	obj.StreamID = streamID
+
+	pos := metabase.SegmentPosition{Part: 0, Index: 0}
+	data := testrand.Bytes(32)
+	encryptedKey := testrand.Bytes(32)
+	encryptedKeyNonce := testrand.Bytes(32)
+
+	metabasetest.BeginObjectExactVersion{
+		Opts: metabase.BeginObjectExactVersion{
+			ObjectStream: obj,
+			Encryption:   metabasetest.DefaultEncryption,
+		},
+		Version: 1,
+	}.Check(ctx, t, db)
+
+	metabasetest.CommitInlineSegment{
+		Opts: metabase.CommitInlineSegment{
+			ObjectStream: obj,
+			Position:     pos,
+
+			EncryptedKey:      encryptedKey,
+			EncryptedKeyNonce: encryptedKeyNonce,
+
+			PlainSize:   512,
+			PlainOffset: 0,
+
+			InlineData: data,
+		},
+	}.Check(ctx, t, db)
+
+	metabasetest.CommitObjectWithSegments{
+		Opts: metabase.CommitObjectWithSegments{
+			ObjectStream: obj,
+			Segments:     []metabase.SegmentPosition{pos},
+		},
+	}.Check(ctx, t, db)
+}