satellite/metabase/segmentloop: limit max interval
Ensure that we don't query too far in the history, which slows things down. Change-Id: Ia77aa522f7f4c5d43629d51bb9a51a49fab6fa14
This commit is contained in:
parent
bc79f01aaa
commit
f3a52d1da5
@ -369,3 +369,23 @@ func (db *DB) Now(ctx context.Context) (time.Time, error) {
|
|||||||
err := db.db.QueryRowContext(ctx, `SELECT now()`).Scan(&t)
|
err := db.db.QueryRowContext(ctx, `SELECT now()`).Scan(&t)
|
||||||
return t, Error.Wrap(err)
|
return t, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) asOfTime(asOfSystemTime time.Time, asOfSystemInterval time.Duration) string {
|
||||||
|
return limitedAsOfSystemTime(db.impl, time.Now(), asOfSystemTime, asOfSystemInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
func limitedAsOfSystemTime(impl dbutil.Implementation, now, baseline time.Time, maxInterval time.Duration) string {
|
||||||
|
if baseline.IsZero() || now.IsZero() {
|
||||||
|
return impl.AsOfSystemInterval(maxInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
interval := now.Sub(baseline)
|
||||||
|
if interval < 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
// maxInterval is negative
|
||||||
|
if maxInterval < 0 && interval > -maxInterval {
|
||||||
|
return impl.AsOfSystemInterval(maxInterval)
|
||||||
|
}
|
||||||
|
return impl.AsOfSystemTime(baseline)
|
||||||
|
}
|
||||||
|
71
satellite/metabase/db_internal_test.go
Normal file
71
satellite/metabase/db_internal_test.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
// Copyright (C) 2021 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package metabase
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"storj.io/private/dbutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLimitedAsOfSystemTime(t *testing.T) {
|
||||||
|
const (
|
||||||
|
unixNano = 1623324728961910000
|
||||||
|
unixNanoStr = `1623324728961910000`
|
||||||
|
)
|
||||||
|
|
||||||
|
check := func(expect string, startNano, baselineNano int64, maxInterval time.Duration) {
|
||||||
|
var start, baseline time.Time
|
||||||
|
if startNano != 0 {
|
||||||
|
start = time.Unix(0, startNano)
|
||||||
|
}
|
||||||
|
if baselineNano != 0 {
|
||||||
|
baseline = time.Unix(0, baselineNano)
|
||||||
|
}
|
||||||
|
result := limitedAsOfSystemTime(dbutil.Cockroach, start, baseline, maxInterval)
|
||||||
|
require.Equal(t, expect, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// baseline in the future
|
||||||
|
check("",
|
||||||
|
unixNano-time.Second.Nanoseconds(),
|
||||||
|
unixNano,
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
|
||||||
|
// ignore interval when positive or zero
|
||||||
|
check(" AS OF SYSTEM TIME '"+unixNanoStr+"' ",
|
||||||
|
unixNano+time.Second.Nanoseconds(),
|
||||||
|
unixNano,
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
check(" AS OF SYSTEM TIME '"+unixNanoStr+"' ",
|
||||||
|
unixNano+time.Second.Nanoseconds(),
|
||||||
|
unixNano,
|
||||||
|
2*time.Second,
|
||||||
|
)
|
||||||
|
|
||||||
|
// ignore interval when it doesn't exceed the time difference
|
||||||
|
check(" AS OF SYSTEM TIME '"+unixNanoStr+"' ",
|
||||||
|
unixNano+time.Second.Nanoseconds(),
|
||||||
|
unixNano,
|
||||||
|
-time.Second,
|
||||||
|
)
|
||||||
|
|
||||||
|
// limit to interval when the time between now and baseline is large
|
||||||
|
check(" AS OF SYSTEM TIME '-1s' ",
|
||||||
|
unixNano+time.Minute.Nanoseconds(),
|
||||||
|
unixNano,
|
||||||
|
-time.Second,
|
||||||
|
)
|
||||||
|
|
||||||
|
// ignore now and baseline when either is zero
|
||||||
|
check(" AS OF SYSTEM TIME '-1s' ", 0, unixNano, -time.Second)
|
||||||
|
check(" AS OF SYSTEM TIME '-1s' ", unixNano, 0, -time.Second)
|
||||||
|
check("", unixNano, 0, 0)
|
||||||
|
check("", 0, unixNano, 0)
|
||||||
|
}
|
@ -329,14 +329,6 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) asOfTime(asOfSystemTime time.Time, asOfSystemInterval time.Duration) string {
|
|
||||||
interval := time.Since(asOfSystemTime)
|
|
||||||
if asOfSystemInterval < 0 && interval > -asOfSystemInterval {
|
|
||||||
return db.impl.AsOfSystemInterval(asOfSystemInterval)
|
|
||||||
}
|
|
||||||
return db.impl.AsOfSystemTime(asOfSystemTime)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoopSegmentsIterator iterates over a sequence of LoopSegmentEntry items.
|
// LoopSegmentsIterator iterates over a sequence of LoopSegmentEntry items.
|
||||||
type LoopSegmentsIterator interface {
|
type LoopSegmentsIterator interface {
|
||||||
Next(ctx context.Context, item *LoopSegmentEntry) bool
|
Next(ctx context.Context, item *LoopSegmentEntry) bool
|
||||||
@ -344,8 +336,9 @@ type LoopSegmentsIterator interface {
|
|||||||
|
|
||||||
// IterateLoopSegments contains arguments necessary for listing segments in metabase.
|
// IterateLoopSegments contains arguments necessary for listing segments in metabase.
|
||||||
type IterateLoopSegments struct {
|
type IterateLoopSegments struct {
|
||||||
BatchSize int
|
BatchSize int
|
||||||
AsOfSystemTime time.Time
|
AsOfSystemTime time.Time
|
||||||
|
AsOfSystemInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify verifies segments request fields.
|
// Verify verifies segments request fields.
|
||||||
@ -367,8 +360,9 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
|
|||||||
it := &loopSegmentIterator{
|
it := &loopSegmentIterator{
|
||||||
db: db,
|
db: db,
|
||||||
|
|
||||||
asOfSystemTime: opts.AsOfSystemTime,
|
asOfSystemTime: opts.AsOfSystemTime,
|
||||||
batchSize: opts.BatchSize,
|
asOfSystemInterval: opts.AsOfSystemInterval,
|
||||||
|
batchSize: opts.BatchSize,
|
||||||
|
|
||||||
curIndex: 0,
|
curIndex: 0,
|
||||||
cursor: loopSegmentIteratorCursor{},
|
cursor: loopSegmentIteratorCursor{},
|
||||||
@ -398,8 +392,9 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
|
|||||||
type loopSegmentIterator struct {
|
type loopSegmentIterator struct {
|
||||||
db *DB
|
db *DB
|
||||||
|
|
||||||
batchSize int
|
batchSize int
|
||||||
asOfSystemTime time.Time
|
asOfSystemTime time.Time
|
||||||
|
asOfSystemInterval time.Duration
|
||||||
|
|
||||||
curIndex int
|
curIndex int
|
||||||
curRows tagsql.Rows
|
curRows tagsql.Rows
|
||||||
@ -470,7 +465,7 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows,
|
|||||||
redundancy,
|
redundancy,
|
||||||
remote_alias_pieces
|
remote_alias_pieces
|
||||||
FROM segments
|
FROM segments
|
||||||
`+it.db.impl.AsOfSystemTime(it.asOfSystemTime)+`
|
`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
|
||||||
WHERE
|
WHERE
|
||||||
(stream_id, position) > ($1, $2)
|
(stream_id, position) > ($1, $2)
|
||||||
ORDER BY (stream_id, position) ASC
|
ORDER BY (stream_id, position) ASC
|
||||||
|
@ -164,7 +164,7 @@ type Config struct {
|
|||||||
RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
|
RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
|
||||||
ListLimit int `help:"how many items to query in a batch" default:"2500" testDefault:"10000"`
|
ListLimit int `help:"how many items to query in a batch" default:"2500" testDefault:"10000"`
|
||||||
|
|
||||||
AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-5m"`
|
AsOfSystemInterval time.Duration `help:"as of system interval" default:"-5m"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetabaseDB contains iterators for the metabase data.
|
// MetabaseDB contains iterators for the metabase data.
|
||||||
|
@ -134,6 +134,8 @@ type Config struct {
|
|||||||
CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s" testDefault:"1s"`
|
CoalesceDuration time.Duration `help:"how long to wait for new observers before starting iteration" releaseDefault:"5s" devDefault:"5s" testDefault:"1s"`
|
||||||
RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
|
RateLimit float64 `help:"rate limit (default is 0 which is unlimited segments per second)" default:"0"`
|
||||||
ListLimit int `help:"how many items to query in a batch" default:"2500"`
|
ListLimit int `help:"how many items to query in a batch" default:"2500"`
|
||||||
|
|
||||||
|
AsOfSystemInterval time.Duration `help:"as of system interval" default:"-5m"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetabaseDB contains iterators for the metabase data.
|
// MetabaseDB contains iterators for the metabase data.
|
||||||
@ -364,8 +366,9 @@ func (loop *Service) iterateSegments(ctx context.Context, observers []*observerC
|
|||||||
var segmentsProcessed int64
|
var segmentsProcessed int64
|
||||||
|
|
||||||
err = loop.metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
|
err = loop.metabaseDB.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
|
||||||
BatchSize: limit,
|
BatchSize: limit,
|
||||||
AsOfSystemTime: startingTime,
|
AsOfSystemTime: startingTime,
|
||||||
|
AsOfSystemInterval: loop.config.AsOfSystemInterval,
|
||||||
}, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {
|
}, func(ctx context.Context, iterator metabase.LoopSegmentsIterator) error {
|
||||||
defer mon.TaskNamed("iterateLoopSegmentsCB")(&ctx)(&err)
|
defer mon.TaskNamed("iterateLoopSegmentsCB")(&ctx)(&err)
|
||||||
|
|
||||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -424,6 +424,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
|||||||
# redundancy scheme configuration in the format k/m/o/n-sharesize
|
# redundancy scheme configuration in the format k/m/o/n-sharesize
|
||||||
# metainfo.rs: 29/35/80/110-256 B
|
# metainfo.rs: 29/35/80/110-256 B
|
||||||
|
|
||||||
|
# as of system interval
|
||||||
|
# metainfo.segment-loop.as-of-system-interval: -5m0s
|
||||||
|
|
||||||
# how long to wait for new observers before starting iteration
|
# how long to wait for new observers before starting iteration
|
||||||
# metainfo.segment-loop.coalesce-duration: 5s
|
# metainfo.segment-loop.coalesce-duration: 5s
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user