storj/storage/postgreskv/alternateclient.go
paul cannon 0c025fa937 storage/: remove reverse-key-listing feature
We don't use reverse listing in any of our code, outside of tests, and
it is only exposed through libuplink in the
lib/uplink.(*Project).ListBuckets() API. We also don't know of any users
who might have a need for reverse listing through ListBuckets().

Since one of our prospective pointerdb backends can not support
backwards iteration, and because of the above considerations, we are
going to remove the reverse listing feature.

Change-Id: I8d2a1f33d01ee70b79918d584b8c671f57eef2a0
2019-11-12 18:47:51 +00:00

142 lines
3.5 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package postgreskv
import (
"context"
"database/sql"
"github.com/zeebo/errs"
"storj.io/storj/storage"
)
const (
// alternateSQLSetup creates (or replaces) the PL/pgSQL helper function
// local_path(fullpath, prefix, delimiter). The function drops the first
// octet_length(prefix) bytes of fullpath and returns what remains up to and
// including the first occurrence of delimiter; when the delimiter does not
// occur, the whole remainder is returned. The function is declared
// IMMUTABLE STRICT.
alternateSQLSetup = `
CREATE OR REPLACE FUNCTION local_path (fullpath BYTEA, prefix BYTEA, delimiter BYTEA)
RETURNS BYTEA AS $$
DECLARE
relative BYTEA;
pos INTEGER;
BEGIN
relative := substring(fullpath FROM (octet_length(prefix)+1));
pos := position(delimiter IN relative);
IF pos = 0 THEN
RETURN relative;
END IF;
RETURN substring(relative FOR pos);
END;
$$ LANGUAGE 'plpgsql'
IMMUTABLE STRICT;
`
// alternateSQLTeardown drops the local_path function installed by
// alternateSQLSetup.
alternateSQLTeardown = `
DROP FUNCTION local_path(BYTEA, BYTEA, BYTEA);
`
// alternateForwardQuery lists entries in ascending key order, collapsing
// every path below the prefix to its local_path component.
//
// Parameters:
//   $1 - bucket name
//   $2 - key prefix (an empty prefix disables the upper-bound check)
//   $3 - lower bound (inclusive) on the full path
//   $4 - maximum number of rows to return
//
// Each result row holds the prefix concatenated with the local path (p) and
// the metadata of the smallest full path sharing that local path (m). The
// delimiter byte is read from buckets.delim.
// NOTE(review): bytea_increment is not defined in this file; it is assumed
// to be installed elsewhere in the schema - confirm.
alternateForwardQuery = `
SELECT DISTINCT
$2::BYTEA || x.localpath AS p,
first_value(x.metadata) OVER (PARTITION BY x.localpath ORDER BY x.fullpath) AS m
FROM (
SELECT
pd.fullpath,
local_path(pd.fullpath, $2::BYTEA, set_byte(' '::BYTEA, 0, b.delim)) AS localpath,
pd.metadata
FROM
pathdata pd,
buckets b
WHERE
b.bucketname = $1::BYTEA
AND pd.bucket = b.bucketname
AND pd.fullpath >= $2::BYTEA
AND ($2::BYTEA = ''::BYTEA OR pd.fullpath < bytea_increment($2::BYTEA))
AND pd.fullpath >= $3::BYTEA
) x
ORDER BY p
LIMIT $4
`
)
// AlternateClient is the entrypoint into an alternate postgreskv data store.
// It embeds *Client, so every Client method is available on it; this file
// overrides Close and Iterate.
type AlternateClient struct {
*Client
}
// AltNew opens a postgreskv client for dbURL and installs the SQL helper
// function from alternateSQLSetup on it. If the installation fails, the
// freshly opened client is closed again and both errors are reported together.
func AltNew(dbURL string) (*AlternateClient, error) {
	base, err := New(dbURL)
	if err != nil {
		return nil, err
	}

	if _, setupErr := base.pgConn.Exec(alternateSQLSetup); setupErr != nil {
		return nil, errs.Combine(setupErr, base.Close())
	}

	return &AlternateClient{Client: base}, nil
}
// Close removes the SQL helper function installed by AltNew and then closes
// the embedded Client. The client is closed even when the teardown statement
// fails; any errors from the two steps are combined into the returned error.
func (altClient *AlternateClient) Close() error {
	_, teardownErr := altClient.pgConn.Exec(alternateSQLTeardown)
	closeErr := altClient.Client.Close()
	return errs.Combine(teardownErr, closeErr)
}
// alternateOrderedPostgresIterator wraps an orderedPostgresIterator and
// supplies its own doNextQuery, which issues alternateForwardQuery for
// non-recursive listings.
type alternateOrderedPostgresIterator struct {
*orderedPostgresIterator
}
// doNextQuery runs the query that produces the next batch of rows.
//
// Recursive listings are delegated to the embedded iterator's doNextQuery;
// non-recursive listings run alternateForwardQuery. The query is bound to ctx,
// so cancelling ctx (or hitting its deadline) aborts it.
func (opi *alternateOrderedPostgresIterator) doNextQuery(ctx context.Context) (_ *sql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	if opi.opts.Recurse {
		return opi.orderedPostgresIterator.doNextQuery(ctx)
	}

	// Resume from the last key handed out; on the very first query fall back
	// to the caller-supplied starting key.
	start := opi.lastKeySeen
	if start == nil {
		start = opi.opts.First
	}

	// One row more than batchSize is requested.
	// NOTE(review): presumably so the embedded iterator can detect whether
	// further batches remain - confirm against orderedPostgresIterator.
	return opi.client.pgConn.QueryContext(
		ctx,
		alternateForwardQuery,
		[]byte(opi.bucket),
		[]byte(opi.opts.Prefix),
		[]byte(start),
		opi.batchSize+1,
	)
}
// newAlternateOrderedPostgresIterator builds an iterator over altClient for
// the given options and immediately runs the first query, so the returned
// iterator already holds its first batch of rows. Nil Prefix and First
// options are replaced by empty keys. The iterator uses defaultBucket as its
// bucket and '/' as its delimiter.
func newAlternateOrderedPostgresIterator(ctx context.Context, altClient *AlternateClient, opts storage.IterateOptions, batchSize int) (_ *alternateOrderedPostgresIterator, err error) {
	defer mon.Task()(&ctx)(&err)

	if opts.Prefix == nil {
		opts.Prefix = storage.Key("")
	}
	if opts.First == nil {
		opts.First = storage.Key("")
	}

	iter := &alternateOrderedPostgresIterator{
		orderedPostgresIterator: &orderedPostgresIterator{
			client:    altClient.Client,
			opts:      &opts,
			bucket:    storage.Key(defaultBucket),
			delimiter: byte('/'),
			batchSize: batchSize,
		},
	}
	// Route batch fetching through the alternate query implementation.
	iter.nextQuery = iter.doNextQuery

	rows, err := iter.nextQuery(ctx)
	if err != nil {
		return nil, err
	}
	iter.curRows = rows

	return iter, nil
}
// Iterate creates an iterator for opts, passes it to fn, and closes the
// iterator once fn returns. The returned error combines the error from fn
// with any error from closing the iterator.
func (altClient *AlternateClient) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	iter, err := newAlternateOrderedPostgresIterator(ctx, altClient, opts, defaultBatchSize)
	if err != nil {
		return err
	}
	// Always release the iterator, folding a close failure into the result.
	defer func() { err = errs.Combine(err, iter.Close()) }()

	return fn(ctx, iter)
}