// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

// Package metabase implements storing objects and segments.
package metabase

import (
	"context"
	"fmt"
	"sort"
	"strconv"

	_ "github.com/jackc/pgx/v4"        // registers pgx as a tagsql driver.
	_ "github.com/jackc/pgx/v4/stdlib" // registers pgx as a tagsql driver.
	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/private/dbutil"
	"storj.io/private/dbutil/pgutil"
	"storj.io/private/tagsql"
	"storj.io/storj/private/migrate"
)

var (
	mon = monkit.Package()
)

// DB implements a database for storing objects and segments.
type DB struct {
	log     *zap.Logger
	db      tagsql.DB
	connstr string

	implementation dbutil.Implementation

	aliasCache *NodeAliasCache
}

// Open opens a connection to metabase.
func Open(ctx context.Context, log *zap.Logger, driverName, connstr string) (*DB, error) {
	rawdb, err := tagsql.Open(ctx, driverName, connstr)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	dbutil.Configure(ctx, rawdb, "metabase", mon)

	db := &DB{log: log, connstr: connstr, db: postgresRebind{rawdb}}
	db.aliasCache = NewNodeAliasCache(db)

	_, _, db.implementation, err = dbutil.SplitConnStr(connstr)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	return db, nil
}

// InternalImplementation returns *metabase.DB.
// TODO: remove.
func (db *DB) InternalImplementation() interface{} { return db }

// UnderlyingTagSQL returns *tagsql.DB.
// TODO: remove.
func (db *DB) UnderlyingTagSQL() tagsql.DB { return db.db }

// Ping checks whether connection has been established.
func (db *DB) Ping(ctx context.Context) error {
	return Error.Wrap(db.db.PingContext(ctx))
}

// Close closes the connection to the database.
func (db *DB) Close() error {
	return Error.Wrap(db.db.Close())
}

// DestroyTables deletes all tables.
//
// TODO: remove this, only for bootstrapping.
func (db *DB) DestroyTables(ctx context.Context) error {
	_, err := db.db.ExecContext(ctx, `
		DROP TABLE IF EXISTS objects;
		DROP TABLE IF EXISTS segments;
		DROP TABLE IF EXISTS node_aliases;
		DROP SEQUENCE IF EXISTS node_alias_seq;
	`)
	db.aliasCache = NewNodeAliasCache(db)
	return Error.Wrap(err)
}

// MigrateToLatest migrates database to the latest version.
func (db *DB) MigrateToLatest(ctx context.Context) error {
	// First handle the idiosyncrasies of postgres and cockroach migrations. Postgres
	// will need to create any schemas specified in the search path, and cockroach
	// will need to create the database it was told to connect to. These things should
	// not really be here, and instead should be assumed to exist.
	// This is tracked in jira ticket SM-200
	switch db.implementation {
	case dbutil.Postgres:
		schema, err := pgutil.ParseSchemaFromConnstr(db.connstr)
		if err != nil {
			return errs.New("error parsing schema: %+v", err)
		}
		if schema != "" {
			err = pgutil.CreateSchema(ctx, db.db, schema)
			if err != nil {
				return errs.New("error creating schema: %+v", err)
			}
		}
	case dbutil.Cockroach:
		var dbName string
		if err := db.db.QueryRow(ctx, `SELECT current_database();`).Scan(&dbName); err != nil {
			return errs.New("error querying current database: %+v", err)
		}

		_, err := db.db.Exec(ctx, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s;`,
			pgutil.QuoteIdentifier(dbName)))
		if err != nil {
			return errs.Wrap(err)
		}
	}

	migration := db.PostgresMigration()
	return migration.Run(ctx, db.log.Named("migrate"))
}
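// The sketch below is illustrative and not part of the original file: it
// shows the expected call order for a fresh metabase instance, namely Open,
// then MigrateToLatest, then Ping to confirm connectivity. The "pgx" driver
// name is an assumption based on the pgx stdlib registration in the imports.
func exampleOpenAndMigrate(ctx context.Context, log *zap.Logger, connstr string) (err error) {
	db, err := Open(ctx, log, "pgx", connstr)
	if err != nil {
		return err
	}
	// Combine any Close error with the function's result.
	defer func() { err = errs.Combine(err, db.Close()) }()

	if err := db.MigrateToLatest(ctx); err != nil {
		return err
	}
	return db.Ping(ctx)
}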
// CheckVersion checks the database is the correct version.
func (db *DB) CheckVersion(ctx context.Context) error {
	migration := db.PostgresMigration()
	return migration.ValidateVersions(ctx, db.log)
}
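// The helper below is a hypothetical sketch, not part of the original file:
// it illustrates how queries address the objects table, created in
// PostgresMigration below, through the leading columns of its composite
// primary key (project_id, bucket_name, object_key, version).
func exampleCountBucketObjects(ctx context.Context, db *DB, projectID uuid.UUID, bucketName string) (count int64, err error) {
	// bucket_name is stored as BYTEA, so the string is passed as bytes.
	err = db.db.QueryRow(ctx, `
		SELECT count(*) FROM objects
		WHERE project_id = $1 AND bucket_name = $2
	`, projectID, []byte(bucketName)).Scan(&count)
	return count, Error.Wrap(err)
}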
// PostgresMigration returns steps needed for migrating postgres database.
func (db *DB) PostgresMigration() *migrate.Migration {
	// TODO: merge this with satellite migration code or a way to keep them in sync.
	return &migrate.Migration{
		Table: "metabase_versions",
		Steps: []*migrate.Step{
			{
				DB:          &db.db,
				Description: "initial setup",
				Version:     1,
				Action: migrate.SQL{
					`CREATE TABLE objects (
						project_id  BYTEA NOT NULL,
						bucket_name BYTEA NOT NULL, -- we're using bucket_name here to avoid a lookup into buckets table
						object_key  BYTEA NOT NULL, -- using 'object_key' instead of 'key' to avoid reserved word
						version     INT4  NOT NULL,
						stream_id   BYTEA NOT NULL,

						created_at TIMESTAMPTZ NOT NULL default now(),
						expires_at TIMESTAMPTZ,

						status        INT2 NOT NULL default ` + pendingStatus + `,
						segment_count INT4 NOT NULL default 0,

						encrypted_metadata_nonce         BYTEA default NULL,
						encrypted_metadata               BYTEA default NULL,
						encrypted_metadata_encrypted_key BYTEA default NULL,

						total_plain_size     INT4 NOT NULL default 0, -- migrated objects have this = 0
						total_encrypted_size INT4 NOT NULL default 0,
						fixed_segment_size   INT4 NOT NULL default 0, -- migrated objects have this = 0

						encryption INT8 NOT NULL default 0,

						zombie_deletion_deadline TIMESTAMPTZ default now() + '1 day',

						PRIMARY KEY (project_id, bucket_name, object_key, version)
					)`,
					`CREATE TABLE segments (
						stream_id BYTEA NOT NULL,
						position  INT8  NOT NULL,

						root_piece_id       BYTEA NOT NULL,
						encrypted_key_nonce BYTEA NOT NULL,
						encrypted_key       BYTEA NOT NULL,

						encrypted_size INT4 NOT NULL,
						plain_offset   INT8 NOT NULL, -- migrated objects have this = 0
						plain_size     INT4 NOT NULL, -- migrated objects have this = 0

						redundancy INT8 NOT NULL default 0,

						inline_data   BYTEA DEFAULT NULL,
						remote_pieces BYTEA[],

						PRIMARY KEY (stream_id, position)
					)`,
				},
			},
			{
				DB:          &db.db,
				Description: "change total_plain_size and total_encrypted_size to INT8",
				Version:     2,
				Action: migrate.SQL{
					`ALTER TABLE objects ALTER COLUMN total_plain_size TYPE INT8;`,
					`ALTER TABLE objects ALTER COLUMN total_encrypted_size TYPE INT8;`,
				},
			},
			{
				DB:          &db.db,
				Description: "add node aliases table",
				Version:     3,
				Action: migrate.SQL{
					// We use a custom sequence to ensure small alias values.
					`CREATE SEQUENCE node_alias_seq
						INCREMENT BY 1
						MINVALUE 1 MAXVALUE 2147483647 -- MaxInt32
						START WITH 1
					`,
					`CREATE TABLE node_aliases (
						node_id    BYTEA NOT NULL UNIQUE,
						node_alias INT4  NOT NULL UNIQUE default nextval('node_alias_seq')
					)`,
				},
			},
			{
				DB:          &db.db,
				Description: "add remote_alias_pieces column",
				Version:     4,
				Action: migrate.SQL{
					`ALTER TABLE segments ADD COLUMN remote_alias_pieces BYTEA`,
				},
			},
			{
				DB:          &db.db,
				Description: "convert remote_pieces to remote_alias_pieces",
				Version:     5,
				Action: migrate.Func(func(ctx context.Context, log *zap.Logger, db tagsql.DB, tx tagsql.Tx) error {
					type segmentPieces struct {
						StreamID     uuid.UUID
						Position     SegmentPosition
						RemotePieces Pieces
					}

					var allSegments []segmentPieces

					err := withRows(tx.QueryContext(ctx, `SELECT stream_id, position, remote_pieces FROM segments WHERE remote_pieces IS NOT NULL`))(
						func(rows tagsql.Rows) error {
							for rows.Next() {
								var seg segmentPieces
								if err := rows.Scan(&seg.StreamID, &seg.Position, &seg.RemotePieces); err != nil {
									return Error.Wrap(err)
								}
								allSegments = append(allSegments, seg)
							}
							return nil
						})
					if err != nil {
						return Error.Wrap(err)
					}

					allNodes := map[storj.NodeID]struct{}{}
					for i := range allSegments {
						seg := &allSegments[i]
						for k := range seg.RemotePieces {
							p := &seg.RemotePieces[k]
							allNodes[p.StorageNode] = struct{}{}
						}
					}

					nodesList := []storj.NodeID{}
					for id := range allNodes {
						nodesList = append(nodesList, id)
					}
					aliasCache := NewNodeAliasCache(&txNodeAliases{tx})
					_, err = aliasCache.Aliases(ctx, nodesList)
					if err != nil {
						return Error.Wrap(err)
					}

					err = func() (err error) {
						stmt, err := tx.PrepareContext(ctx, `UPDATE segments SET remote_alias_pieces = $3 WHERE stream_id = $1 AND position = $2`)
						if err != nil {
							return Error.Wrap(err)
						}
						defer func() { err = errs.Combine(err, Error.Wrap(stmt.Close())) }()

						for i := range allSegments {
							seg := &allSegments[i]
							if len(seg.RemotePieces) == 0 {
								continue
							}

							aliases, err := aliasCache.ConvertPiecesToAliases(ctx, seg.RemotePieces)
							if err != nil {
								return Error.Wrap(err)
							}
							sort.Slice(aliases, func(i, k int) bool {
								return aliases[i].Number < aliases[k].Number
							})

							_, err = stmt.ExecContext(ctx, seg.StreamID, seg.Position, aliases)
							if err != nil {
								return Error.Wrap(err)
							}
						}
						return nil
					}()
					if err != nil {
						return err
					}

					return nil
				}),
			},
			{
				DB:          &db.db,
				Description: "drop remote_pieces from segments table",
				Version:     6,
				Action: migrate.SQL{
					`ALTER TABLE segments DROP COLUMN remote_pieces`,
				},
			},
			{
				DB:          &db.db,
				Description: "add created_at and repaired_at columns to segments table",
				Version:     7,
				Action: migrate.SQL{
					`ALTER TABLE segments ADD COLUMN created_at TIMESTAMPTZ`,
					`ALTER TABLE segments ADD COLUMN repaired_at TIMESTAMPTZ`,
				},
			},
			{
				DB:          &db.db,
				Description: "change default of created_at column in segments table to now()",
				Version:     8,
				Action: migrate.SQL{
					`ALTER TABLE segments ALTER COLUMN created_at SET DEFAULT now()`,
				},
			},
			{
				DB:          &db.db,
				Description: "add encrypted_etag column to segments table",
				Version:     9,
				Action: migrate.SQL{
					`ALTER TABLE segments ADD COLUMN encrypted_etag BYTEA default NULL`,
				},
			},
			{
				DB:          &db.db,
				Description: "add index on pending objects",
				Version:     10,
				Action: migrate.SQL{
					`CREATE INDEX IF NOT EXISTS pending_index ON objects (project_id, bucket_name) WHERE status=` + pendingStatus,
				},
			},
		},
	}
}
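// The helper below is a hypothetical sketch, not part of the original file:
// it mirrors the per-segment conversion performed by the version 5 migration
// step above, mapping full node IDs to compact aliases via the alias cache.
// The AliasPieces return type is an assumption about what
// ConvertPiecesToAliases yields.
func exampleConvertPieces(ctx context.Context, db *DB, pieces Pieces) (AliasPieces, error) {
	aliases, err := db.aliasCache.ConvertPiecesToAliases(ctx, pieces)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	// Match the migration's behavior: store aliases ordered by piece number.
	sort.Slice(aliases, func(i, k int) bool { return aliases[i].Number < aliases[k].Number })
	return aliases, nil
}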
// This is needed for migrate to work.
// TODO: clean this up.
type postgresRebind struct{ tagsql.DB }

func (pq postgresRebind) Rebind(sql string) string {
	type sqlParseState int
	const (
		sqlParseStart sqlParseState = iota
		sqlParseInStringLiteral
		sqlParseInQuotedIdentifier
		sqlParseInComment
	)

	out := make([]byte, 0, len(sql)+10)

	j := 1
	state := sqlParseStart
	for i := 0; i < len(sql); i++ {
		ch := sql[i]
		switch state {
		case sqlParseStart:
			switch ch {
			case '?':
				out = append(out, '$')
				out = append(out, strconv.Itoa(j)...)
				state = sqlParseStart
				j++
				continue
			case '-':
				if i+1 < len(sql) && sql[i+1] == '-' {
					state = sqlParseInComment
				}
			case '"':
				state = sqlParseInQuotedIdentifier
			case '\'':
				state = sqlParseInStringLiteral
			}
		case sqlParseInStringLiteral:
			if ch == '\'' {
				state = sqlParseStart
			}
		case sqlParseInQuotedIdentifier:
			if ch == '"' {
				state = sqlParseStart
			}
		case sqlParseInComment:
			if ch == '\n' {
				state = sqlParseStart
			}
		}
		out = append(out, ch)
	}

	return string(out)
}
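// The function below is an illustrative sketch, not part of the original
// file: it demonstrates the placeholder rewriting performed by Rebind. The
// embedded tagsql.DB can be left nil because Rebind never touches it.
func exampleRebind() string {
	rb := postgresRebind{}
	// Returns: SELECT '?' FROM t WHERE a = $1 AND b = $2
	// The '?' inside the string literal is preserved; only bare
	// placeholders are renumbered.
	return rb.Rebind(`SELECT '?' FROM t WHERE a = ? AND b = ?`)
}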