diff --git a/storage/cockroachkv/client.go b/storage/cockroachkv/client.go new file mode 100644 index 000000000..443636058 --- /dev/null +++ b/storage/cockroachkv/client.go @@ -0,0 +1,299 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package cockroachkv + +import ( + "bytes" + "context" + "database/sql" + "net/url" + "path" + + "github.com/cockroachdb/cockroach-go/crdb" + "github.com/lib/pq" + "github.com/zeebo/errs" + monkit "gopkg.in/spacemonkeygo/monkit.v2" + + "storj.io/storj/private/dbutil" + "storj.io/storj/storage" +) + +const defaultBatchSize = 128 + +var ( + mon = monkit.Package() +) + +// Client is the entrypoint into a cockroachkv data store +type Client struct { + URL string + conn *sql.DB +} + +func modifyURL(dbURL string) (modifiedURL string, databaseName string, err error) { + u, err := url.Parse(dbURL) + if err != nil { + return "", "", err + } + u.Scheme = "postgres" + + q := u.Query() + if schema := q.Get("search_path"); schema != "" { + u.Path = path.Join(u.Path, schema) + q.Del("search_path") + u.RawQuery = q.Encode() + } + + return u.String(), u.Path[1:], nil +} + +// New instantiates a new postgreskv client given db URL +func New(dbURL string) (*Client, error) { + modifiedURL, _, err := modifyURL(dbURL) + if err != nil { + return nil, err + } + + conn, err := sql.Open("postgres", modifiedURL) + if err != nil { + return nil, err + } + + dbutil.Configure(conn, mon) + + // TODO: Need to bring this back but sourcing CockroachDB compatible schema. + // err = schema.PrepareDB(pgConn, dbURL) + // if err != nil { + // return nil, err + // } + return &Client{ + URL: modifiedURL, + conn: conn, + }, nil +} + +// Close closes the client +func (client *Client) Close() error { + return client.conn.Close() +} + +// TODO: Need to bring this back but sourcing CockroachDB compatible schema. +// DropSchema drops the schema. +// func (client *Client) DropSchema(schema string) error { +// return pgutil.DropSchema(client.pgConn, schema) +// } + +// Put sets the value for the provided key. +func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) { + defer mon.Task()(&ctx)(&err) + + if key.IsZero() { + return storage.ErrEmptyKey.New("") + } + + q := ` + INSERT INTO pathdata (fullpath, metadata) + VALUES ($1:::BYTEA, $2:::BYTEA) + ON CONFLICT (fullpath) DO UPDATE SET metadata = EXCLUDED.metadata + ` + + _, err = client.conn.Exec(q, []byte(key), []byte(value)) + return Error.Wrap(err) +} + +// Get looks up the provided key and returns its value (or an error). +func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) { + defer mon.Task()(&ctx)(&err) + + if key.IsZero() { + return nil, storage.ErrEmptyKey.New("") + } + + q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA" + row := client.conn.QueryRow(q, []byte(key)) + + var val []byte + err = row.Scan(&val) + if err == sql.ErrNoRows { + return nil, storage.ErrKeyNotFound.New("%q", key) + } + + return val, Error.Wrap(err) +} + +// GetAll finds all values for the provided keys (up to storage.LookupLimit). +// If more keys are provided than the maximum, an error will be returned. +func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) { + defer mon.Task()(&ctx)(&err) + + if len(keys) > storage.LookupLimit { + return nil, storage.ErrLimitExceeded + } + + q := ` + SELECT metadata + FROM pathdata pd + RIGHT JOIN + unnest($1:::BYTEA[]) WITH ORDINALITY pk(request, ord) + ON (pd.fullpath = pk.request) + ORDER BY pk.ord + ` + rows, err := client.conn.Query(q, pq.ByteaArray(keys.ByteSlices())) + if err != nil { + return nil, errs.Wrap(err) + } + defer func() { err = errs.Combine(err, Error.Wrap(rows.Close())) }() + + values := make([]storage.Value, 0, len(keys)) + for rows.Next() { + var value []byte + if err := rows.Scan(&value); err != nil { + return nil, Error.Wrap(err) + } + values = append(values, storage.Value(value)) + } + + return values, Error.Wrap(rows.Err()) +} + +// Delete deletes the given key and its associated value. +func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) { + defer mon.Task()(&ctx)(&err) + + if key.IsZero() { + return storage.ErrEmptyKey.New("") + } + + q := "DELETE FROM pathdata WHERE fullpath = $1:::BYTEA" + result, err := client.conn.Exec(q, []byte(key)) + if err != nil { + return err + } + + numRows, err := result.RowsAffected() + if err != nil { + return err + } + if numRows == 0 { + return storage.ErrKeyNotFound.New("%q", key) + } + return nil +} + +// List returns either a list of known keys, in order, or an error. +func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) { + defer mon.Task()(&ctx)(&err) + return storage.ListKeys(ctx, client, first, limit) +} + +// Iterate calls the callback with an iterator over the keys. +func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { + defer mon.Task()(&ctx)(&err) + opi, err := newOrderedCockroachIterator(ctx, client, opts, defaultBatchSize) + if err != nil { + return err + } + defer func() { + err = errs.Combine(err, opi.Close()) + }() + + return fn(ctx, opi) +} + +// CompareAndSwap atomically compares and swaps oldValue with newValue +func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldValue, newValue storage.Value) (err error) { + defer mon.Task()(&ctx)(&err) + + if key.IsZero() { + return storage.ErrEmptyKey.New("") + } + + if oldValue == nil && newValue == nil { + q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA" + row := client.conn.QueryRow(q, []byte(key)) + + var val []byte + err = row.Scan(&val) + if err == sql.ErrNoRows { + return nil + } + + if err != nil { + return Error.Wrap(err) + } + return storage.ErrValueChanged.New("%q", key) + } + + if oldValue == nil { + q := ` + INSERT INTO pathdata (fullpath, metadata) VALUES ($1:::BYTEA, $2:::BYTEA) + ON CONFLICT DO NOTHING + RETURNING 1 + ` + row := client.conn.QueryRow(q, []byte(key), []byte(newValue)) + + var val []byte + err = row.Scan(&val) + if err == sql.ErrNoRows { + return storage.ErrValueChanged.New("%q", key) + } + return Error.Wrap(err) + } + + return crdb.ExecuteTx(ctx, client.conn, nil, func(txn *sql.Tx) error { + q := "SELECT metadata FROM pathdata WHERE fullpath = $1:::BYTEA;" + row := txn.QueryRowContext(ctx, q, []byte(key)) + + var metadata []byte + err = row.Scan(&metadata) + if err == sql.ErrNoRows { + // Row not found for this fullpath. + // Potentially because another concurrent transaction changed the row. + return storage.ErrKeyNotFound.New("%q", key) + } + if err != nil { + return Error.Wrap(err) + } + + if equal := bytes.Compare(metadata, oldValue); equal != 0 { + // If the row is found but the metadata has been already changed + // we can't continue to delete it. + return storage.ErrValueChanged.New("%q", key) + } + + var res sql.Result + if newValue == nil { + q = ` + DELETE FROM pathdata + WHERE pathdata.fullpath = $1:::BYTEA + AND pathdata.metadata = $2:::BYTEA + ` + + res, err = txn.ExecContext(ctx, q, []byte(key), []byte(oldValue)) + } else { + q = ` + UPDATE pathdata + SET metadata = $3:::BYTEA + WHERE pathdata.fullpath = $1:::BYTEA + AND pathdata.metadata = $2:::BYTEA + ` + res, err = txn.ExecContext(ctx, q, []byte(key), []byte(oldValue), []byte(newValue)) + } + + if err != nil { + return Error.Wrap(err) + } + + affected, err := res.RowsAffected() + if err != nil { + return Error.Wrap(err) + } + + if affected != 1 { + return storage.ErrValueChanged.New("%q", key) + } + + return nil + }) +} diff --git a/storage/cockroachkv/client_test.go b/storage/cockroachkv/client_test.go new file mode 100644 index 000000000..4501682f5 --- /dev/null +++ b/storage/cockroachkv/client_test.go @@ -0,0 +1,36 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. +package cockroachkv + +import ( + "testing" + + _ "github.com/lib/pq" + + "storj.io/storj/private/dbutil/pgutil/pgtest" + "storj.io/storj/storage/testsuite" +) + +func newTestCockroachDB(t testing.TB) (store *Client, cleanup func()) { + if *pgtest.CrdbConnStr == "" { + t.Skipf("postgres flag missing, example:\n-cockroach-test-db=%s", pgtest.DefaultCrdbConnStr) + } + + crdb, err := New(*pgtest.CrdbConnStr) + if err != nil { + t.Fatalf("init: %+v", err) + } + + return crdb, func() { + if err := crdb.Close(); err != nil { + t.Fatalf("failed to close db: %v", err) + } + } +} + +func TestSuite(t *testing.T) { + store, cleanup := newTestCockroachDB(t) + defer cleanup() + + testsuite.RunTests(t, store) +} diff --git a/storage/cockroachkv/common.go b/storage/cockroachkv/common.go new file mode 100644 index 000000000..4776ebf2c --- /dev/null +++ b/storage/cockroachkv/common.go @@ -0,0 +1,11 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package cockroachkv + +import ( + "github.com/zeebo/errs" +) + +// Error is the default postgreskv errs class +var Error = errs.Class("cockroachkv error") diff --git a/storage/cockroachkv/ordered_iterator.go b/storage/cockroachkv/ordered_iterator.go new file mode 100644 index 000000000..f5d38aeec --- /dev/null +++ b/storage/cockroachkv/ordered_iterator.go @@ -0,0 +1,147 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package cockroachkv + +import ( + "bytes" + "context" + "database/sql" + "fmt" + + "github.com/zeebo/errs" + + "storj.io/storj/storage" +) + +type orderedCockroachIterator struct { + client *Client + opts *storage.IterateOptions + delimiter byte + batchSize int + curIndex int + curRows *sql.Rows + skipPrefix bool + lastKeySeen storage.Key + largestKey storage.Key + errEncountered error +} + +func newOrderedCockroachIterator(ctx context.Context, cli *Client, opts storage.IterateOptions, batchSize int) (_ *orderedCockroachIterator, err error) { + defer mon.Task()(&ctx)(&err) + if opts.Prefix == nil { + opts.Prefix = storage.Key("") + } + if opts.First == nil { + opts.First = storage.Key("") + } + if opts.First.Less(opts.Prefix) { + opts.First = opts.Prefix + } + + opi := &orderedCockroachIterator{ + client: cli, + opts: &opts, + delimiter: byte('/'), + batchSize: batchSize, + curIndex: 0, + } + + if len(opts.Prefix) > 0 { + opi.largestKey = storage.AfterPrefix(opts.Prefix) + } + + newRows, err := opi.doNextQuery(ctx) + if err != nil { + return nil, err + } + opi.curRows = newRows + + return opi, nil +} + +func (opi *orderedCockroachIterator) Close() error { + return errs.Combine(opi.errEncountered, opi.curRows.Close()) +} + +// Next fills in info for the next item in an ongoing listing. +func (opi *orderedCockroachIterator) Next(ctx context.Context, item *storage.ListItem) bool { + defer mon.Task()(&ctx)(nil) + + for { + for !opi.curRows.Next() { + if err := opi.curRows.Err(); err != nil && err != sql.ErrNoRows { + opi.errEncountered = errs.Wrap(err) + return false + } + if err := opi.curRows.Close(); err != nil { + opi.errEncountered = errs.Wrap(err) + return false + } + if opi.curIndex < opi.batchSize { + return false + } + newRows, err := opi.doNextQuery(ctx) + if err != nil { + opi.errEncountered = errs.Wrap(err) + return false + } + opi.curRows = newRows + opi.curIndex = 0 + } + + var k, v []byte + err := opi.curRows.Scan(&k, &v) + if err != nil { + opi.errEncountered = errs.Wrap(err) + return false + } + opi.curIndex++ + + if !bytes.HasPrefix(k, []byte(opi.opts.Prefix)) { + return false + } + + item.Key = storage.Key(k) + item.Value = storage.Value(v) + item.IsPrefix = false + + if !opi.opts.Recurse { + if idx := bytes.IndexByte(item.Key[len(opi.opts.Prefix):], opi.delimiter); idx >= 0 { + item.Key = item.Key[:len(opi.opts.Prefix)+idx+1] + item.Value = nil + item.IsPrefix = true + } + } + if opi.lastKeySeen.Equal(item.Key) { + continue + } + + opi.skipPrefix = item.IsPrefix + opi.lastKeySeen = item.Key + return true + } +} + +func (opi *orderedCockroachIterator) doNextQuery(ctx context.Context) (_ *sql.Rows, err error) { + defer mon.Task()(&ctx)(&err) + + gt := ">" + start := opi.lastKeySeen + + if len(start) == 0 { + start = opi.opts.First + gt = ">=" + } else if opi.skipPrefix { + start = storage.AfterPrefix(start) + gt = ">=" + } + + return opi.client.conn.Query(fmt.Sprintf(` + SELECT pd.fullpath, pd.metadata + FROM pathdata pd + WHERE pd.fullpath %s $1:::BYTEA + AND ($2:::BYTEA = '':::BYTEA OR pd.fullpath < $2:::BYTEA) + LIMIT $3 + `, gt), start, []byte(opi.largestKey), opi.batchSize) +}