// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package postgreskv import ( "context" "database/sql" "github.com/zeebo/errs" "storj.io/storj/storage" ) const ( alternateSQLSetup = ` CREATE OR REPLACE FUNCTION local_path (fullpath BYTEA, prefix BYTEA, delimiter BYTEA) RETURNS BYTEA AS $$ DECLARE relative BYTEA; pos INTEGER; BEGIN relative := substring(fullpath FROM (octet_length(prefix)+1)); pos := position(delimiter IN relative); IF pos = 0 THEN RETURN relative; END IF; RETURN substring(relative FOR pos); END; $$ LANGUAGE 'plpgsql' IMMUTABLE STRICT; ` alternateSQLTeardown = ` DROP FUNCTION local_path(BYTEA, BYTEA, BYTEA); ` alternateForwardQuery = ` SELECT DISTINCT $2::BYTEA || x.localpath AS p, first_value(x.metadata) OVER (PARTITION BY x.localpath ORDER BY x.fullpath) AS m FROM ( SELECT pd.fullpath, local_path(pd.fullpath, $2::BYTEA, set_byte(' '::BYTEA, 0, b.delim)) AS localpath, pd.metadata FROM pathdata pd, buckets b WHERE b.bucketname = $1::BYTEA AND pd.bucket = b.bucketname AND pd.fullpath >= $2::BYTEA AND ($2::BYTEA = ''::BYTEA OR pd.fullpath < bytea_increment($2::BYTEA)) AND pd.fullpath >= $3::BYTEA ) x ORDER BY p LIMIT $4 ` ) // AlternateClient is the entrypoint into an alternate postgreskv data store type AlternateClient struct { *Client } // AltNew instantiates a new postgreskv AlternateClient given db URL func AltNew(dbURL string) (*AlternateClient, error) { client, err := New(dbURL) if err != nil { return nil, err } _, err = client.pgConn.Exec(alternateSQLSetup) if err != nil { return nil, errs.Combine(err, client.Close()) } return &AlternateClient{Client: client}, nil } // Close closes an AlternateClient and frees its resources. func (altClient *AlternateClient) Close() error { _, err := altClient.pgConn.Exec(alternateSQLTeardown) return errs.Combine(err, altClient.Client.Close()) } type alternateOrderedPostgresIterator struct { *orderedPostgresIterator } func (opi *alternateOrderedPostgresIterator) doNextQuery(ctx context.Context) (_ *sql.Rows, err error) { defer mon.Task()(&ctx)(&err) if opi.opts.Recurse { return opi.orderedPostgresIterator.doNextQuery(ctx) } start := opi.lastKeySeen if start == nil { start = opi.opts.First } return opi.client.pgConn.Query(alternateForwardQuery, []byte(opi.bucket), []byte(opi.opts.Prefix), []byte(start), opi.batchSize+1) } func newAlternateOrderedPostgresIterator(ctx context.Context, altClient *AlternateClient, opts storage.IterateOptions, batchSize int) (_ *alternateOrderedPostgresIterator, err error) { defer mon.Task()(&ctx)(&err) if opts.Prefix == nil { opts.Prefix = storage.Key("") } if opts.First == nil { opts.First = storage.Key("") } opi1 := &orderedPostgresIterator{ client: altClient.Client, opts: &opts, bucket: storage.Key(defaultBucket), delimiter: byte('/'), batchSize: batchSize, curIndex: 0, } opi := &alternateOrderedPostgresIterator{orderedPostgresIterator: opi1} opi.nextQuery = opi.doNextQuery newRows, err := opi.nextQuery(ctx) if err != nil { return nil, err } opi.curRows = newRows return opi, nil } // Iterate iterates over items based on opts func (altClient *AlternateClient) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) opi, err := newAlternateOrderedPostgresIterator(ctx, altClient, opts, defaultBatchSize) if err != nil { return err } defer func() { err = errs.Combine(err, opi.Close()) }() return fn(ctx, opi) }