From 1df7b360d791fa134b960cdaaefb81d49c036221 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Thu, 5 Dec 2019 10:22:27 -0700 Subject: [PATCH] satellite/metainfo: Use cockroachdb client for metainfo db Change-Id: I3cf7a00de4f654eacaffbb494f4841c64a2d9ce6 --- private/dbutil/pgutil/pgtest/flag.go | 2 +- satellite/metainfo/config.go | 13 +++-- storage/cockroachkv/client.go | 68 ++++++++----------------- storage/cockroachkv/client_test.go | 15 ++++-- storage/cockroachkv/ordered_iterator.go | 2 +- storage/cockroachkv/schema/migrate.go | 29 +++++++++++ 6 files changed, 70 insertions(+), 59 deletions(-) create mode 100644 storage/cockroachkv/schema/migrate.go diff --git a/private/dbutil/pgutil/pgtest/flag.go b/private/dbutil/pgutil/pgtest/flag.go index 38a6f4bfc..678f4c47b 100644 --- a/private/dbutil/pgutil/pgtest/flag.go +++ b/private/dbutil/pgutil/pgtest/flag.go @@ -20,4 +20,4 @@ var CrdbConnStr = flag.String("cockroach-test-db", os.Getenv("STORJ_COCKROACH_TE const DefaultConnStr = "postgres://storj:storj-pass@test-postgres/teststorj?sslmode=disable" // DefaultCrdbConnStr is expected to work when a local cockroachDB instance is running -const DefaultCrdbConnStr = "postgres://root@localhost:26257/master?sslmode=disable" +const DefaultCrdbConnStr = "cockroach://root@localhost:26257/master?sslmode=disable" diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 48897629b..5841e19f9 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -12,6 +12,7 @@ import ( "storj.io/storj/private/memory" "storj.io/storj/storage" "storj.io/storj/storage/boltdb" + "storj.io/storj/storage/cockroachkv" "storj.io/storj/storage/postgreskv" ) @@ -54,18 +55,20 @@ type PointerDB interface { // NewStore returns database for storing pointer data func NewStore(logger *zap.Logger, dbURLString string) (db PointerDB, err error) { - driver, source, _, err := dbutil.SplitConnStr(dbURLString) + _, source, implementation, err := dbutil.SplitConnStr(dbURLString) if err != nil { return nil, err } - switch driver { - case "bolt": + switch implementation { + case dbutil.Bolt: db, err = boltdb.New(source, BoltPointerBucket) - case "postgresql", "postgres": + case dbutil.Postgres: db, err = postgreskv.New(source) + case dbutil.Cockroach: + db, err = cockroachkv.New(source) default: - err = Error.New("unsupported db scheme: %s", driver) + err = Error.New("unsupported db implementation: %s", dbURLString) } if err != nil { diff --git a/storage/cockroachkv/client.go b/storage/cockroachkv/client.go index 443636058..3508e0df7 100644 --- a/storage/cockroachkv/client.go +++ b/storage/cockroachkv/client.go @@ -7,16 +7,15 @@ import ( "bytes" "context" "database/sql" - "net/url" - "path" "github.com/cockroachdb/cockroach-go/crdb" "github.com/lib/pq" "github.com/zeebo/errs" - monkit "gopkg.in/spacemonkeygo/monkit.v2" + "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/private/dbutil" "storj.io/storj/storage" + "storj.io/storj/storage/cockroachkv/schema" ) const defaultBatchSize = 128 @@ -27,63 +26,36 @@ var ( // Client is the entrypoint into a cockroachkv data store type Client struct { - URL string - conn *sql.DB -} - -func modifyURL(dbURL string) (modifiedURL string, databaseName string, err error) { - u, err := url.Parse(dbURL) - if err != nil { - return "", "", err - } - u.Scheme = "postgres" - - q := u.Query() - if schema := q.Get("search_path"); schema != "" { - u.Path = path.Join(u.Path, schema) - q.Del("search_path") - u.RawQuery = q.Encode() - } - - return u.String(), u.Path[1:], nil + db *sql.DB } // New instantiates a new postgreskv client given db URL func New(dbURL string) (*Client, error) { - modifiedURL, _, err := modifyURL(dbURL) + db, err := sql.Open("postgres", dbURL) if err != nil { return nil, err } - conn, err := sql.Open("postgres", modifiedURL) + dbutil.Configure(db, mon) + + err = schema.PrepareDB(db) if err != nil { return nil, err } - dbutil.Configure(conn, mon) + return NewWith(db), nil +} - // TODO: Need to bring this back but sourcing CockroachDB compatible schema. - // err = schema.PrepareDB(pgConn, dbURL) - // if err != nil { - // return nil, err - // } - return &Client{ - URL: modifiedURL, - conn: conn, - }, nil +// NewWith instantiates a new postgreskv client given db. +func NewWith(db *sql.DB) *Client { + return &Client{db: db} } // Close closes the client func (client *Client) Close() error { - return client.conn.Close() + return client.db.Close() } -// TODO: Need to bring this back but sourcing CockroachDB compatible schema. -// DropSchema drops the schema. -// func (client *Client) DropSchema(schema string) error { -// return pgutil.DropSchema(client.pgConn, schema) -// } - // Put sets the value for the provided key. func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) { defer mon.Task()(&ctx)(&err) @@ -98,7 +70,7 @@ func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Va ON CONFLICT (fullpath) DO UPDATE SET metadata = EXCLUDED.metadata ` - _, err = client.conn.Exec(q, []byte(key), []byte(value)) + _, err = client.db.Exec(q, []byte(key), []byte(value)) return Error.Wrap(err) } @@ -111,7 +83,7 @@ func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value } q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA" - row := client.conn.QueryRow(q, []byte(key)) + row := client.db.QueryRow(q, []byte(key)) var val []byte err = row.Scan(&val) @@ -139,7 +111,7 @@ func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage. ON (pd.fullpath = pk.request) ORDER BY pk.ord ` - rows, err := client.conn.Query(q, pq.ByteaArray(keys.ByteSlices())) + rows, err := client.db.Query(q, pq.ByteaArray(keys.ByteSlices())) if err != nil { return nil, errs.Wrap(err) } @@ -166,7 +138,7 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) { } q := "DELETE FROM pathdata WHERE fullpath = $1:::BYTEA" - result, err := client.conn.Exec(q, []byte(key)) + result, err := client.db.Exec(q, []byte(key)) if err != nil { return err } @@ -211,7 +183,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa if oldValue == nil && newValue == nil { q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA" - row := client.conn.QueryRow(q, []byte(key)) + row := client.db.QueryRow(q, []byte(key)) var val []byte err = row.Scan(&val) @@ -231,7 +203,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa ON CONFLICT DO NOTHING RETURNING 1 ` - row := client.conn.QueryRow(q, []byte(key), []byte(newValue)) + row := client.db.QueryRow(q, []byte(key), []byte(newValue)) var val []byte err = row.Scan(&val) @@ -241,7 +213,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa return Error.Wrap(err) } - return crdb.ExecuteTx(ctx, client.conn, nil, func(txn *sql.Tx) error { + return crdb.ExecuteTx(ctx, client.db, nil, func(txn *sql.Tx) error { q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA;" row := txn.QueryRowContext(ctx, q, []byte(key)) diff --git a/storage/cockroachkv/client_test.go b/storage/cockroachkv/client_test.go index 4501682f5..199605f24 100644 --- a/storage/cockroachkv/client_test.go +++ b/storage/cockroachkv/client_test.go @@ -7,22 +7,29 @@ import ( _ "github.com/lib/pq" + "storj.io/storj/private/dbutil/cockroachutil" "storj.io/storj/private/dbutil/pgutil/pgtest" + "storj.io/storj/storage/cockroachkv/schema" "storj.io/storj/storage/testsuite" ) func newTestCockroachDB(t testing.TB) (store *Client, cleanup func()) { if *pgtest.CrdbConnStr == "" { - t.Skipf("postgres flag missing, example:\n-cockroach-test-db=%s", pgtest.DefaultCrdbConnStr) + t.Skipf("cockroach flag missing, example:\n-cockroach-test-db=%s", pgtest.DefaultCrdbConnStr) } - crdb, err := New(*pgtest.CrdbConnStr) + tdb, err := cockroachutil.OpenUnique(*pgtest.CrdbConnStr, "test-schema") if err != nil { t.Fatalf("init: %+v", err) } - return crdb, func() { - if err := crdb.Close(); err != nil { + err = schema.PrepareDB(tdb.DB) + if err != nil { + t.Fatalf("init: %+v", err) + } + + return NewWith(tdb.DB), func() { + if err := tdb.Close(); err != nil { t.Fatalf("failed to close db: %v", err) } } diff --git a/storage/cockroachkv/ordered_iterator.go b/storage/cockroachkv/ordered_iterator.go index f5d38aeec..ff3adae38 100644 --- a/storage/cockroachkv/ordered_iterator.go +++ b/storage/cockroachkv/ordered_iterator.go @@ -137,7 +137,7 @@ func (opi *orderedCockroachIterator) doNextQuery(ctx context.Context) (_ *sql.Ro gt = ">=" } - return opi.client.conn.Query(fmt.Sprintf(` + return opi.client.db.Query(fmt.Sprintf(` SELECT pd.fullpath, pd.metadata FROM pathdata pd WHERE pd.fullpath %s $1:::BYTEA diff --git a/storage/cockroachkv/schema/migrate.go b/storage/cockroachkv/schema/migrate.go new file mode 100644 index 000000000..436525f0b --- /dev/null +++ b/storage/cockroachkv/schema/migrate.go @@ -0,0 +1,29 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package schema + +import ( + "database/sql" + + "github.com/zeebo/errs" +) + +// PrepareDB creates the pathdata tables if they don't already exist. +func PrepareDB(db *sql.DB) (err error) { + // Note: the buckets table is unused. It exists here to ease importing + // backups from postgres. Similarly, the bucket column in pathdata is + // also unused and exists to ease imports. + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS buckets ( + bucketname BYTES PRIMARY KEY, + delim INT8 NOT NULL + ); + CREATE TABLE IF NOT EXISTS pathdata ( + fullpath BYTEA PRIMARY KEY, + metadata BYTEA NOT NULL, + bucket BYTEA + ); + `) + return errs.Wrap(err) +}