storage/postgreskv: use transactional helper
We may never need this code to work with CockroachDB, but I'm on a mission to avoid problematic uses of Begin() and BeginTx(), and anywhere they appear is a possible place for someone to copy-and-paste and do something wrong. dbutil.WithTx makes this code a little bit simpler too, so it seems worthwhile. Change-Id: I9b4ab484db4590cad5ab07de515bbf5d9708daed
This commit is contained in:
parent
f3aee1b758
commit
0135852a0e
@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
|
|
||||||
"storj.io/storj/private/dbutil/pgutil/pgtest"
|
"storj.io/storj/private/dbutil/pgutil/pgtest"
|
||||||
|
"storj.io/storj/private/dbutil/txutil"
|
||||||
"storj.io/storj/storage"
|
"storj.io/storj/storage"
|
||||||
"storj.io/storj/storage/testsuite"
|
"storj.io/storj/storage/testsuite"
|
||||||
)
|
)
|
||||||
@ -90,40 +91,30 @@ func BenchmarkSuite(b *testing.B) {
|
|||||||
testsuite.RunBenchmarks(b, store)
|
testsuite.RunBenchmarks(b, store)
|
||||||
}
|
}
|
||||||
|
|
||||||
func bulkImport(db *sql.DB, iter storage.Iterator) (err error) {
|
func bulkImport(db *sql.DB, iter storage.Iterator) error {
|
||||||
txn, err2 := db.Begin()
|
return txutil.WithTx(ctx, db, nil, func(ctx context.Context, txn *sql.Tx) (err error) {
|
||||||
if err2 != nil {
|
stmt, err := txn.Prepare(pq.CopyIn("pathdata", "bucket", "fullpath", "metadata"))
|
||||||
return errs.New("Failed to start transaction: %v", err2)
|
if err != nil {
|
||||||
}
|
return errs.New("Failed to initialize COPY FROM: %v", err)
|
||||||
defer func() {
|
|
||||||
if err == nil {
|
|
||||||
err = errs.Combine(err, txn.Commit())
|
|
||||||
} else {
|
|
||||||
err = errs.Combine(err, txn.Rollback())
|
|
||||||
}
|
}
|
||||||
}()
|
defer func() {
|
||||||
|
err2 := stmt.Close()
|
||||||
|
if err2 != nil {
|
||||||
|
err = errs.Combine(err, errs.New("Failed to close COPY FROM statement: %v", err2))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
stmt, err2 := txn.Prepare(pq.CopyIn("pathdata", "bucket", "fullpath", "metadata"))
|
var item storage.ListItem
|
||||||
if err2 != nil {
|
for iter.Next(ctx, &item) {
|
||||||
return errs.New("Failed to initialize COPY FROM: %v", err)
|
if _, err := stmt.Exec([]byte(""), []byte(item.Key), []byte(item.Value)); err != nil {
|
||||||
}
|
return err
|
||||||
defer func() {
|
}
|
||||||
err2 := stmt.Close()
|
|
||||||
if err2 != nil {
|
|
||||||
err = errs.Combine(err, errs.New("Failed to close COPY FROM statement: %v", err2))
|
|
||||||
}
|
}
|
||||||
}()
|
if _, err = stmt.Exec(); err != nil {
|
||||||
|
return errs.New("Failed to complete COPY FROM: %v", err)
|
||||||
var item storage.ListItem
|
|
||||||
for iter.Next(ctx, &item) {
|
|
||||||
if _, err := stmt.Exec([]byte(""), []byte(item.Key), []byte(item.Value)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
if _, err = stmt.Exec(); err != nil {
|
})
|
||||||
return errs.New("Failed to complete COPY FROM: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func bulkDeleteAll(db *sql.DB) error {
|
func bulkDeleteAll(db *sql.DB) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user