From 0858c3797a0cbc9e55c390ee13130ae7181cd789 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 11 May 2021 11:49:26 +0300 Subject: [PATCH] satellite/{metabase,satellitedb}: deduplicate AS OF SYSTEM TIME code Currently we were duplicating code for AS OF SYSTEM TIME in several places. This replaces the code with using a method on dbutil.Implementation. As a consequence it's more useful to use a shorter name for implementation - 'impl' should be sufficiently clear in the context. Similarly, using AsOfSystemInterval and AsOfSystemTime to distinguish between the two modes is useful and slightly shorter without causing confusion. Change-Id: Idefe55528efa758b6176591017b6572a8d443e3d --- go.mod | 2 +- go.sum | 4 +- multinode/multinodedb/database.go | 20 +++++----- satellite/metabase/db.go | 12 +++--- satellite/metabase/delete.go | 4 +- satellite/metabase/delete_bucket.go | 4 +- satellite/metabase/delete_objects.go | 16 +------- satellite/metabase/loop.go | 16 +------- satellite/overlay/service.go | 40 +++++++++---------- satellite/satellitedb/database.go | 36 ++++++----------- satellite/satellitedb/gracefulexit.go | 32 +++++++-------- satellite/satellitedb/migrate.go | 12 +++--- satellite/satellitedb/nodeselection.go | 2 +- satellite/satellitedb/overlaycache.go | 27 ++++++------- satellite/satellitedb/projectaccounting.go | 6 +-- satellite/satellitedb/repairqueue.go | 6 +-- .../satellitedb/storagenodeaccounting.go | 16 ++++---- 17 files changed, 107 insertions(+), 148 deletions(-) diff --git a/go.mod b/go.mod index fec36cce4..79d70931b 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,6 @@ require ( storj.io/common v0.0.0-20210429174118-60091ebbbdaf storj.io/drpc v0.0.20 storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 - storj.io/private v0.0.0-20210429173958-0e792382d191 + storj.io/private v0.0.0-20210511083637-239fca6e9894 storj.io/uplink v1.5.0-rc.1.0.20210506124440-cfeb286eeeb9 ) diff --git a/go.sum b/go.sum index ab1b7ac95..186bf9a0f 100644 --- a/go.sum +++ b/go.sum @@ -850,7 +850,7 @@ storj.io/drpc v0.0.20/go.mod h1:eAxUDk8HWvGl9iqznpuphtZ+WIjIGPJFqNXuKHgRiMM= storj.io/monkit-jaeger v0.0.0-20210225162224-66fb37637bf6/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80= storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7 h1:zi0w9zoBfvuqysSAqxJT1Ton2YB5IhyMM3/3CISjlrQ= storj.io/monkit-jaeger v0.0.0-20210426161729-debb1cbcbbd7/go.mod h1:gj4vuCeyCRjRmH8LIrgoyU9Dc9uR6H+/GcDUXmTbf80= -storj.io/private v0.0.0-20210429173958-0e792382d191 h1:3G4zwDJB/+71A5ahHldelS/TzBkUIuRTn8qwLTD2vck= -storj.io/private v0.0.0-20210429173958-0e792382d191/go.mod h1:iAc+LGwXYCe+YRRTlkfkg95ZBEL8pWHLVZ508/KQjOs= +storj.io/private v0.0.0-20210511083637-239fca6e9894 h1:ANILx94AKXmvXAf+hs0HMb85Qi2Y4k7RjEb9S7OhK+M= +storj.io/private v0.0.0-20210511083637-239fca6e9894/go.mod h1:iAc+LGwXYCe+YRRTlkfkg95ZBEL8pWHLVZ508/KQjOs= storj.io/uplink v1.5.0-rc.1.0.20210506124440-cfeb286eeeb9 h1:rSP8cSfLqkYtRLUlGkwj1CeafNf7YPRgTstgwHu0tC8= storj.io/uplink v1.5.0-rc.1.0.20210506124440-cfeb286eeeb9/go.mod h1:VzJd+P1sfcVnGCxm0mPPhOBkbov0gLZ+/QXeKkkZ1tI= diff --git a/multinode/multinodedb/database.go b/multinode/multinodedb/database.go index 6a24153cb..070c0e81c 100644 --- a/multinode/multinodedb/database.go +++ b/multinode/multinodedb/database.go @@ -37,20 +37,20 @@ var ( type multinodeDB struct { *dbx.DB - log *zap.Logger - driver string - implementation dbutil.Implementation - source string + log *zap.Logger + driver string + impl dbutil.Implementation + source string } // Open creates instance of database supports 
postgres. func Open(ctx context.Context, log *zap.Logger, databaseURL string) (multinode.DB, error) { - driver, source, implementation, err := dbutil.SplitConnStr(databaseURL) + driver, source, impl, err := dbutil.SplitConnStr(databaseURL) if err != nil { return nil, err } - switch implementation { + switch impl { case dbutil.SQLite3: source = sqlite3SetDefaultOptions(source) case dbutil.Postgres: @@ -75,10 +75,10 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string) (multinode.D core := &multinodeDB{ DB: dbxDB, - log: log, - driver: driver, - implementation: implementation, - source: source, + log: log, + driver: driver, + impl: impl, + source: source, } return core, nil diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go index 445ec6aae..9a9ecaaa2 100644 --- a/satellite/metabase/db.go +++ b/satellite/metabase/db.go @@ -28,10 +28,10 @@ var ( // DB implements a database for storing objects and segments. type DB struct { - log *zap.Logger - db tagsql.DB - connstr string - implementation dbutil.Implementation + log *zap.Logger + db tagsql.DB + connstr string + impl dbutil.Implementation aliasCache *NodeAliasCache } @@ -47,7 +47,7 @@ func Open(ctx context.Context, log *zap.Logger, driverName, connstr string) (*DB db := &DB{log: log, connstr: connstr, db: postgresRebind{rawdb}} db.aliasCache = NewNodeAliasCache(db) - _, _, db.implementation, err = dbutil.SplitConnStr(connstr) + _, _, db.impl, err = dbutil.SplitConnStr(connstr) if err != nil { return nil, Error.Wrap(err) } @@ -94,7 +94,7 @@ func (db *DB) MigrateToLatest(ctx context.Context) error { // will need to create the database it was told to connect to. These things should // not really be here, and instead should be assumed to exist. // This is tracked in jira ticket SM-200 - switch db.implementation { + switch db.impl { case dbutil.Postgres: schema, err := pgutil.ParseSchemaFromConnstr(db.connstr) if err != nil { diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index 2260819ae..b29432721 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -222,7 +222,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa } var query string - switch db.implementation { + switch db.impl { case dbutil.Cockroach: query = ` WITH deleted_objects AS ( @@ -299,7 +299,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id ` default: - return DeleteObjectResult{}, Error.New("invalid dbType: %v", db.implementation) + return DeleteObjectResult{}, Error.New("unhandled database: %v", db.impl) } err = withRows(db.db.Query(ctx, query, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)))(func(rows tagsql.Rows) error { result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) diff --git a/satellite/metabase/delete_bucket.go b/satellite/metabase/delete_bucket.go index 3ed138b91..d54b31ada 100644 --- a/satellite/metabase/delete_bucket.go +++ b/satellite/metabase/delete_bucket.go @@ -39,7 +39,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) } var query string - switch db.implementation { + switch db.impl { case dbutil.Cockroach: query = ` WITH deleted_objects AS ( @@ -67,7 +67,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces ` default: - return 
deletedObjectCount, Error.New("invalid dbType: %v", db.implementation) + return deletedObjectCount, Error.New("unhandled database: %v", db.impl) } // TODO: fix the count for objects without segments diff --git a/satellite/metabase/delete_objects.go b/satellite/metabase/delete_objects.go index fb2c946c8..5a4adaac2 100644 --- a/satellite/metabase/delete_objects.go +++ b/satellite/metabase/delete_objects.go @@ -6,7 +6,6 @@ package metabase import ( "context" "encoding/hex" - "fmt" "time" "github.com/jackc/pgx/v4" @@ -14,7 +13,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" - "storj.io/private/dbutil" "storj.io/private/tagsql" ) @@ -33,18 +31,13 @@ type DeleteExpiredObjects struct { func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObjects) (err error) { defer mon.Task()(&ctx)(&err) - var asOfSystemTimeString string - if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach { - asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano()) - } - return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) { query := ` SELECT project_id, bucket_name, object_key, version, stream_id, expires_at FROM objects - ` + asOfSystemTimeString + ` + ` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + ` WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) AND expires_at < $5 @@ -104,17 +97,12 @@ type DeleteZombieObjects struct { func (db *DB) DeleteZombieObjects(ctx context.Context, opts DeleteZombieObjects) (err error) { defer mon.Task()(&ctx)(&err) - var asOfSystemTimeString string - if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach { - asOfSystemTimeString = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano()) - } - return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) { query := ` SELECT project_id, bucket_name, object_key, version, stream_id FROM objects - ` + asOfSystemTimeString + ` + ` + db.impl.AsOfSystemTime(opts.AsOfSystemTime) + ` WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) AND status = ` + pendingStatus + ` diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index 7bb9d5f4d..f090bc97d 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -6,7 +6,6 @@ package metabase import ( "bytes" "context" - "fmt" "sort" "time" @@ -14,7 +13,6 @@ import ( "storj.io/common/storj" "storj.io/common/uuid" - "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" "storj.io/private/tagsql" ) @@ -154,11 +152,6 @@ func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool { func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) { defer mon.Task()(&ctx)(&err) - var asOfSystemTime string - if !it.asOfSystemTime.IsZero() && it.db.implementation == dbutil.Cockroach { - asOfSystemTime = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, it.asOfSystemTime.UnixNano()) - } - return it.db.db.Query(ctx, ` SELECT project_id, bucket_name, @@ -168,7 +161,7 @@ func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err err segment_count, LENGTH(COALESCE(encrypted_metadata,'')) FROM objects - `+asOfSystemTime+` + `+it.db.impl.AsOfSystemTime(it.asOfSystemTime)+` WHERE (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC LIMIT $5 @@ 
-241,11 +234,6 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h bytesIDs[i] = id[:] } - var asOfSystemTime string - if !opts.AsOfSystemTime.IsZero() && db.implementation == dbutil.Cockroach { - asOfSystemTime = fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, opts.AsOfSystemTime.UnixNano()) - } - rows, err := db.db.Query(ctx, ` SELECT stream_id, position, @@ -256,7 +244,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h redundancy, remote_alias_pieces FROM segments - `+asOfSystemTime+` + `+db.impl.AsOfSystemTime(opts.AsOfSystemTime)+` WHERE -- this turns out to be a little bit faster than stream_id IN (SELECT unnest($1::BYTEA[])) stream_id = ANY ($1::BYTEA[]) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 4e85483c3..bc1a02c47 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -133,21 +133,21 @@ type InfoResponse struct { // FindStorageNodesRequest defines easy request parameters. type FindStorageNodesRequest struct { - RequestedCount int - ExcludedIDs []storj.NodeID - MinimumVersion string // semver or empty - AsOfSystemTimeInterval time.Duration // only used for CRDB queries + RequestedCount int + ExcludedIDs []storj.NodeID + MinimumVersion string // semver or empty + AsOfSystemInterval time.Duration // only used for CRDB queries } // NodeCriteria are the requirements for selecting nodes. type NodeCriteria struct { - FreeDisk int64 - ExcludedIDs []storj.NodeID - ExcludedNetworks []string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes - MinimumVersion string // semver or empty - OnlineWindow time.Duration - DistinctIP bool - AsOfSystemTimeInterval time.Duration // only used for CRDB queries + FreeDisk int64 + ExcludedIDs []storj.NodeID + ExcludedNetworks []string // the /24 subnet IPv4 or /64 subnet IPv6 for nodes + MinimumVersion string // semver or empty + OnlineWindow time.Duration + DistinctIP bool + AsOfSystemInterval time.Duration // only used for CRDB queries } // AuditType is an enum representing the outcome of a particular audit reported to the overlay. 
@@ -337,7 +337,7 @@ func (service *Service) IsOnline(node *NodeDossier) bool { func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) { defer mon.Task()(&ctx)(&err) if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 { - req.AsOfSystemTimeInterval = service.config.Node.AsOfSystemTime.DefaultInterval + req.AsOfSystemInterval = service.config.Node.AsOfSystemTime.DefaultInterval } return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node) } @@ -349,7 +349,7 @@ func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) { defer mon.Task()(&ctx)(&err) if service.config.Node.AsOfSystemTime.Enabled && service.config.Node.AsOfSystemTime.DefaultInterval < 0 { - req.AsOfSystemTimeInterval = service.config.Node.AsOfSystemTime.DefaultInterval + req.AsOfSystemInterval = service.config.Node.AsOfSystemTime.DefaultInterval } if service.config.NodeSelectionCache.Disabled { @@ -394,13 +394,13 @@ func (service *Service) FindStorageNodesWithPreferences(ctx context.Context, req } criteria := NodeCriteria{ - FreeDisk: preferences.MinimumDiskSpace.Int64(), - ExcludedIDs: excludedIDs, - ExcludedNetworks: excludedNetworks, - MinimumVersion: preferences.MinimumVersion, - OnlineWindow: preferences.OnlineWindow, - DistinctIP: preferences.DistinctIP, - AsOfSystemTimeInterval: req.AsOfSystemTimeInterval, + FreeDisk: preferences.MinimumDiskSpace.Int64(), + ExcludedIDs: excludedIDs, + ExcludedNetworks: excludedNetworks, + MinimumVersion: preferences.MinimumVersion, + OnlineWindow: preferences.OnlineWindow, + DistinctIP: preferences.DistinctIP, + AsOfSystemInterval: req.AsOfSystemInterval, } nodes, err = service.db.SelectStorageNodes(ctx, totalNeededNodes, newNodeCount, &criteria) if err != nil { diff --git a/satellite/satellitedb/database.go b/satellite/satellitedb/database.go index fe8137edc..57f351633 100644 --- a/satellite/satellitedb/database.go +++ b/satellite/satellitedb/database.go @@ -6,7 +6,6 @@ package satellitedb import ( "context" "sync" - "time" "github.com/zeebo/errs" "go.uber.org/zap" @@ -49,11 +48,11 @@ type satelliteDB struct { migrationDB tagsql.DB - opts Options - log *zap.Logger - driver string - implementation dbutil.Implementation - source string + opts Options + log *zap.Logger + driver string + impl dbutil.Implementation + source string consoleDBOnce sync.Once consoleDB *ConsoleDB @@ -108,11 +107,11 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options } func open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options, override string) (*satelliteDB, error) { - driver, source, implementation, err := dbutil.SplitConnStr(databaseURL) + driver, source, impl, err := dbutil.SplitConnStr(databaseURL) if err != nil { return nil, err } - if implementation != dbutil.Postgres && implementation != dbutil.Cockroach { + if impl != dbutil.Postgres && impl != dbutil.Cockroach { return nil, Error.New("unsupported driver %q", driver) } @@ -137,11 +136,11 @@ func open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options core := &satelliteDB{ DB: dbxDB, - opts: opts, - log: log, - driver: driver, - implementation: implementation, - source: source, + opts: opts, + log: log, + driver: driver, + impl: impl, + source: source, } core.migrationDB = core @@ 
-158,17 +157,6 @@ func (dbc *satelliteDBCollection) getByName(name string) *satelliteDB { return dbc.dbs[""] } -// AsOfSystemTimeClause returns the "AS OF SYSTEM TIME" clause if the DB implementation -// is CockroachDB and the interval is less than or equal to a negative microsecond -// (CRDB does not support intervals in the negative nanoseconds). -func (db *satelliteDB) AsOfSystemTimeClause(interval time.Duration) (asOf string) { - if db.implementation == dbutil.Cockroach && interval <= -time.Microsecond { - asOf = " AS OF SYSTEM TIME '" + interval.String() + "' " - } - - return asOf -} - // TestDBAccess for raw database access, // should not be used outside of migration tests. func (db *satelliteDB) TestDBAccess() *dbx.DB { return db.DB } diff --git a/satellite/satellitedb/gracefulexit.go b/satellite/satellitedb/gracefulexit.go index 3c1823313..07fa44bd2 100644 --- a/satellite/satellitedb/gracefulexit.go +++ b/satellite/satellitedb/gracefulexit.go @@ -175,10 +175,10 @@ func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, // queue items whose nodes have finished the exit before the indicated time // returning the total number of deleted items. func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( - ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (_ int64, err error) { + ctx context.Context, before time.Time, asOfSystemInterval time.Duration, batchSize int) (_ int64, err error) { defer mon.Task()(&ctx)(&err) - switch db.db.implementation { + switch db.db.impl { case dbutil.Postgres: statement := ` DELETE FROM graceful_exit_transfer_queue @@ -201,11 +201,10 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( return count, nil case dbutil.Cockroach: - asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) - nodesQuery := ` SELECT id - FROM nodes ` + asOf + ` + FROM nodes + ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + ` WHERE exit_finished_at IS NOT NULL AND exit_finished_at < $1 LIMIT $2 OFFSET $3 @@ -276,18 +275,16 @@ func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( return deleteCount, nil } - return 0, Error.New("unsupported implementation: %s", - dbutil.SchemeForImplementation(db.db.implementation), - ) + return 0, Error.New("unsupported implementation: %s", db.db.impl) } // DeleteFinishedExitProgress deletes exit progress entries for nodes that // finished exiting before the indicated time, returns number of deleted entries. func (db *gracefulexitDB) DeleteFinishedExitProgress( - ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ int64, err error) { + ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ int64, err error) { defer mon.Task()(&ctx)(&err) - finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemTimeInterval) + finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemInterval) if err != nil { return 0, err } @@ -295,10 +292,12 @@ func (db *gracefulexitDB) DeleteFinishedExitProgress( } // GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. 
-func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error) { +func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (finishedNodes []storj.NodeID, err error) { defer mon.Task()(&ctx)(&err) - asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) - stmt := `SELECT id FROM nodes ` + asOf + ` + stmt := ` + SELECT id + FROM nodes + ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + ` WHERE exit_finished_at IS NOT NULL AND exit_finished_at < ? ` @@ -454,14 +453,13 @@ func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, node // CountFinishedTransferQueueItemsByNode return a map of the nodes which has // finished the exit before the indicated time but there are at least one item // left in the transfer queue. -func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (_ map[storj.NodeID]int64, err error) { +func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ map[storj.NodeID]int64, err error) { defer mon.Task()(&ctx)(&err) - asOf := db.db.AsOfSystemTimeClause(asOfSystemTimeInterval) - query := `SELECT n.id, count(getq.node_id) FROM nodes as n INNER JOIN graceful_exit_transfer_queue as getq - ON n.id = getq.node_id ` + asOf + ` + ON n.id = getq.node_id + ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + ` WHERE n.exit_finished_at IS NOT NULL AND n.exit_finished_at < ? GROUP BY n.id` diff --git a/satellite/satellitedb/migrate.go b/satellite/satellitedb/migrate.go index 353d9913d..503bed2a4 100644 --- a/satellite/satellitedb/migrate.go +++ b/satellite/satellitedb/migrate.go @@ -34,7 +34,7 @@ func (db *satelliteDB) MigrateToLatest(ctx context.Context) error { // will need to create the database it was told to connect to. These things should // not really be here, and instead should be assumed to exist. // This is tracked in jira ticket SM-200 - switch db.implementation { + switch db.impl { case dbutil.Postgres: schema, err := pgutil.ParseSchemaFromConnstr(db.source) if err != nil { @@ -61,7 +61,7 @@ func (db *satelliteDB) MigrateToLatest(ctx context.Context) error { } } - switch db.implementation { + switch db.impl { case dbutil.Postgres, dbutil.Cockroach: migration := db.PostgresMigration() // since we merged migration steps 0-69, the current db version should never be @@ -85,7 +85,7 @@ func (db *satelliteDB) MigrateToLatest(ctx context.Context) error { // TestingMigrateToLatest is a method for creating all tables for database for testing. func (db *satelliteDB) TestingMigrateToLatest(ctx context.Context) error { - switch db.implementation { + switch db.impl { case dbutil.Postgres: schema, err := pgutil.ParseSchemaFromConnstr(db.source) if err != nil { @@ -111,7 +111,7 @@ func (db *satelliteDB) TestingMigrateToLatest(ctx context.Context) error { } } - switch db.implementation { + switch db.impl { case dbutil.Postgres, dbutil.Cockroach: migration := db.PostgresMigration() @@ -132,7 +132,7 @@ func (db *satelliteDB) TestingMigrateToLatest(ctx context.Context) error { // CheckVersion confirms the database is at the desired version. 
func (db *satelliteDB) CheckVersion(ctx context.Context) error { - switch db.implementation { + switch db.impl { case dbutil.Postgres, dbutil.Cockroach: migration := db.PostgresMigration() return migration.ValidateVersions(ctx, db.log) @@ -1354,7 +1354,7 @@ func (db *satelliteDB) PostgresMigration() *migrate.Migration { Version: 156, Action: migrate.Func(func(ctx context.Context, log *zap.Logger, _ tagsql.DB, tx tagsql.Tx) error { storingClause := func(fields ...string) string { - if db.implementation == dbutil.Cockroach { + if db.impl == dbutil.Cockroach { return fmt.Sprintf("STORING (%s)", strings.Join(fields, ", ")) } diff --git a/satellite/satellitedb/nodeselection.go b/satellite/satellitedb/nodeselection.go index e05797b16..cc9b4e468 100644 --- a/satellite/satellitedb/nodeselection.go +++ b/satellite/satellitedb/nodeselection.go @@ -104,7 +104,7 @@ func (cache *overlaycache) selectStorageNodesOnce(ctx context.Context, reputable var reputableNodeQuery, newNodeQuery partialQuery - asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval) + asOf := cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval) // Note: the true/false at the end of each selection string indicates if the selection is for new nodes or not. // Later, the flag allows us to distinguish if a node is new when scanning the db rows. diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 698ce684c..4fefbe22f 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -56,11 +56,10 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) - asOf := cache.db.AsOfSystemTimeClause(selectionCfg.AsOfSystemTime.DefaultInterval) - query := ` SELECT id, address, last_net, last_ip_port, vetted_at - FROM nodes ` + asOf + ` + FROM nodes + ` + cache.db.impl.AsOfSystemInterval(selectionCfg.AsOfSystemTime.DefaultInterval) + ` WHERE disqualified IS NULL AND unknown_audit_suspended IS NULL AND offline_suspended IS NULL @@ -139,11 +138,10 @@ func (cache *overlaycache) SelectAllStorageNodesDownload(ctx context.Context, on func (cache *overlaycache) selectAllStorageNodesDownload(ctx context.Context, onlineWindow time.Duration, asOfConfig overlay.AsOfSystemTimeConfig) (_ []*overlay.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) - asOf := cache.db.AsOfSystemTimeClause(asOfConfig.DefaultInterval) - query := ` SELECT id, address, last_net, last_ip_port - FROM nodes ` + asOf + ` + FROM nodes + ` + cache.db.impl.AsOfSystemInterval(asOfConfig.DefaultInterval) + ` WHERE disqualified IS NULL AND exit_finished_at IS NULL AND last_contact_success > $1 @@ -312,12 +310,11 @@ func (cache *overlaycache) knownOffline(ctx context.Context, criteria *overlay.N return nil, Error.New("no ids provided") } - asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval) - // get offline nodes var rows tagsql.Rows rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT id FROM nodes `+asOf+` + SELECT id FROM nodes + `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+` WHERE id = any($1::bytea[]) AND last_contact_success < $2 `), pgutil.NodeIDArray(nodeIds), time.Now().Add(-criteria.OnlineWindow), @@ -361,12 +358,12 @@ func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteri return nil, 
Error.New("no ids provided") } - asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval) - // get reliable and online nodes var rows tagsql.Rows rows, err = cache.db.Query(ctx, cache.db.Rebind(` - SELECT id FROM nodes `+asOf+` + SELECT id + FROM nodes + `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+` WHERE id = any($1::bytea[]) AND disqualified IS NULL AND unknown_audit_suspended IS NULL @@ -469,11 +466,11 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC } func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) { - asOf := cache.db.AsOfSystemTimeClause(criteria.AsOfSystemTimeInterval) - // get reliable and online nodes rows, err := cache.db.Query(ctx, cache.db.Rebind(` - SELECT id FROM nodes `+asOf+` + SELECT id + FROM nodes + `+cache.db.impl.AsOfSystemInterval(criteria.AsOfSystemInterval)+` WHERE disqualified IS NULL AND unknown_audit_suspended IS NULL AND offline_suspended IS NULL diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index 954bd6e9a..aad785040 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -476,7 +476,7 @@ func prefixIncrement(origPrefix []byte) (incremented []byte, ok bool) { // use. func (db *ProjectAccounting) prefixMatch(expr string, prefix []byte) (string, []byte, error) { incrementedPrefix, ok := prefixIncrement(prefix) - switch db.db.implementation { + switch db.db.impl { case dbutil.Postgres: if !ok { return fmt.Sprintf(`(%s >= ?)`, expr), nil, nil @@ -488,7 +488,7 @@ func (db *ProjectAccounting) prefixMatch(expr string, prefix []byte) (string, [] } return fmt.Sprintf(`(%s >= ?:::BYTEA AND %s < ?:::BYTEA)`, expr, expr), incrementedPrefix, nil default: - return "", nil, errs.New("invalid dbType: %v", db.db.driver) + return "", nil, errs.New("unhandled database: %v", db.db.driver) } } @@ -640,7 +640,7 @@ func (db *ProjectAccounting) ArchiveRollupsBefore(ctx context.Context, before ti return 0, nil } - switch db.db.implementation { + switch db.db.impl { case dbutil.Cockroach: for { row := db.db.QueryRow(ctx, ` diff --git a/satellite/satellitedb/repairqueue.go b/satellite/satellitedb/repairqueue.go index 71ae71332..94b9f0b1d 100644 --- a/satellite/satellitedb/repairqueue.go +++ b/satellite/satellitedb/repairqueue.go @@ -32,7 +32,7 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment // we want to insert the segment if it is not in the queue, but update the segment health if it already is in the queue // we also want to know if the result was an insert or an update - this is the reasoning for the xmax section of the postgres query // and the separate cockroach query (which the xmax trick does not work for) - switch r.db.implementation { + switch r.db.impl { case dbutil.Postgres: query = ` INSERT INTO injuredsegments @@ -80,7 +80,7 @@ func (r *repairQueue) Insert(ctx context.Context, seg *internalpb.InjuredSegment func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegment, err error) { defer mon.Task()(&ctx)(&err) - switch r.db.implementation { + switch r.db.impl { case dbutil.Cockroach: err = r.db.QueryRowContext(ctx, ` UPDATE injuredsegments SET attempted = now() @@ -95,7 +95,7 @@ func (r *repairQueue) Select(ctx context.Context) (seg *internalpb.InjuredSegmen ORDER BY segment_health ASC, attempted NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1 ) RETURNING data`).Scan(&seg) 
default: - return seg, errs.New("invalid dbType: %v", r.db.implementation) + return seg, errs.New("unhandled database: %v", r.db.impl) } if errors.Is(err, sql.ErrNoRows) { err = storage.ErrEmptyQueue.New("") diff --git a/satellite/satellitedb/storagenodeaccounting.go b/satellite/satellitedb/storagenodeaccounting.go index 7b3a592de..f0af2632a 100644 --- a/satellite/satellitedb/storagenodeaccounting.go +++ b/satellite/satellitedb/storagenodeaccounting.go @@ -534,7 +534,7 @@ func (db *StoragenodeAccounting) ArchiveRollupsBefore(ctx context.Context, befor return 0, nil } - switch db.db.implementation { + switch db.db.impl { case dbutil.Cockroach: for { row := db.db.QueryRow(ctx, ` @@ -559,6 +559,8 @@ func (db *StoragenodeAccounting) ArchiveRollupsBefore(ctx context.Context, befor break } } + return nodeRollupsDeleted, nil + case dbutil.Postgres: storagenodeStatement := ` WITH rollups_to_move AS ( @@ -571,14 +573,12 @@ func (db *StoragenodeAccounting) ArchiveRollupsBefore(ctx context.Context, befor SELECT count(*) FROM moved_rollups ` row := db.db.DB.QueryRow(ctx, storagenodeStatement, before) - var rowCount int - err = row.Scan(&rowCount) - if err != nil { - return nodeRollupsDeleted, err - } - nodeRollupsDeleted = rowCount + err = row.Scan(&nodeRollupsDeleted) + return nodeRollupsDeleted, err + + default: + return 0, Error.New("unsupported database: %v", db.db.impl) } - return nodeRollupsDeleted, err } // GetRollupsSince retrieves all archived bandwidth rollup records since a given time.
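
A note on the two helpers this patch now calls everywhere: Implementation.AsOfSystemTime and Implementation.AsOfSystemInterval come from the bumped storj.io/private dependency, so their definitions are not visible in this diff. Below is a self-contained sketch of how they are assumed to behave, reconstructed from the inline AS OF SYSTEM TIME code and the satelliteDB.AsOfSystemTimeClause method that the patch deletes; the exact signatures and edge-case handling in storj.io/private/dbutil may differ.

// Package asof is an illustrative, self-contained sketch of the helpers that
// this patch assumes to exist on dbutil.Implementation in storj.io/private.
// The real methods may differ in naming details and edge-case handling.
package asof

import (
	"fmt"
	"time"
)

// Implementation stands in for dbutil.Implementation.
type Implementation int

// Database implementations relevant to the AS OF SYSTEM TIME helpers.
const (
	Postgres Implementation = iota
	Cockroach
)

// AsOfSystemTime returns an "AS OF SYSTEM TIME" clause pinned to an absolute
// timestamp, mirroring the inline code removed from metabase: the clause is
// emitted only for CockroachDB and only for a non-zero time, so the result
// can be concatenated into a query unconditionally.
func (impl Implementation) AsOfSystemTime(t time.Time) string {
	if impl != Cockroach || t.IsZero() {
		return ""
	}
	return fmt.Sprintf(` AS OF SYSTEM TIME '%d' `, t.UnixNano())
}

// AsOfSystemInterval returns an "AS OF SYSTEM TIME" clause using a relative
// interval, mirroring the deleted satelliteDB.AsOfSystemTimeClause: CockroachDB
// does not accept intervals in the negative nanoseconds, so only intervals of
// at least a negative microsecond produce a clause.
func (impl Implementation) AsOfSystemInterval(interval time.Duration) string {
	if impl != Cockroach || interval > -time.Microsecond {
		return ""
	}
	return ` AS OF SYSTEM TIME '` + interval.String() + `' `
}

Because both helpers return an empty string for Postgres and for zero or too-recent values, call sites such as overlaycache.reliable can build their queries by plain concatenation, for example `SELECT id FROM nodes ` + impl.AsOfSystemInterval(criteria.AsOfSystemInterval) + ` WHERE ...`, instead of repeating the Cockroach-and-threshold check before every query - which is the duplication this change removes.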