2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-10-25 18:11:28 +01:00
|
|
|
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package postgreskv
|
|
|
|
|
|
|
|
import (
|
2019-06-05 15:23:10 +01:00
|
|
|
"context"
|
2018-10-25 18:11:28 +01:00
|
|
|
"database/sql"
|
|
|
|
|
2018-12-21 10:54:20 +00:00
|
|
|
"github.com/zeebo/errs"
|
|
|
|
|
2018-10-25 18:11:28 +01:00
|
|
|
"storj.io/storj/storage"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
alternateSQLSetup = `
|
|
|
|
CREATE OR REPLACE FUNCTION local_path (fullpath BYTEA, prefix BYTEA, delimiter BYTEA)
|
|
|
|
RETURNS BYTEA AS $$
|
|
|
|
DECLARE
|
|
|
|
relative BYTEA;
|
|
|
|
pos INTEGER;
|
|
|
|
BEGIN
|
|
|
|
relative := substring(fullpath FROM (octet_length(prefix)+1));
|
|
|
|
pos := position(delimiter IN relative);
|
|
|
|
IF pos = 0 THEN
|
|
|
|
RETURN relative;
|
|
|
|
END IF;
|
|
|
|
RETURN substring(relative FOR pos);
|
|
|
|
END;
|
|
|
|
$$ LANGUAGE 'plpgsql'
|
|
|
|
IMMUTABLE STRICT;
|
|
|
|
`
|
|
|
|
|
|
|
|
alternateSQLTeardown = `
|
|
|
|
DROP FUNCTION local_path(BYTEA, BYTEA, BYTEA);
|
|
|
|
`
|
|
|
|
|
|
|
|
alternateForwardQuery = `
|
|
|
|
SELECT DISTINCT
|
|
|
|
$2::BYTEA || x.localpath AS p,
|
|
|
|
first_value(x.metadata) OVER (PARTITION BY x.localpath ORDER BY x.fullpath) AS m
|
|
|
|
FROM (
|
|
|
|
SELECT
|
|
|
|
pd.fullpath,
|
|
|
|
local_path(pd.fullpath, $2::BYTEA, set_byte(' '::BYTEA, 0, b.delim)) AS localpath,
|
|
|
|
pd.metadata
|
|
|
|
FROM
|
|
|
|
pathdata pd,
|
|
|
|
buckets b
|
|
|
|
WHERE
|
|
|
|
b.bucketname = $1::BYTEA
|
|
|
|
AND pd.bucket = b.bucketname
|
|
|
|
AND pd.fullpath >= $2::BYTEA
|
|
|
|
AND ($2::BYTEA = ''::BYTEA OR pd.fullpath < bytea_increment($2::BYTEA))
|
|
|
|
AND pd.fullpath >= $3::BYTEA
|
|
|
|
) x
|
|
|
|
ORDER BY p
|
|
|
|
LIMIT $4
|
|
|
|
`
|
|
|
|
|
|
|
|
alternateReverseQuery = `
|
|
|
|
SELECT DISTINCT
|
|
|
|
$2::BYTEA || x.localpath AS p,
|
|
|
|
first_value(x.metadata) OVER (PARTITION BY x.localpath ORDER BY x.fullpath) AS m
|
|
|
|
FROM (
|
|
|
|
SELECT
|
|
|
|
pd.fullpath,
|
|
|
|
local_path(pd.fullpath, $2::BYTEA, set_byte(' '::BYTEA, 0, b.delim)) AS localpath,
|
|
|
|
pd.metadata
|
|
|
|
FROM
|
|
|
|
pathdata pd,
|
|
|
|
buckets b
|
|
|
|
WHERE
|
|
|
|
b.bucketname = $1::BYTEA
|
|
|
|
AND pd.bucket = b.bucketname
|
|
|
|
AND pd.fullpath >= $2::BYTEA
|
|
|
|
AND ($2::BYTEA = ''::BYTEA OR pd.fullpath < bytea_increment($2::BYTEA))
|
|
|
|
AND ($3::BYTEA = ''::BYTEA OR pd.fullpath <= $3::BYTEA)
|
|
|
|
) x
|
|
|
|
ORDER BY p DESC
|
|
|
|
LIMIT $4
|
|
|
|
`
|
|
|
|
)
|
|
|
|
|
|
|
|
// AlternateClient is the entrypoint into an alternate postgreskv data store
|
|
|
|
type AlternateClient struct {
|
|
|
|
*Client
|
|
|
|
}
|
|
|
|
|
|
|
|
// AltNew instantiates a new postgreskv AlternateClient given db URL
|
|
|
|
func AltNew(dbURL string) (*AlternateClient, error) {
|
|
|
|
client, err := New(dbURL)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
_, err = client.pgConn.Exec(alternateSQLSetup)
|
|
|
|
if err != nil {
|
2018-12-21 10:54:20 +00:00
|
|
|
return nil, errs.Combine(err, client.Close())
|
2018-10-25 18:11:28 +01:00
|
|
|
}
|
|
|
|
return &AlternateClient{Client: client}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes an AlternateClient and frees its resources.
|
|
|
|
func (altClient *AlternateClient) Close() error {
|
|
|
|
_, err := altClient.pgConn.Exec(alternateSQLTeardown)
|
2018-12-21 10:54:20 +00:00
|
|
|
return errs.Combine(err, altClient.Client.Close())
|
2018-10-25 18:11:28 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
type alternateOrderedPostgresIterator struct {
|
|
|
|
*orderedPostgresIterator
|
|
|
|
}
|
|
|
|
|
2019-06-05 15:23:10 +01:00
|
|
|
func (opi *alternateOrderedPostgresIterator) doNextQuery(ctx context.Context) (_ *sql.Rows, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-10-25 18:11:28 +01:00
|
|
|
if opi.opts.Recurse {
|
2019-06-05 15:23:10 +01:00
|
|
|
return opi.orderedPostgresIterator.doNextQuery(ctx)
|
2018-10-25 18:11:28 +01:00
|
|
|
}
|
|
|
|
start := opi.lastKeySeen
|
|
|
|
if start == nil {
|
|
|
|
start = opi.opts.First
|
|
|
|
}
|
|
|
|
var query string
|
|
|
|
if opi.opts.Reverse {
|
|
|
|
query = alternateReverseQuery
|
|
|
|
} else {
|
|
|
|
query = alternateForwardQuery
|
|
|
|
}
|
|
|
|
return opi.client.pgConn.Query(query, []byte(opi.bucket), []byte(opi.opts.Prefix), []byte(start), opi.batchSize+1)
|
|
|
|
}
|
|
|
|
|
2019-06-05 15:23:10 +01:00
|
|
|
func newAlternateOrderedPostgresIterator(ctx context.Context, altClient *AlternateClient, opts storage.IterateOptions, batchSize int) (_ *alternateOrderedPostgresIterator, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-10-25 18:11:28 +01:00
|
|
|
if opts.Prefix == nil {
|
|
|
|
opts.Prefix = storage.Key("")
|
|
|
|
}
|
|
|
|
if opts.First == nil {
|
|
|
|
opts.First = storage.Key("")
|
|
|
|
}
|
|
|
|
opi1 := &orderedPostgresIterator{
|
|
|
|
client: altClient.Client,
|
|
|
|
opts: &opts,
|
|
|
|
bucket: storage.Key(defaultBucket),
|
|
|
|
delimiter: byte('/'),
|
|
|
|
batchSize: batchSize,
|
|
|
|
curIndex: 0,
|
|
|
|
}
|
|
|
|
opi := &alternateOrderedPostgresIterator{orderedPostgresIterator: opi1}
|
|
|
|
opi.nextQuery = opi.doNextQuery
|
2019-06-05 15:23:10 +01:00
|
|
|
newRows, err := opi.nextQuery(ctx)
|
2018-10-25 18:11:28 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
opi.curRows = newRows
|
|
|
|
return opi, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Iterate iterates over items based on opts
|
2019-06-05 15:23:10 +01:00
|
|
|
func (altClient *AlternateClient) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
|
2019-06-05 18:22:46 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-06-05 15:23:10 +01:00
|
|
|
opi, err := newAlternateOrderedPostgresIterator(ctx, altClient, opts, defaultBatchSize)
|
2018-10-25 18:11:28 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer func() {
|
2018-12-21 10:54:20 +00:00
|
|
|
err = errs.Combine(err, opi.Close())
|
2018-10-25 18:11:28 +01:00
|
|
|
}()
|
|
|
|
|
2019-06-05 15:23:10 +01:00
|
|
|
return fn(ctx, opi)
|
2018-10-25 18:11:28 +01:00
|
|
|
}
|