satellite/metabase/metaloop: use database time
The system and database time may drift. We should use database time for absolute "as of system time" to ensure that it's not newer than the current database time. When the "as of system time" is in the future, then the query will fail. Change-Id: I5423f6aaad966ca03a76b5ff805bfba932e44a51
This commit is contained in:
parent
60ff87a7d7
commit
2af7e4ef26
@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
_ "github.com/jackc/pgx/v4" // registers pgx as a tagsql driver.
|
||||
_ "github.com/jackc/pgx/v4/stdlib" // registers pgx as a tagsql driver.
|
||||
@ -415,3 +416,10 @@ func (pq postgresRebind) Rebind(sql string) string {
|
||||
|
||||
return string(out)
|
||||
}
|
||||
|
||||
// Now returns time on the database.
|
||||
func (db *DB) Now(ctx context.Context) (time.Time, error) {
|
||||
var t time.Time
|
||||
err := db.db.QueryRowContext(ctx, `SELECT now()`).Scan(&t)
|
||||
return t, Error.Wrap(err)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ package metabase_test
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
@ -116,3 +117,12 @@ func TestMigrateToAliases(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNow(t *testing.T) {
|
||||
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
sysnow := time.Now()
|
||||
now, err := db.Now(ctx)
|
||||
require.NoError(t, err)
|
||||
require.WithinDuration(t, sysnow, now, 5*time.Second)
|
||||
})
|
||||
}
|
@ -167,6 +167,8 @@ type Config struct {
|
||||
|
||||
// MetabaseDB contains iterators for the metabase data.
|
||||
type MetabaseDB interface {
|
||||
// Now returns the time on the database.
|
||||
Now(ctx context.Context) (time.Time, error)
|
||||
// IterateLoopObjects iterates through all objects in metabase for metainfo loop purpose.
|
||||
IterateLoopObjects(ctx context.Context, opts metabase.IterateLoopObjects, fn func(context.Context, metabase.LoopObjectsIterator) error) (err error)
|
||||
// IterateLoopStreams iterates through all streams passed in as arguments.
|
||||
@ -372,7 +374,10 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
|
||||
limit = batchsizeLimit
|
||||
}
|
||||
|
||||
startingTime := time.Now()
|
||||
startingTime, err := metabaseDB.Now(ctx)
|
||||
if err != nil {
|
||||
return observers, Error.Wrap(err)
|
||||
}
|
||||
|
||||
noObserversErr := errs.New("no observers")
|
||||
|
||||
|
@ -131,6 +131,9 @@ type Config struct {
|
||||
type MetabaseDB interface {
|
||||
io.Closer
|
||||
|
||||
// Now returns time on the database.
|
||||
Now(ctx context.Context) (time.Time, error)
|
||||
|
||||
// MigrateToLatest migrates to latest schema version.
|
||||
MigrateToLatest(ctx context.Context) error
|
||||
// CheckVersion checks the database is the correct version
|
||||
|
Loading…
Reference in New Issue
Block a user