satellite/{metabase,satellitedb}: deduplicate AS OF SYSTEM TIME code

We were duplicating the code that builds AS OF SYSTEM TIME clauses in
several places. This replaces that code with methods on
dbutil.Implementation.

As a consequence, it is also convenient to use a shorter name for the
implementation field - 'impl' is sufficiently clear in this context.

Similarly, using AsOfSystemInterval and AsOfSystemTime to distinguish
between the two modes (a relative interval and an absolute timestamp)
is clearer and slightly shorter, without causing confusion.

Change-Id: Idefe55528efa758b6176591017b6572a8d443e3d
Egon Elbre 2021-05-11 11:49:26 +03:00
parent 033006403f
commit 0858c3797a
17 changed files with 107 additions and 148 deletions
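
For context: the new helpers live in storj.io/private/dbutil (pulled in
by the go.mod bump below), so their bodies are not visible in this diff.
Based on the code they replace, they presumably look roughly like the
following sketch (an assumed reconstruction, not the actual
storj.io/private source):

    package dbutil

    import (
        "fmt"
        "time"
    )

    // AsOfSystemTime returns a CockroachDB "AS OF SYSTEM TIME" clause
    // pinned to an absolute timestamp, or "" for other databases or a
    // zero time.
    func (impl Implementation) AsOfSystemTime(t time.Time) string {
        if impl != Cockroach || t.IsZero() {
            return ""
        }
        return fmt.Sprintf(" AS OF SYSTEM TIME '%d' ", t.UnixNano())
    }

    // AsOfSystemInterval returns a CockroachDB "AS OF SYSTEM TIME" clause
    // relative to now, or "" for other databases. CRDB does not support
    // intervals in the negative nanoseconds, so anything above
    // -time.Microsecond produces no clause.
    func (impl Implementation) AsOfSystemInterval(interval time.Duration) string {
        if impl != Cockroach || interval > -time.Microsecond {
            return ""
        }
        return fmt.Sprintf(" AS OF SYSTEM TIME '%s' ", interval.String())
    }

Either method returns an empty string when no clause applies, so call
sites can splice the result directly into a query.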

go.mod (2 changes)

@@ -51,6 +51,6 @@ require (
     storj.io/common v0.0.0-20210429174118-60091ebbbdaf
     storj.io/drpc v0.0.20
     storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7
-    storj.io/private v0.0.0-20210429173958-0e792382d191
+    storj.io/private v0.0.0-20210511083637-239fca6e9894
     storj.io/uplink v1.5.0-rc.1.0.20210506124440-cfeb286eeeb9
 )

go.sum (4 changes)

@@ -850,7 +850,7 @@ storj.io/drpc v0.0.20/go.mod h1:eAxUDk8HWvGl9iqznpuphtZ+WIjIGPJFqNXuKHgRiMM=
 storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
 storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 h1:zi0w9zoBfvuqysSAqxJT1Ton2YB5IhyMM3/3CISjlrQ=
 storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80=
-storj.io/private v0.0.0-20210429173958-0e792382d191 h1:3G4zwDJB/+71A5ahHldelS/TzBkUIuRTn8qwLTD2vck=
-storj.io/private v0.0.0-20210429173958-0e792382d191/go.mod h1:iAc+LGwXYCe+YRRTlkfkg95ZBEL8pWHLVZ508/KQjOs=
+storj.io/private v0.0.0-20210511083637-239fca6e9894 h1:ANILx94AKXmvXAf+hs0HMb85Qi2Y4k7RjEb9S7OhK+M=
+storj.io/private v0.0.0-20210511083637-239fca6e9894/go.mod h1:iAc+LGwXYCe+YRRTlkfkg95ZBEL8pWHLVZ508/KQjOs=
 storj.io/uplink v1.5.0-rc.1.0.20210506124440-cfeb286eeeb9 h1:rSP8cSfLqkYtRLUlGkwj1CeafNf7YPRgTstgwHu0tC8=
 storj.io/uplink v1.5.0-rc.1.0.20210506124440-cfeb286eeeb9/go.mod h1:VzJd+P1sfcVnGCxm0mPPhOBkbov0gLZ+/QXeKkkZ1tI=


@@ -39,18 +39,18 @@ type multinodeDB struct {
     log    *zap.Logger
     driver string
-    implementation dbutil.Implementation
+    impl   dbutil.Implementation
     source string
 }

 // Open creates instance of database supports postgres.
 func Open(ctx context.Context, log *zap.Logger, databaseURL string) (multinode.DB, error) {
-    driver, source, implementation, err := dbutil.SplitConnStr(databaseURL)
+    driver, source, impl, err := dbutil.SplitConnStr(databaseURL)
     if err != nil {
         return nil, err
     }

-    switch implementation {
+    switch impl {
     case dbutil.SQLite3:
         source = sqlite3SetDefaultOptions(source)
     case dbutil.Postgres:
@@ -77,7 +77,7 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string) (multinode.D
         log:    log,
         driver: driver,
-        implementation: implementation,
+        impl:   impl,
         source: source,
     }


@@ -31,7 +31,7 @@ type DB struct {
     log     *zap.Logger
     db      tagsql.DB
     connstr string
-    implementation dbutil.Implementation
+    impl    dbutil.Implementation

     aliasCache *NodeAliasCache
 }
@@ -47,7 +47,7 @@ func Open(ctx context.Context, log *zap.Logger, driverName, connstr string) (*DB
     db := &DB{log: log, connstr: connstr, db: postgresRebind{rawdb}}
     db.aliasCache = NewNodeAliasCache(db)

-    _, _, db.implementation, err = dbutil.SplitConnStr(connstr)
+    _, _, db.impl, err = dbutil.SplitConnStr(connstr)
     if err != nil {
         return nil, Error.Wrap(err)
     }
@@ -94,7 +94,7 @@ func (db *DB) MigrateToLatest(ctx context.Context) error {
     // will need to create the database it was told to connect to. These things should
     // not really be here, and instead should be assumed to exist.
     // This is tracked in jira ticket SM-200
-    switch db.implementation {
+    switch db.impl {
     case dbutil.Postgres:
         schema, err := pgutil.ParseSchemaFromConnstr(db.connstr)
         if err != nil {


@@ -222,7 +222,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa
     }

     var query string
-    switch db.implementation {
+    switch db.impl {
     case dbutil.Cockroach:
         query = `
             WITH deleted_objects AS (
@@ -299,7 +299,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa
             LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
         `
     default:
-        return DeleteObjectResult{}, Error.New("invalid dbType: %v", db.implementation)
+        return DeleteObjectResult{}, Error.New("unhandled database: %v", db.impl)
     }

     err = withRows(db.db.Query(ctx, query, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)))(func(rows tagsql.Rows) error {
         result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)


@@ -39,7 +39,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
     }

     var query string
-    switch db.implementation {
+    switch db.impl {
     case dbutil.Cockroach:
         query = `
             WITH deleted_objects AS (
@@ -67,7 +67,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
             RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
         `
     default:
-        return deletedObjectCount, Error.New("invalid dbType: %v", db.implementation)
+        return deletedObjectCount, Error.New("unhandled database: %v", db.impl)
     }

     // TODO: fix the count for objects without segments


@@ -6,7 +6,6 @@ package metabase
 import (
     "context"
     "encoding/hex"
-    "fmt"
     "time"

     "github.com/jackc/pgx/v4"
@@ -14,7 +13,6 @@ import (
     "github.com/zeebo/errs"
     "go.uber.org/zap"

-    "storj.io/private/dbutil"
     "storj.io/private/tagsql"
 )
@@ -33,18 +31,13 @@ type DeleteExpiredObjects struct {
 func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObjects) (err error) {
     defer mon.Task()(&ctx)(&err)

-    var asOfSystemTimeString string
-    if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach {
-        asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano())
-    }
-
     return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) {
         query := `
             SELECT
                 project_id, bucket_name, object_key, version, stream_id,
                 expires_at
             FROM objects
-            ` + asOfSystemTimeString + `
+            ` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + `
             WHERE
                 (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
                 AND expires_at < $5
@@ -104,17 +97,12 @@ type DeleteZombieObjects struct {
 func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects) (err error) {
     defer mon.Task()(&ctx)(&err)

-    var asOfSystemTimeString string
-    if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach {
-        asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano())
-    }
-
     return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) {
         query := `
             SELECT
                 project_id, bucket_name, object_key, version, stream_id
             FROM objects
-            ` + asOfSystemTimeString + `
+            ` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + `
             WHERE
                 (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
                 AND status = ` + pendingStatus + `


@@ -6,7 +6,6 @@ package metabase
 import (
     "bytes"
     "context"
-    "fmt"
     "sort"
     "time"
@@ -14,7 +13,6 @@ import (
     "storj.io/common/storj"
     "storj.io/common/uuid"

-    "storj.io/private/dbutil"
     "storj.io/private/dbutil/pgutil"
     "storj.io/private/tagsql"
 )
@@ -154,11 +152,6 @@ func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool {
 func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
     defer mon.Task()(&ctx)(&err)

-    var asOfSystemTime string
-    if !it.asOfSystemTime.IsZero() && it.db.implementation == dbutil.Cockroach {
-        asOfSystemTime = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, it.asOfSystemTime.UnixNano())
-    }
-
     return it.db.db.Query(ctx, `
         SELECT
             project_id, bucket_name,
@@ -168,7 +161,7 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err
             segment_count,
             LENGTH(COALESCE(encrypted_metadata,''))
         FROM objects
-        `+asOfSystemTime+`
+        `+it.db.impl.AsOfSystemTime(it.asOfSystemTime)+`
         WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
         ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC
         LIMIT $5
@@ -241,11 +234,6 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
         bytesIDs[i] = id[:]
     }

-    var asOfSystemTime string
-    if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach {
-        asOfSystemTime = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano())
-    }
-
     rows, err := db.db.Query(ctx, `
         SELECT
             stream_id, position,
@@ -256,7 +244,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
             redundancy,
             remote_alias_pieces
         FROM segments
-        `+asOfSystemTime+`
+        `+db.impl.AsOfSystemTime(opts.AsOfSystemTime)+`
         WHERE
             -- this turns out to be a little bit faster than stream_id IN (SELECT unnest($1::BYTEA[]))
             stream_id = ANY ($1::BYTEA[])


@@ -136,7 +136,7 @@ type FindStorageNodesRequest struct {
     RequestedCount int
     ExcludedIDs    []storj.NodeID
     MinimumVersion string // semver or empty
-    AsOfSystemTimeInterval time.Duration // only used for CRDB queries
+    AsOfSystemInterval time.Duration // only used for CRDB queries
 }
@@ -147,7 +147,7 @@ type NodeCriteria struct {
     MinimumVersion string // semver or empty
     OnlineWindow   time.Duration
     DistinctIP     bool
-    AsOfSystemTimeInterval time.Duration // only used for CRDB queries
+    AsOfSystemInterval time.Duration // only used for CRDB queries
 }

 // AuditType is an enum representing the outcome of a particular audit reported to the overlay.
@@ -337,7 +337,7 @@ func (service *Service) IsOnline(node *NodeDossier) bool {
 func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
     defer mon.Task()(&ctx)(&err)
     if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 {
-        req.AsOfSystemTimeInterval = service.config.Node.AsOfSystemTime.DefaultInterval
+        req.AsOfSystemInterval = service.config.Node.AsOfSystemTime.DefaultInterval
     }
     return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
 }
@@ -349,7 +349,7 @@ func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req
 func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
     defer mon.Task()(&ctx)(&err)
     if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 {
-        req.AsOfSystemTimeInterval = service.config.Node.AsOfSystemTime.DefaultInterval
+        req.AsOfSystemInterval = service.config.Node.AsOfSystemTime.DefaultInterval
     }

     if service.config.NodeSelectionCache.Disabled {
@@ -400,7 +400,7 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req
         MinimumVersion: preferences.MinimumVersion,
         OnlineWindow:   preferences.OnlineWindow,
         DistinctIP:     preferences.DistinctIP,
-        AsOfSystemTimeInterval: req.AsOfSystemTimeInterval,
+        AsOfSystemInterval: req.AsOfSystemInterval,
     }
     nodes, err = service.db.SelectStorageNodes(ctx, totalNeededNodes, newNodeCount, &criteria)
     if err != nil {


@@ -6,7 +6,6 @@ package satellitedb
 import (
     "context"
     "sync"
-    "time"

     "github.com/zeebo/errs"
     "go.uber.org/zap"
@@ -52,7 +51,7 @@ type satelliteDB struct {
     opts   Options
     log    *zap.Logger
     driver string
-    implementation dbutil.Implementation
+    impl   dbutil.Implementation
     source string

     consoleDBOnce sync.Once
@@ -108,11 +107,11 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options
 }

 func open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options, override string) (*satelliteDB, error) {
-    driver, source, implementation, err := dbutil.SplitConnStr(databaseURL)
+    driver, source, impl, err := dbutil.SplitConnStr(databaseURL)
     if err != nil {
         return nil, err
     }
-    if implementation != dbutil.Postgres && implementation != dbutil.Cockroach {
+    if impl != dbutil.Postgres && impl != dbutil.Cockroach {
         return nil, Error.New("unsupported driver %q", driver)
     }
@@ -140,7 +139,7 @@ func open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options
         opts:   opts,
         log:    log,
         driver: driver,
-        implementation: implementation,
+        impl:   impl,
         source: source,
     }
@@ -158,17 +157,6 @@ func (dbc *satelliteDBCollection) getByName(name string) *satelliteDB {
     return dbc.dbs[""]
 }

-// AsOfSystemTimeClause returns the "AS OF SYSTEM TIME" clause if the DB implementation
-// is CockroachDB and the interval is less than or equal to a negative microsecond
-// (CRDB does not support intervals in the negative nanoseconds).
-func (db *satelliteDB) AsOfSystemTimeClause(interval time.Duration) (asOf string) {
-    if db.implementation == dbutil.Cockroach && interval <= -time.Microsecond {
-        asOf = " AS OF SYSTEM TIME '" + interval.String() + "' "
-    }
-    return asOf
-}
-
 // TestDBAccess for raw database access,
 // should not be used outside of migration tests.
 func (db *satelliteDB) TestDBAccess() *dbx.DB { return db.DB }


@@ -175,10 +175,10 @@ func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context,
 // queue items whose nodes have finished the exit before the indicated time
 // returning the total number of deleted items.
 func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
-    ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (_ int64, err error) {
+    ctx context.Context, before time.Time, asOfSystemInterval time.Duration, batchSize int) (_ int64, err error) {
     defer mon.Task()(&ctx)(&err)

-    switch db.db.implementation {
+    switch db.db.impl {
     case dbutil.Postgres:
         statement := `
            DELETE FROM graceful_exit_transfer_queue
@@ -201,11 +201,10 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
         return count, nil

     case dbutil.Cockroach:
-        asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval)
-
         nodesQuery := `
            SELECT id
-           FROM nodes ` + asOf + `
+           FROM nodes
+           ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
            WHERE exit_finished_at IS NOT NULL
                AND exit_finished_at < $1
            LIMIT $2 OFFSET $3
@@ -276,18 +275,16 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
         return deleteCount, nil
     }

-    return 0, Error.New("unsupported implementation: %s",
-        dbutil.SchemeForImplementation(db.db.implementation),
-    )
+    return 0, Error.New("unsupported implementation: %s", db.db.impl)
 }

 // DeleteFinishedExitProgress deletes exit progress entries for nodes that
 // finished exiting before the indicated time, returns number of deleted entries.
 func (db *gracefulexitDB) DeleteFinishedExitProgress(
-    ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ int64, err error) {
+    ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ int64, err error) {
     defer mon.Task()(&ctx)(&err)

-    finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemTimeInterval)
+    finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemInterval)
     if err != nil {
         return 0, err
     }
@@ -295,10 +292,12 @@ func (db *gracefulexitDB) DeleteFinishedExitProgress(
 }

 // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
-func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) {
+func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (finishedNodes []storj.NodeID, err error) {
     defer mon.Task()(&ctx)(&err)

-    asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval)
-    stmt := `SELECT id FROM nodes ` + asOf + `
+    stmt := `
+        SELECT id
+        FROM nodes
+        ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
         WHERE exit_finished_at IS NOT NULL
             AND exit_finished_at < ?
     `
@@ -454,14 +453,13 @@ func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, node
 // CountFinishedTransferQueueItemsByNode return a map of the nodes which has
 // finished the exit before the indicated time but there are at least one item
 // left in the transfer queue.
-func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ map[storj.NodeID]int64, err error) {
+func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ map[storj.NodeID]int64, err error) {
     defer mon.Task()(&ctx)(&err)

-    asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval)
-
     query := `SELECT n.id, count(getq.node_id)
         FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq
-        ON n.id = getq.node_id ` + asOf + `
+        ON n.id = getq.node_id
+        ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
         WHERE n.exit_finished_at IS NOT NULL
             AND n.exit_finished_at < ?
         GROUP BY n.id`


@@ -34,7 +34,7 @@ func (db *satelliteDB) MigrateToLatest(ctx context.Context) error {
     // will need to create the database it was told to connect to. These things should
     // not really be here, and instead should be assumed to exist.
     // This is tracked in jira ticket SM-200
-    switch db.implementation {
+    switch db.impl {
     case dbutil.Postgres:
         schema, err := pgutil.ParseSchemaFromConnstr(db.source)
         if err != nil {
@@ -61,7 +61,7 @@ func (db *satelliteDB) MigrateToLatest(ctx context.Context) error {
         }
     }

-    switch db.implementation {
+    switch db.impl {
     case dbutil.Postgres, dbutil.Cockroach:
         migration := db.PostgresMigration()
         // since we merged migration steps 0-69, the current db version should never be
@@ -85,7 +85,7 @@ func (db *satelliteDB) MigrateToLatest(ctx context.Context) error {

 // TestingMigrateToLatest is a method for creating all tables for database for testing.
 func (db *satelliteDB) TestingMigrateToLatest(ctx context.Context) error {
-    switch db.implementation {
+    switch db.impl {
     case dbutil.Postgres:
         schema, err := pgutil.ParseSchemaFromConnstr(db.source)
         if err != nil {
@@ -111,7 +111,7 @@ func (db *satelliteDB) TestingMigrateToLatest(ctx context.Context) error {
         }
     }

-    switch db.implementation {
+    switch db.impl {
     case dbutil.Postgres, dbutil.Cockroach:
         migration := db.PostgresMigration()
@@ -132,7 +132,7 @@ func (db *satelliteDB) TestingMigrateToLatest(ctx context.Context) error {

 // CheckVersion confirms the database is at the desired version.
 func (db *satelliteDB) CheckVersion(ctx context.Context) error {
-    switch db.implementation {
+    switch db.impl {
     case dbutil.Postgres, dbutil.Cockroach:
         migration := db.PostgresMigration()
         return migration.ValidateVersions(ctx, db.log)
@@ -1354,7 +1354,7 @@ func (db *satelliteDB) PostgresMigration() *migrate.Migration {
             Version: 156,
             Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error {
                 storingClause := func(fields ...string) string {
-                    if db.implementation == dbutil.Cockroach {
+                    if db.impl == dbutil.Cockroach {
                         return fmt.Sprintf("STORING (%s)", strings.Join(fields, ", "))
                     }


@@ -104,7 +104,7 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable
     var reputableNodeQuery, newNodeQuery partialQuery

-    asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
+    asOf := cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)

     // Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not.
     // Later, the flag allows us to distinguish if a node is new when scanning the db rows.


@@ -56,11 +56,10 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
 func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
     defer mon.Task()(&ctx)(&err)

-    asOf := cache.db.AsOfSystemTimeClause(selectionCfg.AsOfSystemTime.DefaultInterval)
-
     query := `
         SELECT id, address, last_net, last_ip_port, vetted_at
-        FROM nodes ` + asOf + `
+        FROM nodes
+        ` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.DefaultInterval) + `
         WHERE disqualified IS NULL
         AND unknown_audit_suspended IS NULL
         AND offline_suspended IS NULL
@@ -139,11 +138,10 @@ func (cache *overlaycache) SelectAllStorageNodesDownload(ctx context.Context, on
 func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, onlineWindow time.Duration, asOfConfig overlay.AsOfSystemTimeConfig) (_ []*overlay.SelectedNode, err error) {
     defer mon.Task()(&ctx)(&err)

-    asOf := cache.db.AsOfSystemTimeClause(asOfConfig.DefaultInterval)
-
     query := `
         SELECT id, address, last_net, last_ip_port
-        FROM nodes ` + asOf + `
+        FROM nodes
+        ` + cache.db.impl.AsOfSystemInterval(asOfConfig.DefaultInterval) + `
         WHERE disqualified IS NULL
         AND exit_finished_at IS NULL
         AND last_contact_success > $1
@@ -312,12 +310,11 @@ func (cache *overlaycache) knownOffline(ctx context.Context, criteria *overlay.N
         return nil, Error.New("no ids provided")
     }

-    asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
-
     // get offline nodes
     var rows tagsql.Rows
     rows, err = cache.db.Query(ctx, cache.db.Rebind(`
-        SELECT id FROM nodes `+asOf+`
+        SELECT id FROM nodes
+        `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+`
         WHERE id = any($1::bytea[])
         AND last_contact_success < $2
     `), pgutil.NodeIDArray(nodeIds), time.Now().Add(-criteria.OnlineWindow),
@@ -361,12 +358,12 @@ func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteri
         return nil, Error.New("no ids provided")
     }

-    asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
-
     // get reliable and online nodes
     var rows tagsql.Rows
     rows, err = cache.db.Query(ctx, cache.db.Rebind(`
-        SELECT id FROM nodes `+asOf+`
+        SELECT id
+        FROM nodes
+        `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+`
         WHERE id = any($1::bytea[])
         AND disqualified IS NULL
         AND unknown_audit_suspended IS NULL
@@ -469,11 +466,11 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
 }

 func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
-    asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval)
-
     // get reliable and online nodes
     rows, err := cache.db.Query(ctx, cache.db.Rebind(`
-        SELECT id FROM nodes `+asOf+`
+        SELECT id
+        FROM nodes
+        `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+`
         WHERE disqualified IS NULL
         AND unknown_audit_suspended IS NULL
         AND offline_suspended IS NULL


@@ -476,7 +476,7 @@ func prefixIncrement(origPrefix []byte) (incremented []byte, ok bool) {
 // use.
 func (db *ProjectAccounting) prefixMatch(expr string, prefix []byte) (string, []byte, error) {
     incrementedPrefix, ok := prefixIncrement(prefix)
-    switch db.db.implementation {
+    switch db.db.impl {
     case dbutil.Postgres:
         if !ok {
             return fmt.Sprintf(`(%s >= ?)`, expr), nil, nil
@@ -488,7 +488,7 @@ func (db *ProjectAccounting) prefixMatch(expr string, prefix []byte) (string, []
         }
         return fmt.Sprintf(`(%s >= ?:::BYTEA AND %s < ?:::BYTEA)`, expr, expr), incrementedPrefix, nil
     default:
-        return "", nil, errs.New("invalid dbType: %v", db.db.driver)
+        return "", nil, errs.New("unhandled database: %v", db.db.driver)
     }
 }
@@ -640,7 +640,7 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti
         return 0, nil
     }

-    switch db.db.implementation {
+    switch db.db.impl {
     case dbutil.Cockroach:
         for {
             row := db.db.QueryRow(ctx, `


@@ -32,7 +32,7 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment
     // we want to insert the segment if it is not in the queue, but update the segment health if it already is in the queue
     // we also want to know if the result was an insert or an update - this is the reasoning for the xmax section of the postgres query
     // and the separate cockroach query (which the xmax trick does not work for)
-    switch r.db.implementation {
+    switch r.db.impl {
     case dbutil.Postgres:
         query = `
            INSERT INTO injuredsegments
@@ -80,7 +80,7 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment
 func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegment, err error) {
     defer mon.Task()(&ctx)(&err)
-    switch r.db.implementation {
+    switch r.db.impl {
     case dbutil.Cockroach:
         err = r.db.QueryRowContext(ctx, `
            UPDATE injuredsegments SET attempted = now()
@@ -95,7 +95,7 @@ func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegmen
            ORDER BY segment_health ASC, attempted NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
        ) RETURNING data`).Scan(&seg)
     default:
-        return seg, errs.New("invalid dbType: %v", r.db.implementation)
+        return seg, errs.New("unhandled database: %v", r.db.impl)
     }
     if errors.Is(err, sql.ErrNoRows) {
         err = storage.ErrEmptyQueue.New("")


@@ -534,7 +534,7 @@ func (db *StoragenodeAccounting) ArchiveRollupsBefore(ctx context.Context, befor
         return 0, nil
     }

-    switch db.db.implementation {
+    switch db.db.impl {
     case dbutil.Cockroach:
         for {
             row := db.db.QueryRow(ctx, `
@@ -559,6 +559,8 @@ func (db *StoragenodeAccounting) ArchiveRollupsBefore(ctx context.Context, befor
                 break
             }
         }
+        return nodeRollupsDeleted, nil
+
     case dbutil.Postgres:
         storagenodeStatement := `
            WITH rollups_to_move AS (
@@ -571,14 +573,12 @@ func (db *StoragenodeAccounting) ArchiveRollupsBefore(ctx context.Context, befor
            SELECT count(*) FROM moved_rollups
        `
         row := db.db.DB.QueryRow(ctx, storagenodeStatement, before)
-        var rowCount int
-        err = row.Scan(&rowCount)
-        if err != nil {
-            return nodeRollupsDeleted, err
-        }
-        nodeRollupsDeleted = rowCount
-    }
-    return nodeRollupsDeleted, err
+        err = row.Scan(&nodeRollupsDeleted)
+        return nodeRollupsDeleted, err
+    default:
+        return 0, Error.New("unsupported database: %v", db.db.impl)
+    }
 }

 // GetRollupsSince retrieves all archived bandwidth rollup records since a given time.