satellite/metainfo: Use cockroachdb client for metainfo db
Change-Id: I3cf7a00de4f654eacaffbb494f4841c64a2d9ce6
This commit is contained in:
parent
f15192ea40
commit
1df7b360d7
@ -20,4 +20,4 @@ var CrdbConnStr = flag.String("cockroach-test-db", os.Getenv("STORJ_COCKROACH_TE
|
||||
const DefaultConnStr = "postgres://storj:storj-pass@test-postgres/teststorj?sslmode=disable"
|
||||
|
||||
// DefaultCrdbConnStr is expected to work when a local cockroachDB instance is running
|
||||
const DefaultCrdbConnStr = "postgres://root@localhost:26257/master?sslmode=disable"
|
||||
const DefaultCrdbConnStr = "cockroach://root@localhost:26257/master?sslmode=disable"
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"storj.io/storj/private/memory"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storage/boltdb"
|
||||
"storj.io/storj/storage/cockroachkv"
|
||||
"storj.io/storj/storage/postgreskv"
|
||||
)
|
||||
|
||||
@ -54,18 +55,20 @@ type PointerDB interface {
|
||||
|
||||
// NewStore returns database for storing pointer data
|
||||
func NewStore(logger *zap.Logger, dbURLString string) (db PointerDB, err error) {
|
||||
driver, source, _, err := dbutil.SplitConnStr(dbURLString)
|
||||
_, source, implementation, err := dbutil.SplitConnStr(dbURLString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch driver {
|
||||
case "bolt":
|
||||
switch implementation {
|
||||
case dbutil.Bolt:
|
||||
db, err = boltdb.New(source, BoltPointerBucket)
|
||||
case "postgresql", "postgres":
|
||||
case dbutil.Postgres:
|
||||
db, err = postgreskv.New(source)
|
||||
case dbutil.Cockroach:
|
||||
db, err = cockroachkv.New(source)
|
||||
default:
|
||||
err = Error.New("unsupported db scheme: %s", driver)
|
||||
err = Error.New("unsupported db implementation: %s", dbURLString)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -7,16 +7,15 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"net/url"
|
||||
"path"
|
||||
|
||||
"github.com/cockroachdb/cockroach-go/crdb"
|
||||
"github.com/lib/pq"
|
||||
"github.com/zeebo/errs"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/private/dbutil"
|
||||
"storj.io/storj/storage"
|
||||
"storj.io/storj/storage/cockroachkv/schema"
|
||||
)
|
||||
|
||||
const defaultBatchSize = 128
|
||||
@ -27,63 +26,36 @@ var (
|
||||
|
||||
// Client is the entrypoint into a cockroachkv data store
|
||||
type Client struct {
|
||||
URL string
|
||||
conn *sql.DB
|
||||
}
|
||||
|
||||
func modifyURL(dbURL string) (modifiedURL string, databaseName string, err error) {
|
||||
u, err := url.Parse(dbURL)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
u.Scheme = "postgres"
|
||||
|
||||
q := u.Query()
|
||||
if schema := q.Get("search_path"); schema != "" {
|
||||
u.Path = path.Join(u.Path, schema)
|
||||
q.Del("search_path")
|
||||
u.RawQuery = q.Encode()
|
||||
}
|
||||
|
||||
return u.String(), u.Path[1:], nil
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// New instantiates a new postgreskv client given db URL
|
||||
func New(dbURL string) (*Client, error) {
|
||||
modifiedURL, _, err := modifyURL(dbURL)
|
||||
db, err := sql.Open("postgres", dbURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := sql.Open("postgres", modifiedURL)
|
||||
dbutil.Configure(db, mon)
|
||||
|
||||
err = schema.PrepareDB(db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dbutil.Configure(conn, mon)
|
||||
return NewWith(db), nil
|
||||
}
|
||||
|
||||
// TODO: Need to bring this back but sourcing CockroachDB compatible schema.
|
||||
// err = schema.PrepareDB(pgConn, dbURL)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
return &Client{
|
||||
URL: modifiedURL,
|
||||
conn: conn,
|
||||
}, nil
|
||||
// NewWith instantiates a new postgreskv client given db.
|
||||
func NewWith(db *sql.DB) *Client {
|
||||
return &Client{db: db}
|
||||
}
|
||||
|
||||
// Close closes the client
|
||||
func (client *Client) Close() error {
|
||||
return client.conn.Close()
|
||||
return client.db.Close()
|
||||
}
|
||||
|
||||
// TODO: Need to bring this back but sourcing CockroachDB compatible schema.
|
||||
// DropSchema drops the schema.
|
||||
// func (client *Client) DropSchema(schema string) error {
|
||||
// return pgutil.DropSchema(client.pgConn, schema)
|
||||
// }
|
||||
|
||||
// Put sets the value for the provided key.
|
||||
func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -98,7 +70,7 @@ func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Va
|
||||
ON CONFLICT (fullpath) DO UPDATE SET metadata = EXCLUDED.metadata
|
||||
`
|
||||
|
||||
_, err = client.conn.Exec(q, []byte(key), []byte(value))
|
||||
_, err = client.db.Exec(q, []byte(key), []byte(value))
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -111,7 +83,7 @@ func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value
|
||||
}
|
||||
|
||||
q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA"
|
||||
row := client.conn.QueryRow(q, []byte(key))
|
||||
row := client.db.QueryRow(q, []byte(key))
|
||||
|
||||
var val []byte
|
||||
err = row.Scan(&val)
|
||||
@ -139,7 +111,7 @@ func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.
|
||||
ON (pd.fullpath = pk.request)
|
||||
ORDER BY pk.ord
|
||||
`
|
||||
rows, err := client.conn.Query(q, pq.ByteaArray(keys.ByteSlices()))
|
||||
rows, err := client.db.Query(q, pq.ByteaArray(keys.ByteSlices()))
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
@ -166,7 +138,7 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
||||
}
|
||||
|
||||
q := "DELETE FROM pathdata WHERE fullpath = $1:::BYTEA"
|
||||
result, err := client.conn.Exec(q, []byte(key))
|
||||
result, err := client.db.Exec(q, []byte(key))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -211,7 +183,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa
|
||||
|
||||
if oldValue == nil && newValue == nil {
|
||||
q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA"
|
||||
row := client.conn.QueryRow(q, []byte(key))
|
||||
row := client.db.QueryRow(q, []byte(key))
|
||||
|
||||
var val []byte
|
||||
err = row.Scan(&val)
|
||||
@ -231,7 +203,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa
|
||||
ON CONFLICT DO NOTHING
|
||||
RETURNING 1
|
||||
`
|
||||
row := client.conn.QueryRow(q, []byte(key), []byte(newValue))
|
||||
row := client.db.QueryRow(q, []byte(key), []byte(newValue))
|
||||
|
||||
var val []byte
|
||||
err = row.Scan(&val)
|
||||
@ -241,7 +213,7 @@ func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldVa
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
return crdb.ExecuteTx(ctx, client.conn, nil, func(txn *sql.Tx) error {
|
||||
return crdb.ExecuteTx(ctx, client.db, nil, func(txn *sql.Tx) error {
|
||||
q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA;"
|
||||
row := txn.QueryRowContext(ctx, q, []byte(key))
|
||||
|
||||
|
@ -7,22 +7,29 @@ import (
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
|
||||
"storj.io/storj/private/dbutil/cockroachutil"
|
||||
"storj.io/storj/private/dbutil/pgutil/pgtest"
|
||||
"storj.io/storj/storage/cockroachkv/schema"
|
||||
"storj.io/storj/storage/testsuite"
|
||||
)
|
||||
|
||||
func newTestCockroachDB(t testing.TB) (store *Client, cleanup func()) {
|
||||
if *pgtest.CrdbConnStr == "" {
|
||||
t.Skipf("postgres flag missing, example:\n-cockroach-test-db=%s", pgtest.DefaultCrdbConnStr)
|
||||
t.Skipf("cockroach flag missing, example:\n-cockroach-test-db=%s", pgtest.DefaultCrdbConnStr)
|
||||
}
|
||||
|
||||
crdb, err := New(*pgtest.CrdbConnStr)
|
||||
tdb, err := cockroachutil.OpenUnique(*pgtest.CrdbConnStr, "test-schema")
|
||||
if err != nil {
|
||||
t.Fatalf("init: %+v", err)
|
||||
}
|
||||
|
||||
return crdb, func() {
|
||||
if err := crdb.Close(); err != nil {
|
||||
err = schema.PrepareDB(tdb.DB)
|
||||
if err != nil {
|
||||
t.Fatalf("init: %+v", err)
|
||||
}
|
||||
|
||||
return NewWith(tdb.DB), func() {
|
||||
if err := tdb.Close(); err != nil {
|
||||
t.Fatalf("failed to close db: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ func (opi *orderedCockroachIterator) doNextQuery(ctx context.Context) (_ *sql.Ro
|
||||
gt = ">="
|
||||
}
|
||||
|
||||
return opi.client.conn.Query(fmt.Sprintf(`
|
||||
return opi.client.db.Query(fmt.Sprintf(`
|
||||
SELECT pd.fullpath, pd.metadata
|
||||
FROM pathdata pd
|
||||
WHERE pd.fullpath %s $1:::BYTEA
|
||||
|
29
storage/cockroachkv/schema/migrate.go
Normal file
29
storage/cockroachkv/schema/migrate.go
Normal file
@ -0,0 +1,29 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package schema
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
// PrepareDB creates the pathdata tables if they don't already exist.
|
||||
func PrepareDB(db *sql.DB) (err error) {
|
||||
// Note: the buckets table is unused. It exists here to ease importing
|
||||
// backups from postgres. Similarly, the bucket column in pathdata is
|
||||
// also unused and exists to ease imports.
|
||||
_, err = db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS buckets (
|
||||
bucketname BYTES PRIMARY KEY,
|
||||
delim INT8 NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS pathdata (
|
||||
fullpath BYTEA PRIMARY KEY,
|
||||
metadata BYTEA NOT NULL,
|
||||
bucket BYTEA
|
||||
);
|
||||
`)
|
||||
return errs.Wrap(err)
|
||||
}
|
Loading…
Reference in New Issue
Block a user