V3-1989: Storage node database is locked for several minutes while submitting orders (#2410)

* remove infodb locks and give a unique name for each in memory created.

* changed max idle and open connections to 1 for memory DBs; fixes table locking errors

* fixed race condition

* added file based infodb test

* added busy timeout parameter to the file based infodb for testing

* fixed imports

* removed db.locked() after merge from master
This commit is contained in:
ethanadams 2019-07-02 17:23:02 -04:00 committed by GitHub
parent 42e124cd08
commit 47e4584fbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 168 additions and 42 deletions

View File

@ -26,7 +26,6 @@ func (db *InfoDB) Bandwidth() bandwidth.DB { return &bandwidthdb{db} }
// Add adds bandwidth usage to the table
func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err = db.db.Exec(`
INSERT INTO
@ -39,7 +38,6 @@ func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action
// Summary returns summary of bandwidth usages
func (db *bandwidthdb) Summary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
usage := &bandwidth.Usage{}
@ -72,7 +70,6 @@ func (db *bandwidthdb) Summary(ctx context.Context, from, to time.Time) (_ *band
// SummaryBySatellite returns summary of bandwidth usage grouping by satellite.
func (db *bandwidthdb) SummaryBySatellite(ctx context.Context, from, to time.Time) (_ map[storj.NodeID]*bandwidth.Usage, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
entries := map[storj.NodeID]*bandwidth.Usage{}

View File

@ -36,8 +36,6 @@ func (db *certdb) Include(ctx context.Context, pi *identity.PeerIdentity) (certi
chain := encodePeerIdentity(pi)
defer db.locked()()
result, err := db.db.Exec(`INSERT INTO certificate(node_id, peer_identity) VALUES(?, ?)`, pi.ID, chain)
if err != nil && sqliteutil.IsConstraintError(err) {
err = db.db.QueryRow(`SELECT cert_id FROM certificate WHERE peer_identity = ?`, chain).Scan(&certid)
@ -56,9 +54,7 @@ func (db *certdb) LookupByCertID(ctx context.Context, id int64) (_ *identity.Pee
var pem *[]byte
db.mu.Lock()
err = db.db.QueryRow(`SELECT peer_identity FROM certificate WHERE cert_id = ?`, id).Scan(&pem)
db.mu.Unlock()
if err != nil {
return nil, ErrInfo.Wrap(err)

View File

@ -28,7 +28,6 @@ func (db *DB) Console() console.DB { return db.info.Console() }
// at least once
func (db *consoledb) GetSatelliteIDs(ctx context.Context, from, to time.Time) (_ storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
var satellites storj.NodeIDList
@ -89,7 +88,6 @@ func (db *consoledb) GetDailyBandwidthUsed(ctx context.Context, satelliteID stor
// sorted in ascending order and applied condition if any
func (db *consoledb) getDailyBandwidthUsed(ctx context.Context, cond string, args ...interface{}) (_ []console.BandwidthUsed, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
qb := strings.Builder{}
qb.WriteString("SELECT action, SUM(amount), created_at ")

View File

@ -5,12 +5,14 @@ package storagenodedb
import (
"database/sql"
"fmt"
"math/rand"
"os"
"path/filepath"
"sync"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/dbutil"
"storj.io/storj/internal/migrate"
@ -21,7 +23,6 @@ var ErrInfo = errs.Class("infodb")
// InfoDB implements information database for piecestore.
type InfoDB struct {
mu sync.Mutex
db *sql.DB
}
@ -31,7 +32,7 @@ func newInfo(path string) (*InfoDB, error) {
return nil, err
}
db, err := sql.Open("sqlite3", "file:"+path+"?_journal=WAL")
db, err := sql.Open("sqlite3", "file:"+path+"?_journal=WAL&_busy_timeout=10000")
if err != nil {
return nil, ErrInfo.Wrap(err)
}
@ -43,12 +44,22 @@ func newInfo(path string) (*InfoDB, error) {
// NewInfoInMemory creates a new inmemory InfoDB.
func NewInfoInMemory() (*InfoDB, error) {
db, err := sql.Open("sqlite3", ":memory:")
// create memory DB with a shared cache and a unique name to avoid collisions
db, err := sql.Open("sqlite3", fmt.Sprintf("file:memdb%d?mode=memory&cache=shared", rand.Int63()))
if err != nil {
return nil, ErrInfo.Wrap(err)
}
dbutil.Configure(db, mon)
// Set max idle and max open to 1 to control concurrent access to the memory DB
// Setting max open higher than 1 results in table locked errors
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
db.SetConnMaxLifetime(-1)
mon.Chain("db_stats", monkit.StatSourceFunc(
func(cb func(name string, val float64)) {
monkit.StatSourceFromStruct(db.Stats()).Stats(cb)
}))
return &InfoDB{db: db}, nil
}
@ -58,12 +69,6 @@ func (db *InfoDB) Close() error {
return db.db.Close()
}
// locked allows easy locking the database.
func (db *InfoDB) locked() func() {
db.mu.Lock()
return db.mu.Unlock
}
// CreateTables creates any necessary tables.
func (db *InfoDB) CreateTables(log *zap.Logger) error {
migration := db.Migration()

View File

@ -51,8 +51,6 @@ func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) (err error)
return ErrInfo.Wrap(err)
}
defer db.locked()()
_, err = db.db.Exec(`
INSERT INTO unsent_order(
satellite_id, serial_number,
@ -67,7 +65,6 @@ func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) (err error)
// ListUnsent returns orders that haven't been sent yet.
func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.Query(`
SELECT order_limit_serialized, order_serialized, certificate.peer_identity
@ -123,7 +120,6 @@ func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info
// Does not return uplink identity.
func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*orders.Info, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
// TODO: add some limiting
rows, err := db.db.Query(`
@ -171,7 +167,6 @@ func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.Node
// Archive marks order as being handled.
func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status orders.Status) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
result, err := db.db.Exec(`
INSERT INTO order_archive (
@ -208,7 +203,6 @@ func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial
// ListArchived returns orders that have been sent.
func (db *ordersdb) ListArchived(ctx context.Context, limit int) (_ []*orders.ArchivedInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.Query(`
SELECT order_limit_serialized, order_serialized, certificate.peer_identity,

View File

@ -38,8 +38,6 @@ func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) {
return ErrInfo.Wrap(err)
}
defer db.locked()()
_, err = db.db.ExecContext(ctx, db.Rebind(`
INSERT INTO
pieceinfo(satellite_id, piece_id, piece_size, piece_expiration, uplink_piece_hash, uplink_cert_id)
@ -59,14 +57,12 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
var uplinkPieceHash []byte
var uplinkIdentity []byte
db.mu.Lock()
err = db.db.QueryRowContext(ctx, db.Rebind(`
SELECT piece_size, piece_expiration, uplink_piece_hash, certificate.peer_identity
FROM pieceinfo
INNER JOIN certificate ON pieceinfo.uplink_cert_id = certificate.cert_id
WHERE satellite_id = ? AND piece_id = ?
`), satelliteID, pieceID).Scan(&info.PieceSize, &info.PieceExpiration, &uplinkPieceHash, &uplinkIdentity)
db.mu.Unlock()
if err != nil {
return nil, ErrInfo.Wrap(err)
@ -89,7 +85,6 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
// Delete deletes piece information.
func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err = db.db.ExecContext(ctx, db.Rebind(`
DELETE FROM pieceinfo
@ -103,7 +98,6 @@ func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, piece
// DeleteFailed marks piece as a failed deletion.
func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err = db.db.ExecContext(ctx, db.Rebind(`
UPDATE pieceinfo
@ -118,7 +112,6 @@ func (db *pieceinfo) DeleteFailed(ctx context.Context, satelliteID storj.NodeID,
// GetExpired gets pieceinformation identites that are expired.
func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (infos []pieces.ExpiredInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.QueryContext(ctx, db.Rebind(`
SELECT satellite_id, piece_id, piece_size
@ -145,7 +138,6 @@ func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit
// SpaceUsed calculates disk space used by all pieces
func (db *pieceinfo) SpaceUsed(ctx context.Context) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
var sum sql.NullInt64
err = db.db.QueryRowContext(ctx, db.Rebind(`
@ -162,7 +154,6 @@ func (db *pieceinfo) SpaceUsed(ctx context.Context) (_ int64, err error) {
// SpaceUsed calculates disk space used by all pieces
func (db *pieceinfo) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
var sum sql.NullInt64
err = db.db.QueryRowContext(ctx, db.Rebind(`

View File

@ -4,9 +4,24 @@
package storagenodedbtest_test
import (
	"path/filepath"
	"runtime"
	"sync"
	"testing"

	"github.com/golang/protobuf/ptypes"
	"github.com/skyrings/skyring-common/tools/uuid"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"

	"storj.io/storj/internal/testcontext"
	"storj.io/storj/internal/testidentity"
	"storj.io/storj/internal/testrand"
	"storj.io/storj/pkg/auth/signing"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/storagenode"
	"storj.io/storj/storagenode/orders"
	"storj.io/storj/storagenode/storagenodedb"
	"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
@ -14,3 +29,140 @@ func TestDatabase(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
})
}
// TestFileConcurrency exercises concurrent order submission against a
// file-backed storage node database.
func TestFileConcurrency(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	log := zaptest.NewLogger(t)

	// Use filepath.Join rather than "/" concatenation so the paths are
	// correct on every OS.
	storageDir := ctx.Dir("storage")
	db, err := storagenodedb.New(log, storagenodedb.Config{
		Pieces:   storageDir,
		Info2:    filepath.Join(storageDir, "info.db"),
		Kademlia: filepath.Join(storageDir, "kademlia"),
	})
	if err != nil {
		t.Fatal(err)
	}
	defer ctx.Check(db.Close)

	testConcurrency(t, ctx, db)
}
// TestInMemoryConcurrency exercises concurrent order submission against
// an in-memory storage node database.
func TestInMemoryConcurrency(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	logger := zaptest.NewLogger(t)

	db, err := storagenodedb.NewInMemory(logger, ctx.Dir("storage"))
	if err != nil {
		t.Fatal(err)
	}
	defer ctx.Check(db.Close)

	testConcurrency(t, ctx, db)
}
// testConcurrency creates a batch of orders, enqueues them concurrently,
// and verifies that every one of them was persisted.
func testConcurrency(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB) {
	t.Run("Sqlite", func(t *testing.T) {
		// Force at least two OS threads so goroutines really run in
		// parallel, and restore the previous setting afterwards so the
		// global change does not leak into other tests.
		prev := runtime.GOMAXPROCS(2)
		defer runtime.GOMAXPROCS(prev)

		err := db.CreateTables()
		if err != nil {
			t.Fatal(err)
		}

		ordersMap := make(map[string]orders.Info)
		err = createOrders(t, ctx, ordersMap, 1000)
		require.NoError(t, err)

		err = insertOrders(t, ctx, db, ordersMap)
		require.NoError(t, err)

		err = verifyOrders(t, ctx, db, ordersMap)
		require.NoError(t, err)
	})
}
// insertOrders fans out one goroutine per order and waits for all of
// them to finish enqueueing.
func insertOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, ordersMap map[string]orders.Info) (err error) {
	var group sync.WaitGroup
	for _, info := range ordersMap {
		group.Add(1)
		// copy the loop variable so each goroutine sees its own value
		copied := info
		go insertOrder(t, ctx, db, &group, &copied)
	}
	group.Wait()
	return nil
}
// insertOrder enqueues a single order and signals completion on wg.
// It reports failures with t.Errorf rather than require.NoError because
// require calls t.FailNow, which must not be called from any goroutine
// other than the one running the test.
func insertOrder(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, wg *sync.WaitGroup, order *orders.Info) {
	defer wg.Done()
	if err := db.Orders().Enqueue(ctx, order); err != nil {
		t.Errorf("Enqueue failed: %v", err)
	}
}
// verifyOrders checks that every order in the test map is present among
// the unsent orders stored in the database.
func verifyOrders(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB, orders map[string]orders.Info) (err error) {
	// Do not discard the error from ListUnsent; a failed listing would
	// otherwise surface as a confusing count mismatch below.
	dbOrders, err := db.Orders().ListUnsent(ctx, 10000)
	if err != nil {
		return err
	}

	// Index stored orders by serial number so matching is O(n) instead
	// of a nested O(n^2) scan. Serial numbers are unique per order.
	stored := make(map[storj.SerialNumber]bool, len(dbOrders))
	for _, dbOrder := range dbOrders {
		stored[dbOrder.Order.SerialNumber] = true
	}

	found := 0
	for _, order := range orders {
		if stored[order.Order.SerialNumber] {
			found++
		}
	}
	require.Equal(t, len(orders), found, "Number found must equal the length of the test data")
	return nil
}
// createOrders fills the given map with count freshly signed orders,
// keyed by a random UUID string.
func createOrders(t *testing.T, ctx *testcontext.Context, orders map[string]orders.Info, count int) (err error) {
	for n := 0; n < count; n++ {
		id, err := uuid.New()
		if err != nil {
			return err
		}
		orders[id.String()] = *createOrder(t, ctx)
	}
	return nil
}
// createOrder builds a satellite-signed order limit and an uplink-signed
// order from the pregenerated test identities, wrapped in an orders.Info.
func createOrder(t *testing.T, ctx *testcontext.Context) (info *orders.Info) {
	nodeIdentity := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion())
	satIdentity := testidentity.MustPregeneratedSignedIdentity(1, storj.LatestIDVersion())
	uplink := testidentity.MustPregeneratedSignedIdentity(3, storj.LatestIDVersion())

	pieceID := storj.NewPieceID()
	serial := testrand.SerialNumber()
	expiration := ptypes.TimestampNow()

	// The limit is signed by the satellite and names all three parties.
	limit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satIdentity), &pb.OrderLimit{
		SerialNumber:    serial,
		SatelliteId:     satIdentity.ID,
		UplinkId:        uplink.ID,
		StorageNodeId:   nodeIdentity.ID,
		PieceId:         pieceID,
		Limit:           100,
		Action:          pb.PieceAction_GET,
		PieceExpiration: expiration,
		OrderExpiration: expiration,
	})
	require.NoError(t, err)

	// The order itself is signed by the uplink.
	order, err := signing.SignOrder(ctx, signing.SignerFromFullIdentity(uplink), &pb.Order{
		SerialNumber: serial,
		Amount:       50,
	})
	require.NoError(t, err)

	return &orders.Info{
		Limit:  limit,
		Order:  order,
		Uplink: uplink.PeerIdentity(),
	}
}

View File

@ -26,7 +26,6 @@ func (db *InfoDB) UsedSerials() piecestore.UsedSerials { return &usedSerials{db}
// Add adds a serial to the database.
func (db *usedSerials) Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err = db.db.Exec(`
INSERT INTO
@ -39,7 +38,6 @@ func (db *usedSerials) Add(ctx context.Context, satelliteID storj.NodeID, serial
// DeleteExpired deletes expired serial numbers
func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
_, err = db.db.Exec(`DELETE FROM used_serial WHERE expiration < ?`, now)
return ErrInfo.Wrap(err)
@ -49,7 +47,6 @@ func (db *usedSerials) DeleteExpired(ctx context.Context, now time.Time) (err er
// Note, this will lock the database and should only be used during startup.
func (db *usedSerials) IterateAll(ctx context.Context, fn piecestore.SerialNumberFn) (err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
rows, err := db.db.Query(`SELECT satellite_id, serial_number, expiration FROM used_serial`)
if err != nil {

View File

@ -40,8 +40,6 @@ func (db *vouchersdb) Put(ctx context.Context, voucher *pb.Voucher) (err error)
return ErrInfo.Wrap(err)
}
defer db.locked()()
_, err = db.db.Exec(`
INSERT INTO vouchers(
satellite_id,
@ -59,7 +57,6 @@ func (db *vouchersdb) Put(ctx context.Context, voucher *pb.Voucher) (err error)
// NeedVoucher returns true if a voucher from a particular satellite is expired, about to expire, or does not exist
func (db *vouchersdb) NeedVoucher(ctx context.Context, satelliteID storj.NodeID, expirationBuffer time.Duration) (need bool, err error) {
defer mon.Task()(&ctx)(&err)
defer db.locked()()
expiresBefore := time.Now().UTC().Add(expirationBuffer)
@ -85,7 +82,6 @@ func (db *vouchersdb) NeedVoucher(ctx context.Context, satelliteID storj.NodeID,
func (db *vouchersdb) GetValid(ctx context.Context, satellites []storj.NodeID) (*pb.Voucher, error) {
var err error
defer mon.Task()(&ctx)(&err)
defer db.locked()()
var args []interface{}