diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go index 641bd922b..8dedb822b 100644 --- a/satellite/metabase/db.go +++ b/satellite/metabase/db.go @@ -369,3 +369,23 @@ func (db *DB) Now(ctx context.Context) (time.Time, error) { err := db.db.QueryRowContext(ctx, `SELECT now()`).Scan(&t) return t, Error.Wrap(err) } + +func (db *DB) asOfTime(asOfSystemTime time.Time, asOfSystemInterval time.Duration) string { + return limitedAsOfSystemTime(db.impl, time.Now(), asOfSystemTime, asOfSystemInterval) +} + +func limitedAsOfSystemTime(impl dbutil.Implementation, now, baseline time.Time, maxInterval time.Duration) string { + if baseline.IsZero() || now.IsZero() { + return impl.AsOfSystemInterval(maxInterval) + } + + interval := now.Sub(baseline) + if interval < 0 { + return "" + } + // maxInterval is negative + if maxInterval < 0 && interval > -maxInterval { + return impl.AsOfSystemInterval(maxInterval) + } + return impl.AsOfSystemTime(baseline) +} diff --git a/satellite/metabase/db_internal_test.go b/satellite/metabase/db_internal_test.go new file mode 100644 index 000000000..5413f531f --- /dev/null +++ b/satellite/metabase/db_internal_test.go @@ -0,0 +1,71 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "storj.io/private/dbutil" +) + +func TestLimitedAsOfSystemTime(t *testing.T) { + const ( + unixNano = 1623324728961910000 + unixNanoStr = `1623324728961910000` + ) + + check := func(expect string, startNano, baselineNano int64, maxInterval time.Duration) { + var start, baseline time.Time + if startNano != 0 { + start = time.Unix(0, startNano) + } + if baselineNano != 0 { + baseline = time.Unix(0, baselineNano) + } + result := limitedAsOfSystemTime(dbutil.Cockroach, start, baseline, maxInterval) + require.Equal(t, expect, result) + } + + // baseline in the future + check("", + unixNano-time.Second.Nanoseconds(), + unixNano, + 0, + ) + + // ignore interval when positive or zero + check(" AS OF SYSTEM TIME '"+unixNanoStr+"' ", + unixNano+time.Second.Nanoseconds(), + unixNano, + 0, + ) + check(" AS OF SYSTEM TIME '"+unixNanoStr+"' ", + unixNano+time.Second.Nanoseconds(), + unixNano, + 2*time.Second, + ) + + // ignore interval when it doesn't exceed the time difference + check(" AS OF SYSTEM TIME '"+unixNanoStr+"' ", + unixNano+time.Second.Nanoseconds(), + unixNano, + -time.Second, + ) + + // limit to interval when the time between now and baseline is large + check(" AS OF SYSTEM TIME '-1s' ", + unixNano+time.Minute.Nanoseconds(), + unixNano, + -time.Second, + ) + + // ignore now and baseline when either is zero + check(" AS OF SYSTEM TIME '-1s' ", 0, unixNano, -time.Second) + check(" AS OF SYSTEM TIME '-1s' ", unixNano, 0, -time.Second) + check("", unixNano, 0, 0) + check("", 0, unixNano, 0) +} diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index b05600db9..01a446ffb 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -329,14 +329,6 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h return nil } -func (db *DB) asOfTime(asOfSystemTime time.Time, asOfSystemInterval time.Duration) string { - interval := time.Since(asOfSystemTime) - if asOfSystemInterval < 0 && interval > -asOfSystemInterval { - return db.impl.AsOfSystemInterval(asOfSystemInterval) - } - return db.impl.AsOfSystemTime(asOfSystemTime) -} - // LoopSegmentsIterator iterates over a sequence of LoopSegmentEntry items. type LoopSegmentsIterator interface { Next(ctx context.Context, item *LoopSegmentEntry) bool @@ -344,8 +336,9 @@ type LoopSegmentsIterator interface { // IterateLoopSegments contains arguments necessary for listing segments in metabase. type IterateLoopSegments struct { - BatchSize int - AsOfSystemTime time.Time + BatchSize int + AsOfSystemTime time.Time + AsOfSystemInterval time.Duration } // Verify verifies segments request fields. @@ -367,8 +360,9 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, it := &loopSegmentIterator{ db: db, - asOfSystemTime: opts.AsOfSystemTime, - batchSize: opts.BatchSize, + asOfSystemTime: opts.AsOfSystemTime, + asOfSystemInterval: opts.AsOfSystemInterval, + batchSize: opts.BatchSize, curIndex: 0, cursor: loopSegmentIteratorCursor{}, @@ -398,8 +392,9 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, type loopSegmentIterator struct { db *DB - batchSize int - asOfSystemTime time.Time + batchSize int + asOfSystemTime time.Time + asOfSystemInterval time.Duration curIndex int curRows tagsql.Rows @@ -470,7 +465,7 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, redundancy, remote_alias_pieces FROM segments - `+it.db.impl.AsOfSystemTime(it.asOfSystemTime)+` + `+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+` WHERE (stream_id, position) > ($1, $2) ORDER BY (stream_id, position) ASC diff --git a/satellite/metabase/metaloop/service.go b/satellite/metabase/metaloop/service.go index 3df89563f..c5de38b95 100644 --- a/satellite/metabase/metaloop/service.go +++ b/satellite/metabase/metaloop/service.go @@ -164,7 +164,7 @@ type Config struct { RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"` ListLimit int `help:"how many items to query in a batch" default:"2500" testDefault:"10000"` - AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-5m"` + AsOfSystemInterval time.Duration `help:"as of system interval" default:"-5m"` } // MetabaseDB contains iterators for the metabase data. diff --git a/satellite/metabase/segmentloop/service.go b/satellite/metabase/segmentloop/service.go index cff950208..b34bd2428 100644 --- a/satellite/metabase/segmentloop/service.go +++ b/satellite/metabase/segmentloop/service.go @@ -134,6 +134,8 @@ type Config struct { CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s" testDefault:"1s"` RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"` ListLimit int `help:"how many items to query in a batch" default:"2500"` + + AsOfSystemInterval time.Duration `help:"as of system interval" default:"-5m"` } // MetabaseDB contains iterators for the metabase data. @@ -364,8 +366,9 @@ func (loop *Service) iterateSegments(ctx context.Context, observers []*observerC var segmentsProcessed int64 err = loop.metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{ - BatchSize: limit, - AsOfSystemTime: startingTime, + BatchSize: limit, + AsOfSystemTime: startingTime, + AsOfSystemInterval: loop.config.AsOfSystemInterval, }, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error { defer mon.TaskNamed("iterateLoopSegmentsCB")(&ctx)(&err) diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 635b172e5..11d8cf8a6 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -424,6 +424,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # redundancy scheme configuration in the format k/m/o/n-sharesize # metainfo.rs: 29/35/80/110-256 B +# as of system interval +# metainfo.segment-loop.as-of-system-interval: -5m0s + # how long to wait for new observers before starting iteration # metainfo.segment-loop.coalesce-duration: 5s