better database error handling (#784)
* better database error handling Change-Id: I28dbd69cf6c2fa268e02405521ff6e6c1a68a702 * missing comments added * missing comment added
This commit is contained in:
parent
023eb95a9b
commit
1c96db01ba
@ -4,17 +4,20 @@
|
|||||||
package accounting
|
package accounting
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
|
||||||
"storj.io/storj/internal/migrate"
|
"storj.io/storj/internal/migrate"
|
||||||
dbx "storj.io/storj/pkg/accounting/dbx"
|
dbx "storj.io/storj/pkg/accounting/dbx"
|
||||||
"storj.io/storj/pkg/utils"
|
"storj.io/storj/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LastBandwidthTally is a name in the accounting timestamps database
|
var (
|
||||||
var LastBandwidthTally dbx.Timestamps_Name_Field
|
// Error is the default accountingdb errs class
|
||||||
|
Error = errs.Class("accountingdb")
|
||||||
|
|
||||||
func init() {
|
// LastBandwidthTally is a name in the accounting timestamps database
|
||||||
LastBandwidthTally = dbx.Timestamps_Name("LastBandwidthTally")
|
LastBandwidthTally = dbx.Timestamps_Name("LastBandwidthTally")
|
||||||
}
|
)
|
||||||
|
|
||||||
// NewDb - constructor for DB
|
// NewDb - constructor for DB
|
||||||
func NewDb(databaseURL string) (*dbx.DB, error) {
|
func NewDb(databaseURL string) (*dbx.DB, error) {
|
||||||
@ -24,7 +27,8 @@ func NewDb(databaseURL string) (*dbx.DB, error) {
|
|||||||
}
|
}
|
||||||
db, err := dbx.Open(dbURL.Scheme, dbURL.Path)
|
db, err := dbx.Open(dbURL.Scheme, dbURL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.New("failed opening database %q, %q: %v",
|
||||||
|
dbURL.Scheme, dbURL.Path, err)
|
||||||
}
|
}
|
||||||
err = migrate.Create("accounting", db)
|
err = migrate.Create("accounting", db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -5,3 +5,13 @@ package dbx
|
|||||||
|
|
||||||
// go:generate dbx.v1 schema -d postgres -d sqlite3 accounting.dbx .
|
// go:generate dbx.v1 schema -d postgres -d sqlite3 accounting.dbx .
|
||||||
// go:generate dbx.v1 golang -d postgres -d sqlite3 -p dbx accounting.dbx .
|
// go:generate dbx.v1 golang -d postgres -d sqlite3 -p dbx accounting.dbx .
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// catch dbx errors
|
||||||
|
c := errs.Class("accountingdb")
|
||||||
|
WrapErr = func(e *Error) error { return c.Wrap(e) }
|
||||||
|
}
|
||||||
|
@ -5,3 +5,13 @@ package irreparabledb
|
|||||||
|
|
||||||
//go:generate dbx.v1 schema -d postgres -d sqlite3 irreparabledb.dbx .
|
//go:generate dbx.v1 schema -d postgres -d sqlite3 irreparabledb.dbx .
|
||||||
//go:generate dbx.v1 golang -d postgres -d sqlite3 irreparabledb.dbx .
|
//go:generate dbx.v1 golang -d postgres -d sqlite3 irreparabledb.dbx .
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// catch dbx errors
|
||||||
|
c := errs.Class("irreparabledb")
|
||||||
|
WrapErr = func(e *Error) error { return c.Wrap(e) }
|
||||||
|
}
|
||||||
|
@ -39,7 +39,8 @@ func New(source string) (*Database, error) {
|
|||||||
|
|
||||||
db, err := dbx.Open(u.Scheme, u.Path)
|
db, err := dbx.Open(u.Scheme, u.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.New("failed opening database %q, %q: %v",
|
||||||
|
u.Scheme, u.Path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = migrate.Create("irreparabledb", db)
|
err = migrate.Create("irreparabledb", db)
|
||||||
|
@ -18,7 +18,7 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"storj.io/storj/internal/identity"
|
testidentity "storj.io/storj/internal/identity"
|
||||||
"storj.io/storj/internal/teststorj"
|
"storj.io/storj/internal/teststorj"
|
||||||
"storj.io/storj/pkg/node"
|
"storj.io/storj/pkg/node"
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
|
@ -18,7 +18,7 @@ import (
|
|||||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||||
"storj.io/storj/pkg/provider"
|
"storj.io/storj/pkg/provider"
|
||||||
"storj.io/storj/pkg/storage/buckets"
|
"storj.io/storj/pkg/storage/buckets"
|
||||||
"storj.io/storj/pkg/storage/ec"
|
ecclient "storj.io/storj/pkg/storage/ec"
|
||||||
"storj.io/storj/pkg/storage/segments"
|
"storj.io/storj/pkg/storage/segments"
|
||||||
"storj.io/storj/pkg/storage/streams"
|
"storj.io/storj/pkg/storage/streams"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
|
@ -16,17 +16,21 @@ import (
|
|||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
_ "github.com/mattn/go-sqlite3" // register sqlite to sql
|
_ "github.com/mattn/go-sqlite3" // register sqlite to sql
|
||||||
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||||
|
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/piecestore"
|
pstore "storj.io/storj/pkg/piecestore"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
"storj.io/storj/pkg/utils"
|
"storj.io/storj/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mon = monkit.Package()
|
mon = monkit.Package()
|
||||||
|
// Error is the default psdb errs class
|
||||||
|
Error = errs.Class("psdb")
|
||||||
|
|
||||||
defaultCheckInterval = flag.Duration("piecestore.ttl.check_interval", time.Hour, "number of seconds to sleep between ttl checks")
|
defaultCheckInterval = flag.Duration("piecestore.ttl.check_interval", time.Hour, "number of seconds to sleep between ttl checks")
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -54,7 +58,7 @@ func Open(ctx context.Context, dataPath, DBPath string) (db *DB, err error) {
|
|||||||
|
|
||||||
sqlite, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&mutex=full", DBPath))
|
sqlite, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&mutex=full", DBPath))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
db = &DB{
|
db = &DB{
|
||||||
DB: sqlite,
|
DB: sqlite,
|
||||||
|
@ -4,11 +4,18 @@
|
|||||||
package satellitedb
|
package satellitedb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
|
||||||
"storj.io/storj/internal/migrate"
|
"storj.io/storj/internal/migrate"
|
||||||
"storj.io/storj/pkg/satellite"
|
"storj.io/storj/pkg/satellite"
|
||||||
"storj.io/storj/pkg/satellite/satellitedb/dbx"
|
"storj.io/storj/pkg/satellite/satellitedb/dbx"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Error is the default satellitedb errs class
|
||||||
|
Error = errs.Class("satellitedb")
|
||||||
|
)
|
||||||
|
|
||||||
// Database contains access to different satellite databases
|
// Database contains access to different satellite databases
|
||||||
type Database struct {
|
type Database struct {
|
||||||
db *dbx.DB
|
db *dbx.DB
|
||||||
@ -17,9 +24,9 @@ type Database struct {
|
|||||||
// New - constructor for DB
|
// New - constructor for DB
|
||||||
func New(driver, source string) (satellite.DB, error) {
|
func New(driver, source string) (satellite.DB, error) {
|
||||||
db, err := dbx.Open(driver, source)
|
db, err := dbx.Open(driver, source)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.New("failed opening database %q, %q: %v",
|
||||||
|
driver, source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
database := &Database{
|
database := &Database{
|
||||||
|
@ -5,3 +5,13 @@ package dbx
|
|||||||
|
|
||||||
//go:generate dbx.v1 golang -d sqlite3 -p dbx satellitedb.dbx .
|
//go:generate dbx.v1 golang -d sqlite3 -p dbx satellitedb.dbx .
|
||||||
//go:generate dbx.v1 schema -d sqlite3 satellitedb.dbx .
|
//go:generate dbx.v1 schema -d sqlite3 satellitedb.dbx .
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// catch dbx errors
|
||||||
|
c := errs.Class("satellitedb")
|
||||||
|
WrapErr = func(e *Error) error { return c.Wrap(e) }
|
||||||
|
}
|
||||||
|
@ -5,3 +5,13 @@ package statdb
|
|||||||
|
|
||||||
// go:generate dbx.v1 schema -d postgres -d sqlite3 statdb.dbx .
|
// go:generate dbx.v1 schema -d postgres -d sqlite3 statdb.dbx .
|
||||||
// go:generate dbx.v1 golang -d postgres -d sqlite3 statdb.dbx .
|
// go:generate dbx.v1 golang -d postgres -d sqlite3 statdb.dbx .
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// catch dbx errors
|
||||||
|
c := errs.Class("statdb")
|
||||||
|
WrapErr = func(e *Error) error { return c.Wrap(e) }
|
||||||
|
}
|
||||||
|
@ -36,7 +36,8 @@ type StatDB struct {
|
|||||||
func NewStatDB(driver, source string, log *zap.Logger) (*StatDB, error) {
|
func NewStatDB(driver, source string, log *zap.Logger) (*StatDB, error) {
|
||||||
db, err := dbx.Open(driver, source)
|
db, err := dbx.Open(driver, source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.New("failed opening database %q, %q: %v",
|
||||||
|
driver, source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = migrate.Create("statdb", db)
|
err = migrate.Create("statdb", db)
|
||||||
|
@ -4,12 +4,19 @@
|
|||||||
package satellitedb
|
package satellitedb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
|
||||||
"storj.io/storj/internal/migrate"
|
"storj.io/storj/internal/migrate"
|
||||||
"storj.io/storj/pkg/bwagreement"
|
"storj.io/storj/pkg/bwagreement"
|
||||||
"storj.io/storj/pkg/utils"
|
"storj.io/storj/pkg/utils"
|
||||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Error is the default satellitedb errs class
|
||||||
|
Error = errs.Class("satellitedb")
|
||||||
|
)
|
||||||
|
|
||||||
// DB contains access to different database tables
|
// DB contains access to different database tables
|
||||||
type DB struct {
|
type DB struct {
|
||||||
db *dbx.DB
|
db *dbx.DB
|
||||||
@ -27,7 +34,8 @@ func NewDB(databaseURL string) (*DB, error) {
|
|||||||
}
|
}
|
||||||
db, err := dbx.Open(dbURL.Scheme, source)
|
db, err := dbx.Open(dbURL.Scheme, source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.New("failed opening database %q, %q: %v",
|
||||||
|
dbURL.Scheme, source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &DB{db: db}, nil
|
return &DB{db: db}, nil
|
||||||
|
@ -5,3 +5,13 @@ package satellitedb
|
|||||||
|
|
||||||
//go:generate dbx.v1 golang -d postgres -d sqlite3 satellitedb.dbx .
|
//go:generate dbx.v1 golang -d postgres -d sqlite3 satellitedb.dbx .
|
||||||
//go:generate dbx.v1 schema -d postgres -d sqlite3 satellitedb.dbx .
|
//go:generate dbx.v1 schema -d postgres -d sqlite3 satellitedb.dbx .
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// catch dbx errors
|
||||||
|
c := errs.Class("satellitedb")
|
||||||
|
WrapErr = func(e *Error) error { return c.Wrap(e) }
|
||||||
|
}
|
||||||
|
@ -33,16 +33,15 @@ const (
|
|||||||
func New(path, bucket string) (*Client, error) {
|
func New(path, bucket string) (*Client, error) {
|
||||||
db, err := bolt.Open(path, fileMode, &bolt.Options{Timeout: defaultTimeout})
|
db, err := bolt.Open(path, fileMode, &bolt.Options{Timeout: defaultTimeout})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.Update(func(tx *bolt.Tx) error {
|
err = Error.Wrap(db.Update(func(tx *bolt.Tx) error {
|
||||||
_, err = tx.CreateBucketIfNotExists([]byte(bucket))
|
_, err = tx.CreateBucketIfNotExists([]byte(bucket))
|
||||||
return err
|
return err
|
||||||
})
|
}))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if closeErr := db.Close(); closeErr != nil {
|
if closeErr := Error.Wrap(db.Close()); closeErr != nil {
|
||||||
return nil, utils.CombineErrors(err, closeErr)
|
return nil, utils.CombineErrors(err, closeErr)
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -63,10 +62,10 @@ func New(path, bucket string) (*Client, error) {
|
|||||||
func NewShared(path string, buckets ...string) ([]*Client, error) {
|
func NewShared(path string, buckets ...string) ([]*Client, error) {
|
||||||
db, err := bolt.Open(path, fileMode, &bolt.Options{Timeout: defaultTimeout})
|
db, err := bolt.Open(path, fileMode, &bolt.Options{Timeout: defaultTimeout})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.Update(func(tx *bolt.Tx) error {
|
err = Error.Wrap(db.Update(func(tx *bolt.Tx) error {
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
_, err := tx.CreateBucketIfNotExists([]byte(bucket))
|
_, err := tx.CreateBucketIfNotExists([]byte(bucket))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -74,10 +73,9 @@ func NewShared(path string, buckets ...string) ([]*Client, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
})
|
}))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if closeErr := db.Close(); closeErr != nil {
|
if closeErr := Error.Wrap(db.Close()); closeErr != nil {
|
||||||
return nil, utils.CombineErrors(err, closeErr)
|
return nil, utils.CombineErrors(err, closeErr)
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -100,15 +98,15 @@ func NewShared(path string, buckets ...string) ([]*Client, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) update(fn func(*bolt.Bucket) error) error {
|
func (client *Client) update(fn func(*bolt.Bucket) error) error {
|
||||||
return client.db.Update(func(tx *bolt.Tx) error {
|
return Error.Wrap(client.db.Update(func(tx *bolt.Tx) error {
|
||||||
return fn(tx.Bucket(client.Bucket))
|
return fn(tx.Bucket(client.Bucket))
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *Client) view(fn func(*bolt.Bucket) error) error {
|
func (client *Client) view(fn func(*bolt.Bucket) error) error {
|
||||||
return client.db.View(func(tx *bolt.Tx) error {
|
return Error.Wrap(client.db.View(func(tx *bolt.Tx) error {
|
||||||
return fn(tx.Bucket(client.Bucket))
|
return fn(tx.Bucket(client.Bucket))
|
||||||
})
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put adds a value to the provided key in boltdb, returning an error on failure.
|
// Put adds a value to the provided key in boltdb, returning an error on failure.
|
||||||
@ -153,19 +151,21 @@ func (client *Client) Delete(key storage.Key) error {
|
|||||||
|
|
||||||
// List returns either a list of keys for which boltdb has values or an error.
|
// List returns either a list of keys for which boltdb has values or an error.
|
||||||
func (client *Client) List(first storage.Key, limit int) (storage.Keys, error) {
|
func (client *Client) List(first storage.Key, limit int) (storage.Keys, error) {
|
||||||
return storage.ListKeys(client, first, limit)
|
rv, err := storage.ListKeys(client, first, limit)
|
||||||
|
return rv, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReverseList returns either a list of keys for which boltdb has values or an error.
|
// ReverseList returns either a list of keys for which boltdb has values or an error.
|
||||||
// Starts from first and iterates backwards
|
// Starts from first and iterates backwards
|
||||||
func (client *Client) ReverseList(first storage.Key, limit int) (storage.Keys, error) {
|
func (client *Client) ReverseList(first storage.Key, limit int) (storage.Keys, error) {
|
||||||
return storage.ReverseListKeys(client, first, limit)
|
rv, err := storage.ReverseListKeys(client, first, limit)
|
||||||
|
return rv, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes a BoltDB client
|
// Close closes a BoltDB client
|
||||||
func (client *Client) Close() error {
|
func (client *Client) Close() error {
|
||||||
if atomic.AddInt32(client.referenceCount, -1) == 0 {
|
if atomic.AddInt32(client.referenceCount, -1) == 0 {
|
||||||
return client.db.Close()
|
return Error.Wrap(client.db.Close())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user