Allow for DB application names per process. (#3983)

This commit is contained in:
Ethan Adams 2020-12-04 05:24:39 -05:00 committed by GitHub
parent d75e4be11f
commit f90ea10a4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 59 additions and 39 deletions

View File

@ -27,6 +27,7 @@ func cmdAdminRun(cmd *cobra.Command, args []string) (err error) {
} }
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
ApplicationName: "satellite-admin",
APIKeysLRUOptions: runCfg.APIKeysLRUOptions(), APIKeysLRUOptions: runCfg.APIKeysLRUOptions(),
}) })
if err != nil { if err != nil {

View File

@ -32,6 +32,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
} }
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
ApplicationName: "satellite-api",
APIKeysLRUOptions: runCfg.APIKeysLRUOptions(), APIKeysLRUOptions: runCfg.APIKeysLRUOptions(),
RevocationLRUOptions: runCfg.RevocationLRUOptions(), RevocationLRUOptions: runCfg.RevocationLRUOptions(),
}) })
@ -42,7 +43,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close()) err = errs.Combine(err, db.Close())
}() }()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL) pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL, "satellite-api")
if err != nil { if err != nil {
return errs.New("Error creating metainfodb connection on satellite api: %+v", err) return errs.New("Error creating metainfodb connection on satellite api: %+v", err)
} }

View File

@ -24,7 +24,7 @@ import (
func runBillingCmd(ctx context.Context, cmdFunc func(context.Context, *stripecoinpayments.Service, *dbx.DB) error) error { func runBillingCmd(ctx context.Context, cmdFunc func(context.Context, *stripecoinpayments.Service, *dbx.DB) error) error {
// Open SatelliteDB for the Payment Service // Open SatelliteDB for the Payment Service
logger := zap.L() logger := zap.L()
db, err := satellitedb.Open(ctx, logger.Named("db"), runCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, logger.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-billing"})
if err != nil { if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err) return errs.New("error connecting to master database on satellite: %+v", err)
} }

View File

@ -32,7 +32,7 @@ func generateInvoicesCSV(ctx context.Context, period compensation.Period, out io
WithheldPercents: generateInvoicesCfg.Compensation.WithheldPercents, WithheldPercents: generateInvoicesCfg.Compensation.WithheldPercents,
} }
db, err := satellitedb.Open(ctx, zap.L().Named("db"), generateInvoicesCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, zap.L().Named("db"), generateInvoicesCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
if err != nil { if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err) return errs.New("error connecting to master database on satellite: %+v", err)
} }
@ -141,7 +141,7 @@ func recordPeriod(ctx context.Context, paystubsCSV, paymentsCSV string) (int, in
return 0, 0, err return 0, 0, err
} }
db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordPeriodCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordPeriodCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
if err != nil { if err != nil {
return 0, 0, errs.New("error connecting to master database on satellite: %+v", err) return 0, 0, errs.New("error connecting to master database on satellite: %+v", err)
} }
@ -165,7 +165,7 @@ func recordOneOffPayments(ctx context.Context, paymentsCSV string) (int, error)
return 0, err return 0, err
} }
db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordOneOffPaymentsCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, zap.L().Named("db"), recordOneOffPaymentsCfg.Database, satellitedb.Options{ApplicationName: "satellite-compensation"})
if err != nil { if err != nil {
return 0, errs.New("error connecting to master database on satellite: %+v", err) return 0, errs.New("error connecting to master database on satellite: %+v", err)
} }

View File

@ -28,7 +28,7 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Failed to load identity: %+v", err) return errs.New("Failed to load identity: %+v", err)
} }
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-gc"})
if err != nil { if err != nil {
return errs.New("Error starting master database on satellite GC: %+v", err) return errs.New("Error starting master database on satellite GC: %+v", err)
} }
@ -36,7 +36,7 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close()) err = errs.Combine(err, db.Close())
}() }()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL) pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-gc")
if err != nil { if err != nil {
return errs.New("Error creating pointerDB connection GC: %+v", err) return errs.New("Error creating pointerDB connection GC: %+v", err)
} }

View File

@ -27,7 +27,7 @@ import (
// generateGracefulExitCSV creates a report with graceful exit data for exiting or exited nodes in a given period. // generateGracefulExitCSV creates a report with graceful exit data for exiting or exited nodes in a given period.
func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Time, end time.Time, output io.Writer) error { func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Time, end time.Time, output io.Writer) error {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), gracefulExitCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, zap.L().Named("db"), gracefulExitCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil { if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err) return errs.New("error connecting to master database on satellite: %+v", err)
} }

View File

@ -326,6 +326,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
} }
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{
ApplicationName: "satellite-core",
ReportedRollupsReadBatchSize: runCfg.Orders.SettlementBatchSize, ReportedRollupsReadBatchSize: runCfg.Orders.SettlementBatchSize,
SaveRollupBatchSize: runCfg.Tally.SaveRollupBatchSize, SaveRollupBatchSize: runCfg.Tally.SaveRollupBatchSize,
ReadRollupBatchSize: runCfg.Tally.ReadRollupBatchSize, ReadRollupBatchSize: runCfg.Tally.ReadRollupBatchSize,
@ -337,7 +338,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close()) err = errs.Combine(err, db.Close())
}() }()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL) pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-core")
if err != nil { if err != nil {
return errs.New("Error creating metainfodb connection: %+v", err) return errs.New("Error creating metainfodb connection: %+v", err)
} }
@ -401,7 +402,7 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd) ctx, _ := process.Ctx(cmd)
log := zap.L() log := zap.L()
db, err := satellitedb.Open(ctx, log.Named("migration"), runCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, log.Named("migration"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-migration"})
if err != nil { if err != nil {
return errs.New("Error creating new master database connection for satellitedb migration: %+v", err) return errs.New("Error creating new master database connection for satellitedb migration: %+v", err)
} }
@ -414,7 +415,7 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Error creating tables for master database on satellite: %+v", err) return errs.New("Error creating tables for master database on satellite: %+v", err)
} }
pdb, err := metainfo.OpenStore(ctx, log.Named("migration"), runCfg.Metainfo.DatabaseURL) pdb, err := metainfo.OpenStore(ctx, log.Named("migration"), runCfg.Metainfo.DatabaseURL, "satellite-migration")
if err != nil { if err != nil {
return errs.New("Error creating pointer database connection on satellite: %+v", err) return errs.New("Error creating pointer database connection on satellite: %+v", err)
} }
@ -452,7 +453,7 @@ func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd) ctx, _ := process.Ctx(cmd)
// open the master db // open the master db
database, err := satellitedb.Open(ctx, zap.L().Named("db"), qdiagCfg.Database, satellitedb.Options{}) database, err := satellitedb.Open(ctx, zap.L().Named("db"), qdiagCfg.Database, satellitedb.Options{ApplicationName: "satellite-qdiag"})
if err != nil { if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err) return errs.New("error connecting to master database on satellite: %+v", err)
} }

View File

@ -31,7 +31,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Failed to load identity: %+v", err) return errs.New("Failed to load identity: %+v", err)
} }
db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, log.Named("db"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-repairer"})
if err != nil { if err != nil {
return errs.New("Error starting master database: %+v", err) return errs.New("Error starting master database: %+v", err)
} }
@ -39,7 +39,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close()) err = errs.Combine(err, db.Close())
}() }()
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL) pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-repairer")
if err != nil { if err != nil {
return errs.New("Error creating metainfo database connection: %+v", err) return errs.New("Error creating metainfo database connection: %+v", err)
} }

View File

@ -32,7 +32,7 @@ var headers = []string{
// GenerateAttributionCSV creates a report with. // GenerateAttributionCSV creates a report with.
func GenerateAttributionCSV(ctx context.Context, database string, partnerID uuid.UUID, start time.Time, end time.Time, output io.Writer) error { func GenerateAttributionCSV(ctx context.Context, database string, partnerID uuid.UUID, start time.Time, end time.Time, output io.Writer) error {
log := zap.L().Named("db") log := zap.L().Named("db")
db, err := satellitedb.Open(ctx, log, database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, log, database, satellitedb.Options{ApplicationName: "satellite-attribution"})
if err != nil { if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err) return errs.New("error connecting to master database on satellite: %+v", err)
} }

View File

@ -21,7 +21,7 @@ import (
// generateNodeUsageCSV creates a report with node usage data for all nodes in a given period which can be used for payments. // generateNodeUsageCSV creates a report with node usage data for all nodes in a given period which can be used for payments.
func generateNodeUsageCSV(ctx context.Context, start time.Time, end time.Time, output io.Writer) error { func generateNodeUsageCSV(ctx context.Context, start time.Time, end time.Time, output io.Writer) error {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), nodeUsageCfg.Database, satellitedb.Options{}) db, err := satellitedb.Open(ctx, zap.L().Named("db"), nodeUsageCfg.Database, satellitedb.Options{ApplicationName: "satellite-nodeusage"})
if err != nil { if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err) return errs.New("error connecting to master database on satellite: %+v", err)
} }

View File

@ -49,7 +49,7 @@ func cmdDelete(cmd *cobra.Command, args []string) (err error) {
ctx, _ := process.Ctx(cmd) ctx, _ := process.Ctx(cmd)
log := zap.L() log := zap.L()
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), deleteCfg.DatabaseURL) db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), deleteCfg.DatabaseURL, "satellite-reaper")
if err != nil { if err != nil {
return errs.New("error connecting database: %+v", err) return errs.New("error connecting database: %+v", err)
} }

View File

@ -49,7 +49,7 @@ func cmdDetect(cmd *cobra.Command, args []string) (err error) {
log.Warn("Failed to initialize telemetry batcher on segment reaper", zap.Error(err)) log.Warn("Failed to initialize telemetry batcher on segment reaper", zap.Error(err))
} }
db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), detectCfg.DatabaseURL) db, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), detectCfg.DatabaseURL, "satellite-reaper")
if err != nil { if err != nil {
return errs.New("error connecting database: %+v", err) return errs.New("error connecting database: %+v", err)
} }

View File

@ -52,7 +52,10 @@ func Open(ctx context.Context, log *zap.Logger, databaseURL string) (multinode.D
return nil, Error.New("unsupported driver %q", driver) return nil, Error.New("unsupported driver %q", driver)
} }
source = pgutil.CheckApplicationName(source) source, err = pgutil.CheckApplicationName(source, "storagenode-multinode")
if err != nil {
return nil, err
}
dbxDB, err := dbx.Open(driver, source) dbxDB, err := dbx.Open(driver, source)
if err != nil { if err != nil {

View File

@ -92,17 +92,21 @@ func QuerySnapshot(ctx context.Context, db dbschema.Queryer) (*dbschema.Snapshot
} }
// CheckApplicationName ensures that the Connection String contains an application name. // CheckApplicationName ensures that the Connection String contains an application name.
func CheckApplicationName(s string) (r string) { func CheckApplicationName(s string, app string) (string, error) {
if !strings.Contains(s, "application_name") { if !strings.Contains(s, "application_name") {
if !strings.Contains(s, "?") { if strings.TrimSpace(app) == "" {
r = s + "?application_name=Satellite" return s, errs.New("application name cannot be empty")
return
} }
r = s + "&application_name=Satellite"
return if !strings.Contains(s, "?") {
return s + "?application_name=" + app, nil
}
return s + "&application_name=" + app, nil
} }
// return source as is if application_name is set // return source as is if application_name is set
return s return s, nil
} }
// IsConstraintError checks if given error is about constraint violation. // IsConstraintError checks if given error is about constraint violation.

View File

@ -140,7 +140,7 @@ type PointerDB interface {
} }
// OpenStore returns database for storing pointer data. // OpenStore returns database for storing pointer data.
func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string) (db PointerDB, err error) { func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string, app string) (db PointerDB, err error) {
_, source, implementation, err := dbutil.SplitConnStr(dbURLString) _, source, implementation, err := dbutil.SplitConnStr(dbURLString)
if err != nil { if err != nil {
return nil, err return nil, err
@ -148,9 +148,9 @@ func OpenStore(ctx context.Context, logger *zap.Logger, dbURLString string) (db
switch implementation { switch implementation {
case dbutil.Postgres: case dbutil.Postgres:
db, err = postgreskv.Open(ctx, source) db, err = postgreskv.Open(ctx, source, app)
case dbutil.Cockroach: case dbutil.Cockroach:
db, err = cockroachkv.Open(ctx, source) db, err = cockroachkv.Open(ctx, source, app)
default: default:
err = Error.New("unsupported db implementation: %s", dbURLString) err = Error.New("unsupported db implementation: %s", dbURLString)
} }

View File

@ -66,6 +66,7 @@ type satelliteDB struct {
// Options includes options for how a satelliteDB runs. // Options includes options for how a satelliteDB runs.
type Options struct { type Options struct {
ApplicationName string
APIKeysLRUOptions cache.Options APIKeysLRUOptions cache.Options
RevocationLRUOptions cache.Options RevocationLRUOptions cache.Options
@ -121,7 +122,10 @@ func open(ctx context.Context, log *zap.Logger, databaseURL string, opts Options
return nil, Error.New("unsupported driver %q", driver) return nil, Error.New("unsupported driver %q", driver)
} }
source = pgutil.CheckApplicationName(source) source, err = pgutil.CheckApplicationName(source, opts.ApplicationName)
if err != nil {
return nil, err
}
dbxDB, err := dbx.Open(driver, source) dbxDB, err := dbx.Open(driver, source)
if err != nil { if err != nil {

View File

@ -157,7 +157,7 @@ func migrateTest(t *testing.T, connStr string) {
defer func() { require.NoError(t, tempDB.Close()) }() defer func() { require.NoError(t, tempDB.Close()) }()
// create a new satellitedb connection // create a new satellitedb connection
db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{}) db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{ApplicationName: "satellite-migration-test"})
require.NoError(t, err) require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }() defer func() { require.NoError(t, db.Close()) }()
@ -249,7 +249,7 @@ func benchmarkSetup(b *testing.B, connStr string, merged bool) {
defer func() { require.NoError(b, tempDB.Close()) }() defer func() { require.NoError(b, tempDB.Close()) }()
// create a new satellitedb connection // create a new satellitedb connection
db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{}) db, err := satellitedb.Open(ctx, log, tempDB.ConnStr, satellitedb.Options{ApplicationName: "satellite-migration-test"})
require.NoError(b, err) require.NoError(b, err)
defer func() { require.NoError(b, db.Close()) }() defer func() { require.NoError(b, db.Close()) }()

View File

@ -119,7 +119,7 @@ func CreateMasterDB(ctx context.Context, log *zap.Logger, name string, category
// CreateMasterDBOnTopOf creates a new satellite database on top of an already existing // CreateMasterDBOnTopOf creates a new satellite database on top of an already existing
// temporary database. // temporary database.
func CreateMasterDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db satellite.DB, err error) { func CreateMasterDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db satellite.DB, err error) {
masterDB, err := satellitedb.Open(ctx, log.Named("db"), tempDB.ConnStr, satellitedb.Options{}) masterDB, err := satellitedb.Open(ctx, log.Named("db"), tempDB.ConnStr, satellitedb.Options{ApplicationName: "satellite-satellitdb-test"})
return &tempMasterDB{DB: masterDB, tempDB: tempDB}, err return &tempMasterDB{DB: masterDB, tempDB: tempDB}, err
} }
@ -156,7 +156,7 @@ func CreatePointerDB(ctx context.Context, log *zap.Logger, name string, category
// CreatePointerDBOnTopOf creates a new satellite database on top of an already existing // CreatePointerDBOnTopOf creates a new satellite database on top of an already existing
// temporary database. // temporary database.
func CreatePointerDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db metainfo.PointerDB, err error) { func CreatePointerDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db metainfo.PointerDB, err error) {
pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), tempDB.ConnStr) pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), tempDB.ConnStr, "satellite-satellitdb-test")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -34,8 +34,11 @@ type Client struct {
} }
// Open connects a new cockroachkv client given db URL. // Open connects a new cockroachkv client given db URL.
func Open(ctx context.Context, dbURL string) (*Client, error) { func Open(ctx context.Context, dbURL string, app string) (*Client, error) {
dbURL = pgutil.CheckApplicationName(dbURL) dbURL, err := pgutil.CheckApplicationName(dbURL, app)
if err != nil {
return nil, err
}
db, err := tagsql.Open(ctx, "cockroach", dbURL) db, err := tagsql.Open(ctx, "cockroach", dbURL)
if err != nil { if err != nil {

View File

@ -31,8 +31,11 @@ type Client struct {
} }
// Open connects a new postgreskv client given db URL. // Open connects a new postgreskv client given db URL.
func Open(ctx context.Context, dbURL string) (*Client, error) { func Open(ctx context.Context, dbURL string, app string) (*Client, error) {
dbURL = pgutil.CheckApplicationName(dbURL) dbURL, err := pgutil.CheckApplicationName(dbURL, app)
if err != nil {
return nil, err
}
db, err := tagsql.Open(ctx, "pgx", dbURL) db, err := tagsql.Open(ctx, "pgx", dbURL)
if err != nil { if err != nil {

View File

@ -23,7 +23,7 @@ import (
func openTestPostgres(ctx context.Context, t testing.TB) (store *Client, cleanup func()) { func openTestPostgres(ctx context.Context, t testing.TB) (store *Client, cleanup func()) {
connstr := pgtest.PickPostgres(t) connstr := pgtest.PickPostgres(t)
pgdb, err := Open(ctx, connstr) pgdb, err := Open(ctx, connstr, "satellite-test-postgres")
if err != nil { if err != nil {
t.Fatalf("init: %v", err) t.Fatalf("init: %v", err)
} }