// Copyright (C) 2020 Storj Labs, Inc. // See LICENSE for copying information. // Package metabase implements storing objects and segements. package metabase import ( "context" "fmt" "strconv" "time" _ "github.com/jackc/pgx/v5" // registers pgx as a tagsql driver. _ "github.com/jackc/pgx/v5/stdlib" // registers pgx as a tagsql driver. "github.com/spacemonkeygo/monkit/v3" "github.com/zeebo/errs" "go.uber.org/zap" "storj.io/common/memory" "storj.io/private/dbutil" "storj.io/private/dbutil/pgutil" "storj.io/private/tagsql" "storj.io/storj/private/migrate" ) var ( mon = monkit.Package() ) // Config is a configuration struct for part validation. type Config struct { ApplicationName string MinPartSize memory.Size MaxNumberOfParts int // TODO remove this flag when server-side copy implementation will be finished ServerSideCopy bool ServerSideCopyDisabled bool } // DB implements a database for storing objects and segments. type DB struct { log *zap.Logger db tagsql.DB connstr string impl dbutil.Implementation aliasCache *NodeAliasCache testCleanup func() error config Config } // Open opens a connection to metabase. func Open(ctx context.Context, log *zap.Logger, connstr string, config Config) (*DB, error) { var driverName string _, _, impl, err := dbutil.SplitConnStr(connstr) if err != nil { return nil, Error.Wrap(err) } switch impl { case dbutil.Postgres: driverName = "pgx" case dbutil.Cockroach: driverName = "cockroach" default: return nil, Error.New("unsupported implementation: %s", connstr) } connstr, err = pgutil.CheckApplicationName(connstr, config.ApplicationName) if err != nil { return nil, Error.Wrap(err) } rawdb, err := tagsql.Open(ctx, driverName, connstr) if err != nil { return nil, Error.Wrap(err) } dbutil.Configure(ctx, rawdb, "metabase", mon) db := &DB{ log: log, db: postgresRebind{rawdb}, connstr: connstr, impl: impl, testCleanup: func() error { return nil }, config: config, } db.aliasCache = NewNodeAliasCache(db) log.Debug("Connected", zap.String("db source", connstr)) return db, nil } // Implementation rturns the database implementation. func (db *DB) Implementation() dbutil.Implementation { return db.impl } // UnderlyingTagSQL returns *tagsql.DB. // TODO: remove. func (db *DB) UnderlyingTagSQL() tagsql.DB { return db.db } // Ping checks whether connection has been established. func (db *DB) Ping(ctx context.Context) error { return Error.Wrap(db.db.PingContext(ctx)) } // TestingSetCleanup is used to set the callback for cleaning up test database. func (db *DB) TestingSetCleanup(cleanup func() error) { db.testCleanup = cleanup } // Close closes the connection to database. func (db *DB) Close() error { return errs.Combine(Error.Wrap(db.db.Close()), db.testCleanup()) } // DestroyTables deletes all tables. // // TODO: remove this, only for bootstrapping. func (db *DB) DestroyTables(ctx context.Context) error { _, err := db.db.ExecContext(ctx, ` DROP TABLE IF EXISTS objects; DROP TABLE IF EXISTS segments; DROP TABLE IF EXISTS node_aliases; DROP SEQUENCE IF EXISTS node_alias_seq; `) db.aliasCache = NewNodeAliasCache(db) return Error.Wrap(err) } // TestMigrateToLatest replaces the migration steps with only one step to create metabase db. func (db *DB) TestMigrateToLatest(ctx context.Context) error { // First handle the idiosyncrasies of postgres and cockroach migrations. Postgres // will need to create any schemas specified in the search path, and cockroach // will need to create the database it was told to connect to. These things should // not really be here, and instead should be assumed to exist. // This is tracked in jira ticket SM-200 switch db.impl { case dbutil.Postgres: schema, err := pgutil.ParseSchemaFromConnstr(db.connstr) if err != nil { return errs.New("error parsing schema: %+v", err) } if schema != "" { err = pgutil.CreateSchema(ctx, db.db, schema) if err != nil { return errs.New("error creating schema: %+v", err) } } case dbutil.Cockroach: var dbName string if err := db.db.QueryRowContext(ctx, `SELECT current_database();`).Scan(&dbName); err != nil { return errs.New("error querying current database: %+v", err) } _, err := db.db.ExecContext(ctx, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s;`, pgutil.QuoteIdentifier(dbName))) if err != nil { return errs.Wrap(err) } } migration := &migrate.Migration{ Table: "metabase_versions", Steps: []*migrate.Step{ { DB: &db.db, Description: "Test snapshot", Version: 16, Action: migrate.SQL{ `CREATE TABLE objects ( project_id BYTEA NOT NULL, bucket_name BYTEA NOT NULL, -- we're using bucket_name here to avoid a lookup into buckets table object_key BYTEA NOT NULL, -- using 'object_key' instead of 'key' to avoid reserved word version INT4 NOT NULL, stream_id BYTEA NOT NULL, created_at TIMESTAMPTZ NOT NULL default now(), expires_at TIMESTAMPTZ, status INT2 NOT NULL default ` + pendingStatus + `, segment_count INT4 NOT NULL default 0, encrypted_metadata_nonce BYTEA default NULL, encrypted_metadata BYTEA default NULL, encrypted_metadata_encrypted_key BYTEA default NULL, total_plain_size INT8 NOT NULL default 0, -- migrated objects have this = 0 total_encrypted_size INT8 NOT NULL default 0, fixed_segment_size INT4 NOT NULL default 0, -- migrated objects have this = 0 encryption INT8 NOT NULL default 0, zombie_deletion_deadline TIMESTAMPTZ default now() + '1 day', PRIMARY KEY (project_id, bucket_name, object_key, version) ); COMMENT ON TABLE objects is 'Objects table contains information about path and streams.'; COMMENT ON COLUMN objects.project_id is 'project_id is a uuid referring to project.id.'; COMMENT ON COLUMN objects.bucket_name is 'bucket_name is a alpha-numeric string referring to bucket_metainfo.name.'; COMMENT ON COLUMN objects.object_key is 'object_key is an encrypted path of the object.'; COMMENT ON COLUMN objects.version is 'version is a monotonically increasing number per object. currently unused.'; COMMENT ON COLUMN objects.stream_id is 'stream_id is a random identifier for the content uploaded to the object.'; COMMENT ON COLUMN objects.created_at is 'created_at is the creation date of this object.'; COMMENT ON COLUMN objects.expires_at is 'expires_at is the date when this object will be marked for deletion.'; COMMENT ON COLUMN objects.status is 'status refers to metabase.ObjectStatus, where pending=1 and committed=3.'; COMMENT ON COLUMN objects.segment_count is 'segment_count indicates, how many segments are in the segments table for this object. This is zero until the object is committed.'; COMMENT ON COLUMN objects.encrypted_metadata_nonce is 'encrypted_metadata_nonce is random identifier used as part of encryption for encrypted_metadata.'; COMMENT ON COLUMN objects.encrypted_metadata is 'encrypted_metadata is encrypted key-value pairs of user-specified data.'; COMMENT ON COLUMN objects.encrypted_metadata_encrypted_key is 'encrypted_metadata_encrypted_key is the encrypted key for encrypted_metadata.'; COMMENT ON COLUMN objects.total_plain_size is 'total_plain_size is the user-specified total size of the object. This can be zero for old migrated objects.'; COMMENT ON COLUMN objects.total_encrypted_size is 'total_encrypted_size is the sum of the encrypted data sizes of segments.'; COMMENT ON COLUMN objects.fixed_segment_size is 'fixed_segment_size is specified for objects that have a uniform segment sizes (except the last segment). This can be zero for old migrated objects.'; COMMENT ON COLUMN objects.encryption is 'encryption contains object encryption parameters encoded into a uint32. See metabase.encryptionParameters type for the implementation.'; COMMENT ON COLUMN objects.zombie_deletion_deadline is 'zombie_deletion_deadline defines when a pending object can be deleted due to a failed upload.'; CREATE TABLE segments ( stream_id BYTEA NOT NULL, position INT8 NOT NULL, root_piece_id BYTEA NOT NULL, encrypted_key_nonce BYTEA NOT NULL, encrypted_key BYTEA NOT NULL, remote_alias_pieces BYTEA, encrypted_size INT4 NOT NULL, plain_offset INT8 NOT NULL, -- migrated objects have this = 0 plain_size INT4 NOT NULL, -- migrated objects have this = 0 redundancy INT8 NOT NULL default 0, inline_data BYTEA DEFAULT NULL, created_at TIMESTAMPTZ DEFAULT now() NOT NULL, repaired_at TIMESTAMPTZ, expires_at TIMESTAMPTZ, placement integer, encrypted_etag BYTEA default NULL, PRIMARY KEY (stream_id, position) ); COMMENT ON TABLE segments is 'segments table contains where segment data is located and other metadata about them.'; COMMENT ON COLUMN segments.stream_id is 'stream_id is a uuid referring to segments that belong to the same object.'; COMMENT ON COLUMN segments.position is 'position is a segment sequence number, determining the order they should be read in. It is represented as uint64, where the upper 32bits indicate the part-number and the lower 32bits indicate the index inside the part.'; COMMENT ON COLUMN segments.root_piece_id is 'root_piece_id is used for deriving per storagenode piece numbers.'; COMMENT ON COLUMN segments.encrypted_key_nonce is 'encrypted_key_nonce is random data used for encrypting the encrypted_key.'; COMMENT ON COLUMN segments.encrypted_key is 'encrypted_key is the encrypted key that was used for encrypting the data in this segment.'; COMMENT ON COLUMN segments.remote_alias_pieces is 'remote_alias_pieces is a compressed list of storagenodes that contain the pieces. See metabase.AliasPieces to see how they are compressed.'; COMMENT ON COLUMN segments.encrypted_size is 'encrypted_size is the data size after compression, but before Reed-Solomon encoding.'; COMMENT ON COLUMN segments.plain_offset is 'plain_offset is the offset of this segment in the unencrypted data stream. Old migrated objects do not have this information, and is zero.'; COMMENT ON COLUMN segments.plain_size is 'plain_size is the user-specified unencrypted size of this segment. Old migrated objects do not have this information, and it is zero.'; COMMENT ON COLUMN segments.redundancy is 'redundancy is the compressed Reed-Solomon redundancy parameters for this segment. See metabase.redundancyScheme for the compression.'; COMMENT ON COLUMN segments.inline_data is 'inline_data contains encrypted data for small objects.'; COMMENT ON COLUMN segments.created_at is 'created_at is the date when the segment was committed to the table.'; COMMENT ON COLUMN segments.repaired_at is 'repaired_at is the last date when the segment was repaired.'; COMMENT ON COLUMN segments.expires_at is 'expires_at is the date when the segment is marked for deletion.'; COMMENT ON COLUMN segments.placement is 'placement is the country or region restriction for the segment data. See storj.PlacementConstraint for the values.'; COMMENT ON COLUMN segments.encrypted_etag is 'encrypted_etag is etag that has been encrypted.'; CREATE SEQUENCE node_alias_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 -- MaxInt32 START WITH 1; CREATE TABLE node_aliases ( node_id BYTEA NOT NULL UNIQUE, node_alias INT4 NOT NULL UNIQUE default nextval('node_alias_seq') ); COMMENT ON TABLE node_aliases is 'node_aliases table contains unique identifiers (aliases) for storagenodes that take less space than a NodeID.'; COMMENT ON COLUMN node_aliases.node_id is 'node_id refers to the storj.NodeID'; COMMENT ON COLUMN node_aliases.node_alias is 'node_alias is a unique integer value assigned for the node_id. It is used for compressing segments.remote_alias_pieces.'; CREATE TABLE segment_copies ( stream_id BYTEA NOT NULL PRIMARY KEY, ancestor_stream_id BYTEA NOT NULL, CONSTRAINT not_self_ancestor CHECK (stream_id != ancestor_stream_id) ); CREATE INDEX ON segment_copies (ancestor_stream_id); COMMENT ON TABLE segment_copies is 'segment_copies contains a reference for sharing stream_id-s.'; COMMENT ON COLUMN segment_copies.stream_id is 'stream_id refers to the objects.stream_id.'; COMMENT ON COLUMN segment_copies.ancestor_stream_id is 'ancestor_stream_id refers to the actual segments where data is stored.'; `, }, }, }, } return migration.Run(ctx, db.log.Named("migrate")) } // MigrateToLatest migrates database to the latest version. func (db *DB) MigrateToLatest(ctx context.Context) error { // First handle the idiosyncrasies of postgres and cockroach migrations. Postgres // will need to create any schemas specified in the search path, and cockroach // will need to create the database it was told to connect to. These things should // not really be here, and instead should be assumed to exist. // This is tracked in jira ticket SM-200 switch db.impl { case dbutil.Postgres: schema, err := pgutil.ParseSchemaFromConnstr(db.connstr) if err != nil { return errs.New("error parsing schema: %+v", err) } if schema != "" { err = pgutil.CreateSchema(ctx, db.db, schema) if err != nil { return errs.New("error creating schema: %+v", err) } } case dbutil.Cockroach: var dbName string if err := db.db.QueryRowContext(ctx, `SELECT current_database();`).Scan(&dbName); err != nil { return errs.New("error querying current database: %+v", err) } _, err := db.db.ExecContext(ctx, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s;`, pgutil.QuoteIdentifier(dbName))) if err != nil { return errs.Wrap(err) } } migration := db.PostgresMigration() return migration.Run(ctx, db.log.Named("migrate")) } // CheckVersion checks the database is the correct version. func (db *DB) CheckVersion(ctx context.Context) error { migration := db.PostgresMigration() return migration.ValidateVersions(ctx, db.log) } // PostgresMigration returns steps needed for migrating postgres database. func (db *DB) PostgresMigration() *migrate.Migration { // TODO: merge this with satellite migration code or a way to keep them in sync. return &migrate.Migration{ Table: "metabase_versions", Steps: []*migrate.Step{ { DB: &db.db, Description: "initial setup", Version: 1, Action: migrate.SQL{ `CREATE TABLE objects ( project_id BYTEA NOT NULL, bucket_name BYTEA NOT NULL, -- we're using bucket_name here to avoid a lookup into buckets table object_key BYTEA NOT NULL, -- using 'object_key' instead of 'key' to avoid reserved word version INT4 NOT NULL, stream_id BYTEA NOT NULL, created_at TIMESTAMPTZ NOT NULL default now(), expires_at TIMESTAMPTZ, status INT2 NOT NULL default ` + pendingStatus + `, segment_count INT4 NOT NULL default 0, encrypted_metadata_nonce BYTEA default NULL, encrypted_metadata BYTEA default NULL, encrypted_metadata_encrypted_key BYTEA default NULL, total_plain_size INT4 NOT NULL default 0, -- migrated objects have this = 0 total_encrypted_size INT4 NOT NULL default 0, fixed_segment_size INT4 NOT NULL default 0, -- migrated objects have this = 0 encryption INT8 NOT NULL default 0, zombie_deletion_deadline TIMESTAMPTZ default now() + '1 day', PRIMARY KEY (project_id, bucket_name, object_key, version) )`, `CREATE TABLE segments ( stream_id BYTEA NOT NULL, position INT8 NOT NULL, root_piece_id BYTEA NOT NULL, encrypted_key_nonce BYTEA NOT NULL, encrypted_key BYTEA NOT NULL, encrypted_size INT4 NOT NULL, plain_offset INT8 NOT NULL, -- migrated objects have this = 0 plain_size INT4 NOT NULL, -- migrated objects have this = 0 redundancy INT8 NOT NULL default 0, inline_data BYTEA DEFAULT NULL, remote_pieces BYTEA[], PRIMARY KEY (stream_id, position) )`, }, }, { DB: &db.db, Description: "change total_plain_size and total_encrypted_size to INT8", Version: 2, Action: migrate.SQL{ `ALTER TABLE objects ALTER COLUMN total_plain_size TYPE INT8;`, `ALTER TABLE objects ALTER COLUMN total_encrypted_size TYPE INT8;`, }, }, { DB: &db.db, Description: "add node aliases table", Version: 3, Action: migrate.SQL{ // We use a custom sequence to ensure small alias values. `CREATE SEQUENCE node_alias_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 -- MaxInt32 START WITH 1 `, `CREATE TABLE node_aliases ( node_id BYTEA NOT NULL UNIQUE, node_alias INT4 NOT NULL UNIQUE default nextval('node_alias_seq') )`, }, }, { DB: &db.db, Description: "add remote_alias_pieces column", Version: 4, Action: migrate.SQL{ `ALTER TABLE segments ADD COLUMN remote_alias_pieces BYTEA`, }, }, { DB: &db.db, Description: "drop remote_pieces from segments table", Version: 6, Action: migrate.SQL{ `ALTER TABLE segments DROP COLUMN remote_pieces`, }, }, { DB: &db.db, Description: "add created_at and repaired_at columns to segments table", Version: 7, Action: migrate.SQL{ `ALTER TABLE segments ADD COLUMN created_at TIMESTAMPTZ`, `ALTER TABLE segments ADD COLUMN repaired_at TIMESTAMPTZ`, }, }, { DB: &db.db, Description: "change default of created_at column in segments table to now()", Version: 8, Action: migrate.SQL{ `ALTER TABLE segments ALTER COLUMN created_at SET DEFAULT now()`, }, }, { DB: &db.db, Description: "add encrypted_etag column to segments table", Version: 9, Action: migrate.SQL{ `ALTER TABLE segments ADD COLUMN encrypted_etag BYTEA default NULL`, }, }, { DB: &db.db, Description: "add index on pending objects", Version: 10, Action: migrate.SQL{}, }, { DB: &db.db, Description: "drop pending_index on objects", Version: 11, Action: migrate.SQL{ `DROP INDEX IF EXISTS pending_index`, }, }, { DB: &db.db, Description: "add expires_at column to segments", Version: 12, Action: migrate.SQL{ `ALTER TABLE segments ADD COLUMN expires_at TIMESTAMPTZ`, }, }, { DB: &db.db, Description: "add NOT NULL constraint to created_at column in segments table", Version: 13, Action: migrate.SQL{ `ALTER TABLE segments ALTER COLUMN created_at SET NOT NULL`, }, }, { DB: &db.db, Description: "ADD placement to the segments table", Version: 14, Action: migrate.SQL{ `ALTER TABLE segments ADD COLUMN placement integer`, }, }, { DB: &db.db, Description: "add table for segment copies", Version: 15, Action: migrate.SQL{ `CREATE TABLE segment_copies ( stream_id BYTEA NOT NULL PRIMARY KEY, ancestor_stream_id BYTEA NOT NULL, CONSTRAINT not_self_ancestor CHECK (stream_id != ancestor_stream_id) )`, `CREATE INDEX ON segment_copies (ancestor_stream_id)`, }, }, { DB: &db.db, Description: "add database comments", Version: 16, Action: migrate.SQL{` -- objects table COMMENT ON TABLE objects is 'Objects table contains information about path and streams.'; COMMENT ON COLUMN objects.project_id is 'project_id is a uuid referring to project.id.'; COMMENT ON COLUMN objects.bucket_name is 'bucket_name is a alpha-numeric string referring to bucket_metainfo.name.'; COMMENT ON COLUMN objects.object_key is 'object_key is an encrypted path of the object.'; COMMENT ON COLUMN objects.version is 'version is a monotonically increasing number per object. currently unused.'; COMMENT ON COLUMN objects.stream_id is 'stream_id is a random identifier for the content uploaded to the object.'; COMMENT ON COLUMN objects.created_at is 'created_at is the creation date of this object.'; COMMENT ON COLUMN objects.expires_at is 'expires_at is the date when this object will be marked for deletion.'; COMMENT ON COLUMN objects.status is 'status refers to metabase.ObjectStatus, where pending=1 and committed=3.'; COMMENT ON COLUMN objects.segment_count is 'segment_count indicates, how many segments are in the segments table for this object. This is zero until the object is committed.'; COMMENT ON COLUMN objects.encrypted_metadata_nonce is 'encrypted_metadata_nonce is random identifier used as part of encryption for encrypted_metadata.'; COMMENT ON COLUMN objects.encrypted_metadata is 'encrypted_metadata is encrypted key-value pairs of user-specified data.'; COMMENT ON COLUMN objects.encrypted_metadata_encrypted_key is 'encrypted_metadata_encrypted_key is the encrypted key for encrypted_metadata.'; COMMENT ON COLUMN objects.total_plain_size is 'total_plain_size is the user-specified total size of the object. This can be zero for old migrated objects.'; COMMENT ON COLUMN objects.total_encrypted_size is 'total_encrypted_size is the sum of the encrypted data sizes of segments.'; COMMENT ON COLUMN objects.fixed_segment_size is 'fixed_segment_size is specified for objects that have a uniform segment sizes (except the last segment). This can be zero for old migrated objects.'; COMMENT ON COLUMN objects.encryption is 'encryption contains object encryption parameters encoded into a uint32. See metabase.encryptionParameters type for the implementation.'; COMMENT ON COLUMN objects.zombie_deletion_deadline is 'zombie_deletion_deadline defines when a pending object can be deleted due to a failed upload.'; -- segments table COMMENT ON TABLE segments is 'segments table contains where segment data is located and other metadata about them.'; COMMENT ON COLUMN segments.stream_id is 'stream_id is a uuid referring to segments that belong to the same object.'; COMMENT ON COLUMN segments.position is 'position is a segment sequence number, determining the order they should be read in. It is represented as uint64, where the upper 32bits indicate the part-number and the lower 32bits indicate the index inside the part.'; COMMENT ON COLUMN segments.root_piece_id is 'root_piece_id is used for deriving per storagenode piece numbers.'; COMMENT ON COLUMN segments.encrypted_key_nonce is 'encrypted_key_nonce is random data used for encrypting the encrypted_key.'; COMMENT ON COLUMN segments.encrypted_key is 'encrypted_key is the encrypted key that was used for encrypting the data in this segment.'; COMMENT ON COLUMN segments.remote_alias_pieces is 'remote_alias_pieces is a compressed list of storagenodes that contain the pieces. See metabase.AliasPieces to see how they are compressed.'; COMMENT ON COLUMN segments.encrypted_size is 'encrypted_size is the data size after compression, but before Reed-Solomon encoding.'; COMMENT ON COLUMN segments.plain_offset is 'plain_offset is the offset of this segment in the unencrypted data stream. Old migrated objects do not have this information, and is zero.'; COMMENT ON COLUMN segments.plain_size is 'plain_size is the user-specified unencrypted size of this segment. Old migrated objects do not have this information, and it is zero.'; COMMENT ON COLUMN segments.redundancy is 'redundancy is the compressed Reed-Solomon redundancy parameters for this segment. See metabase.redundancyScheme for the compression.'; COMMENT ON COLUMN segments.inline_data is 'inline_data contains encrypted data for small objects.'; COMMENT ON COLUMN segments.created_at is 'created_at is the date when the segment was committed to the table.'; COMMENT ON COLUMN segments.repaired_at is 'repaired_at is the last date when the segment was repaired.'; COMMENT ON COLUMN segments.expires_at is 'expires_at is the date when the segment is marked for deletion.'; COMMENT ON COLUMN segments.placement is 'placement is the country or region restriction for the segment data. See storj.PlacementConstraint for the values.'; COMMENT ON COLUMN segments.encrypted_etag is 'encrypted_etag is etag that has been encrypted.'; -- node aliases table COMMENT ON TABLE node_aliases is 'node_aliases table contains unique identifiers (aliases) for storagenodes that take less space than a NodeID.'; COMMENT ON COLUMN node_aliases.node_id is 'node_id refers to the storj.NodeID'; COMMENT ON COLUMN node_aliases.node_alias is 'node_alias is a unique integer value assigned for the node_id. It is used for compressing segments.remote_alias_pieces.'; -- segment copies table COMMENT ON TABLE segment_copies is 'segment_copies contains a reference for sharing stream_id-s.'; COMMENT ON COLUMN segment_copies.stream_id is 'stream_id refers to the objects.stream_id.'; COMMENT ON COLUMN segment_copies.ancestor_stream_id is 'ancestor_stream_id refers to the actual segments where data is stored.'; `}, }, }, } } // This is needed for migrate to work. // TODO: clean this up. type postgresRebind struct{ tagsql.DB } func (pq postgresRebind) Rebind(sql string) string { type sqlParseState int const ( sqlParseStart sqlParseState = iota sqlParseInStringLiteral sqlParseInQuotedIdentifier sqlParseInComment ) out := make([]byte, 0, len(sql)+10) j := 1 state := sqlParseStart for i := 0; i < len(sql); i++ { ch := sql[i] switch state { case sqlParseStart: switch ch { case '?': out = append(out, '$') out = append(out, strconv.Itoa(j)...) state = sqlParseStart j++ continue case '-': if i+1 < len(sql) && sql[i+1] == '-' { state = sqlParseInComment } case '"': state = sqlParseInQuotedIdentifier case '\'': state = sqlParseInStringLiteral } case sqlParseInStringLiteral: if ch == '\'' { state = sqlParseStart } case sqlParseInQuotedIdentifier: if ch == '"' { state = sqlParseStart } case sqlParseInComment: if ch == '\n' { state = sqlParseStart } } out = append(out, ch) } return string(out) } // Now returns time on the database. func (db *DB) Now(ctx context.Context) (time.Time, error) { var t time.Time err := db.db.QueryRowContext(ctx, `SELECT now()`).Scan(&t) return t, Error.Wrap(err) } func (db *DB) asOfTime(asOfSystemTime time.Time, asOfSystemInterval time.Duration) string { return limitedAsOfSystemTime(db.impl, time.Now(), asOfSystemTime, asOfSystemInterval) } func limitedAsOfSystemTime(impl dbutil.Implementation, now, baseline time.Time, maxInterval time.Duration) string { if baseline.IsZero() || now.IsZero() { return impl.AsOfSystemInterval(maxInterval) } interval := now.Sub(baseline) if interval < 0 { return "" } // maxInterval is negative if maxInterval < 0 && interval > -maxInterval { return impl.AsOfSystemInterval(maxInterval) } return impl.AsOfSystemTime(baseline) }