d5963d81d0
This is based on Jeff's most excellent work to identify why non-recursive listing under postgreskv was phenomenally slow. It turns out PostgreSQL's query planner was using two sequential scans of the pathdata table to do its job. It's unclear how long that has been happening, but clearly it won't scale any further.

The main change is propagating the bucket association with pathnames through the CTE, so that the query planner lets itself use the pathdata index on (bucket, fullpath) for the skipping-forward part.

Jeff also had some changes to the range ends to keep NULL from being used, I believe with the intent of making sure the query planner could use the pathdata index. My tests on PostgreSQL 9.6 and 11 indicate that those changes make no appreciable difference in performance or query plan, so I'm leaving them off for now to avoid a careful audit of the semantic differences.

A test is included here, but it only serves to check that the new version of the function is active. To actually ensure that no sequential scans remain in the query plan, our tests would need to run against a test db with lots of data already loaded, and that isn't feasible for now.

Change-Id: Iffe9a1f411c54a2f742a4abb8f2df0c64fd662cb
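As a rough illustration of what such a plan check could look like, here is a minimal sketch of an EXPLAIN-based helper. It is not part of this change: planUsesSeqScan is a hypothetical name, and it assumes a *sql.DB pointed at a database where pathdata is already populated with enough rows that the planner's choice is meaningful.

// planUsesSeqScan runs EXPLAIN on the given query and reports whether any
// line of the resulting plan mentions a sequential scan. Hypothetical helper,
// for illustration only; not part of this change.
func planUsesSeqScan(db *sql.DB, query string) (seqScan bool, err error) {
	rows, err := db.Query("EXPLAIN " + query)
	if err != nil {
		return false, err
	}
	defer func() { err = errs.Combine(err, rows.Close()) }()

	for rows.Next() {
		// EXPLAIN returns one text row per line of the plan tree.
		var planLine string
		if err := rows.Scan(&planLine); err != nil {
			return false, err
		}
		if strings.Contains(planLine, "Seq Scan") {
			return true, nil
		}
	}
	return false, rows.Err()
}

A long-running test could then load a large dataset with bulkImport (below) and fail if planUsesSeqScan reports true for the listing query.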
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package postgreskv

import (
	"context"
	"database/sql"
	"strings"
	"testing"

	"github.com/lib/pq"
	"github.com/zeebo/errs"

	"storj.io/storj/internal/dbutil/pgutil/pgtest"
	"storj.io/storj/storage"
	"storj.io/storj/storage/testsuite"
)

var ctx = context.Background() // test context

func newTestPostgres(t testing.TB) (store *Client, cleanup func()) {
	if *pgtest.ConnStr == "" {
		t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", pgtest.DefaultConnStr)
	}

	pgdb, err := New(*pgtest.ConnStr)
	if err != nil {
		t.Fatalf("init: %v", err)
	}

	return pgdb, func() {
		if err := pgdb.Close(); err != nil {
			t.Fatalf("failed to close db: %v", err)
		}
	}
}

func TestSuite(t *testing.T) {
	store, cleanup := newTestPostgres(t)
	defer cleanup()

	// zap := zaptest.NewLogger(t)
	// loggedStore := storelogger.New(zap, store)
	testsuite.RunTests(t, store)
}

func TestThatMigrationActuallyHappened(t *testing.T) {
	store, cleanup := newTestPostgres(t)
	defer cleanup()

	rows, err := store.pgConn.Query(`
		SELECT prosrc
		  FROM pg_catalog.pg_proc p,
		       pg_catalog.pg_namespace n
		 WHERE p.pronamespace = n.oid
		   AND p.proname = 'list_directory'
		   AND n.nspname = ANY(current_schemas(true))
		   AND p.pronargs = 4
	`)
	if err != nil {
		t.Fatalf("failed to get list_directory source: %v", err)
	}
	defer func() {
		if err := rows.Close(); err != nil {
			t.Fatalf("failed to close rows: %v", err)
		}
	}()

	numFound := 0
	for rows.Next() {
		numFound++
		if numFound > 1 {
			t.Fatal("there are multiple eligible list_directory() functions??")
		}
		var source string
		if err := rows.Scan(&source); err != nil {
			t.Fatalf("failed to read list_directory source: %v", err)
		}
		if strings.Contains(source, "distinct_prefix (truncatedpath)") {
			t.Fatal("list_directory() function in pg appears to be the oldnbusted one")
		}
	}
}

func BenchmarkSuite(b *testing.B) {
	store, cleanup := newTestPostgres(b)
	defer cleanup()

	testsuite.RunBenchmarks(b, store)
}

func bulkImport(db *sql.DB, iter storage.Iterator) (err error) {
	txn, err2 := db.Begin()
	if err2 != nil {
		return errs.New("Failed to start transaction: %v", err2)
	}
	defer func() {
		if err == nil {
			err = errs.Combine(err, txn.Commit())
		} else {
			err = errs.Combine(err, txn.Rollback())
		}
	}()

	stmt, err2 := txn.Prepare(pq.CopyIn("pathdata", "bucket", "fullpath", "metadata"))
	if err2 != nil {
		return errs.New("Failed to initialize COPY FROM: %v", err2)
	}
	defer func() {
		err2 := stmt.Close()
		if err2 != nil {
			err = errs.Combine(err, errs.New("Failed to close COPY FROM statement: %v", err2))
		}
	}()

	var item storage.ListItem
	for iter.Next(ctx, &item) {
		if _, err := stmt.Exec([]byte(""), []byte(item.Key), []byte(item.Value)); err != nil {
			return err
		}
	}
	// A final Exec with no arguments flushes the buffered COPY FROM data.
	if _, err = stmt.Exec(); err != nil {
		return errs.New("Failed to complete COPY FROM: %v", err)
	}
	return nil
}

func bulkDelete(db *sql.DB) error {
	_, err := db.Exec("TRUNCATE pathdata")
	if err != nil {
		return errs.New("Failed to TRUNCATE pathdata table: %v", err)
	}
	return nil
}

type pgLongBenchmarkStore struct {
	*Client
}

func (store *pgLongBenchmarkStore) BulkImport(iter storage.Iterator) error {
	return bulkImport(store.pgConn, iter)
}

func (store *pgLongBenchmarkStore) BulkDelete() error {
	return bulkDelete(store.pgConn)
}

func BenchmarkSuiteLong(b *testing.B) {
	store, cleanup := newTestPostgres(b)
	defer cleanup()

	testsuite.BenchmarkPathOperationsInLargeDb(b, &pgLongBenchmarkStore{store})
}