add boltDB batching for Put operation, add benchmark test (#1865)

* add boltDB batching for Put operation, add benchmark test

* add batchPut method to kademlia routingTable

* add BatchPut method for other KeyValueStore to satisfy interface

* return err not implemented

* add noSync to boltdb client

* rm boltDB noSync

* make batch block and fix tests

* changes per CR

* rm test setting so it matches prod code behavior

* fix lint errs
Jess G 2019-05-06 13:47:12 -07:00 committed by GitHub
parent 6f964e63e8
commit a9b8b50839
4 changed files with 243 additions and 9 deletions
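
For context, a minimal standalone sketch (not part of this commit) of the bolt behavior the change relies on: db.Batch coalesces concurrent callers into one transaction and one fsync, committing every MaxBatchSize operations or MaxBatchDelay, whichever comes first (defaults: 1000 ops / 10ms), while db.Update commits and syncs once per call. The file and bucket names below are illustrative only.

package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("sketch.db", 0600, &bolt.Options{Timeout: time.Second})
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = db.Close() }()

	// bolt's defaults: MaxBatchSize = 1000 operations, MaxBatchDelay = 10ms.
	fmt.Println("batch size:", db.MaxBatchSize, "batch delay:", db.MaxBatchDelay)

	if err := db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("kbuckets"))
		return err
	}); err != nil {
		log.Fatal(err)
	}

	// Each db.Batch call blocks until its batch commits, but concurrent
	// callers are folded into a single transaction and a single fsync.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := db.Batch(func(tx *bolt.Tx) error {
				return tx.Bucket([]byte("kbuckets")).Put(
					[]byte(fmt.Sprintf("key%d", i)), []byte("value"))
			})
			if err != nil {
				log.Println("batch put:", err)
			}
		}()
	}
	wg.Wait()
}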

View File

@ -10,10 +10,13 @@ import (
"github.com/boltdb/bolt"
"github.com/zeebo/errs"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/storage"
)
var mon = monkit.Package()
// Error is the default boltdb errs class
var Error = errs.Class("boltdb error")
@ -106,14 +109,38 @@ func (client *Client) update(fn func(*bolt.Bucket) error) error {
}))
}
func (client *Client) batch(fn func(*bolt.Bucket) error) error {
return Error.Wrap(client.db.Batch(func(tx *bolt.Tx) error {
return fn(tx.Bucket(client.Bucket))
}))
}
func (client *Client) view(fn func(*bolt.Bucket) error) error {
return Error.Wrap(client.db.View(func(tx *bolt.Tx) error {
return fn(tx.Bucket(client.Bucket))
}))
}
// Put adds a value to the provided key in boltdb, returning an error on failure.
// Put adds a key/value to boltDB in a batch, where boltDB commits the batch to disk every
// 1000 operations or 10ms, whichever comes first. The batch size and MaxBatchDelay use
// bolt's default settings.
// Ref: https://github.com/boltdb/bolt/blob/master/db.go#L160
// Note: when using this method, check whether it needs to be executed asynchronously,
// since it blocks for up to db.MaxBatchDelay.
func (client *Client) Put(key storage.Key, value storage.Value) error {
start := time.Now()
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
err := client.batch(func(bucket *bolt.Bucket) error {
return bucket.Put(key, value)
})
mon.IntVal("boltdb_batch_time_elapsed").Observe(int64(time.Since(start)))
return err
}
// PutAndCommit adds a key/value to BoltDB and writes it to disk.
func (client *Client) PutAndCommit(key storage.Key, value storage.Value) error {
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
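
The note above about running Put asynchronously can be illustrated with a caller-side sketch (not part of this diff). It assumes the client package path storj.io/storj/storage/boltdb and uses golang.org/x/sync/errgroup, neither of which is established by this commit; putMany is a hypothetical helper.

package example

import (
	"fmt"

	"golang.org/x/sync/errgroup"

	"storj.io/storj/storage"
	"storj.io/storj/storage/boltdb"
)

// putMany issues n batched Puts concurrently and returns the first error.
func putMany(client *boltdb.Client, n int) error {
	var group errgroup.Group
	for i := 0; i < n; i++ {
		i := i
		group.Go(func() error {
			// Each Put blocks until bolt commits its batch (at most
			// db.MaxBatchDelay), but concurrent calls share one
			// transaction and one fsync.
			return client.Put(
				storage.Key(fmt.Sprintf("testkey%d", i)),
				storage.Value("testvalue"))
		})
	}
	return group.Wait()
}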

View File

@ -8,10 +8,11 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"github.com/zeebo/errs"
"storj.io/storj/internal/testcontext"
"storj.io/storj/storage"
"storj.io/storj/storage/testsuite"
)
@ -140,3 +141,189 @@ func BenchmarkSuiteLong(b *testing.B) {
}
testsuite.BenchmarkPathOperationsInLargeDb(b, longStore)
}
func BenchmarkClientWrite(b *testing.B) {
// setup db
ctx := testcontext.New(b)
defer ctx.Cleanup()
dbfile := ctx.File("testbolt.db")
dbs, err := NewShared(dbfile, "kbuckets", "nodes")
if err != nil {
b.Fatalf("failed to create db: %v\n", err)
}
defer func() {
if err := dbs[0].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
if err := dbs[1].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
}()
kdb := dbs[0]
// benchmark test: execute 1000 Put operations where each call to `PutAndCommit` does the following:
// 1) creates a BoltDB transaction (tx), 2) executes the db operation, 3) commits the tx, which writes it to disk.
for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
key := storage.Key(fmt.Sprintf("testkey%d", i))
value := storage.Value("testvalue")
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.PutAndCommit(key, value)
if err != nil {
b.Fatal("Put err:", err)
}
}()
}
wg.Wait()
}
}
func BenchmarkClientNoSyncWrite(b *testing.B) {
// setup db
ctx := testcontext.New(b)
defer ctx.Cleanup()
dbfile := ctx.File("testbolt.db")
dbs, err := NewShared(dbfile, "kbuckets", "nodes")
if err != nil {
b.Fatalf("failed to create db: %v\n", err)
}
defer func() {
if err := dbs[0].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
if err := dbs[1].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
}()
kdb := dbs[0]
// benchmark test: execute 1000 Put operations with fsync turned off.
// Each call to `PutAndCommit` does the following: 1) creates a BoltDB transaction (tx),
// 2) executes the db operation, and 3) commits the tx which does NOT write it to disk.
kdb.db.NoSync = true
for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
key := storage.Key(fmt.Sprintf("testkey%d", i))
value := storage.Value("testvalue")
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.PutAndCommit(key, value)
if err != nil {
b.Fatal("PutAndCommit Nosync err:", err)
}
}()
}
wg.Wait()
}
err = kdb.db.Sync()
if err != nil {
b.Fatalf("boltDB sync err: %v\n", err)
}
}
func BenchmarkClientBatchWrite(b *testing.B) {
// setup db
ctx := testcontext.New(b)
defer ctx.Cleanup()
dbfile := ctx.File("testbolt.db")
dbs, err := NewShared(dbfile, "kbuckets", "nodes")
if err != nil {
b.Fatalf("failed to create db: %v\n", err)
}
defer func() {
if err := dbs[0].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
if err := dbs[1].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
}()
kdb := dbs[0]
// benchmark test: batch 1000 Put operations.
// Each call to `Put` does the following: 1) adds the db operation to a queue in boltDB,
// 2) every 1000 operations or 10ms, whichever is first, BoltDB creates a single
// transaction for all operations currently in the batch, executes the operations,
// commits, and writes them to disk
for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
key := storage.Key(fmt.Sprintf("testkey%d", i))
value := storage.Value("testvalue")
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.Put(key, value)
if err != nil {
b.Fatalf("boltDB put: %v\n", err)
}
}()
}
wg.Wait()
}
}
func BenchmarkClientBatchNoSyncWrite(b *testing.B) {
// setup db
ctx := testcontext.New(b)
defer ctx.Cleanup()
dbfile := ctx.File("testbolt.db")
dbs, err := NewShared(dbfile, "kbuckets", "nodes")
if err != nil {
b.Fatalf("failed to create db: %v\n", err)
}
defer func() {
if err := dbs[0].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
if err := dbs[1].Close(); err != nil {
b.Fatalf("failed to close db: %v\n", err)
}
}()
kdb := dbs[0]
// benchmark test: batch 1000 Put operations with fsync turned off.
// Each call to `Put` does the following: 1) adds the db operation to a queue in boltDB,
// 2) every 1000 operations or 10ms, whichever is first, BoltDB creates a single
// transaction for all operations currently in the batch, executes the operations,
// commits, but does NOT write them to disk
kdb.db.NoSync = true
for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
key := storage.Key(fmt.Sprintf("testkey%d", i))
value := storage.Value("testvalue")
wg.Add(1)
go func() {
defer wg.Done()
err := kdb.Put(key, value)
if err != nil {
b.Fatalf("boltDB put: %v\n", err)
}
}()
}
wg.Wait()
err := kdb.db.Sync()
if err != nil {
b.Fatalf("boltDB sync err: %v\n", err)
}
}
}
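
One possible hardening, not part of this change: testing.B's Fatal and Fatalf must be called from the goroutine running the benchmark, so the worker goroutines above should strictly not call them directly. A sketch of the batch-write loop that funnels errors back to the benchmark goroutine through a channel instead:

for n := 0; n < b.N; n++ {
	var wg sync.WaitGroup
	errCh := make(chan error, 1000) // buffered so workers never block on send
	for i := 0; i < 1000; i++ {
		key := storage.Key(fmt.Sprintf("testkey%d", i))
		value := storage.Value("testvalue")
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := kdb.Put(key, value); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	close(errCh)
	for err := range errCh {
		b.Fatalf("boltDB put: %v", err) // now called from the benchmark goroutine
	}
}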

View File

@ -6,6 +6,7 @@ package testsuite
import (
"path"
"strconv"
"sync"
"testing"
"storj.io/storj/storage"
@ -42,12 +43,21 @@ func RunBenchmarks(b *testing.B, store storage.KeyValueStore) {
b.Run("Put", func(b *testing.B) {
b.SetBytes(int64(len(items)))
for k := 0; k < b.N; k++ {
var wg sync.WaitGroup
for _, item := range items {
err := store.Put(item.Key, item.Value)
if err != nil {
b.Fatal(err)
}
key := item.Key
value := item.Value
wg.Add(1)
go func() {
defer wg.Done()
err := store.Put(key, value)
if err != nil {
b.Fatal("store.Put err", err)
}
}()
}
wg.Wait()
}
})
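
A possible variation on the Put benchmark above, not part of this change: since the suite also runs against stores with connection limits, the number of in-flight Puts could be bounded with a semaphore channel, and b.Error (which, unlike b.Fatal, is safe to call from other goroutines) used for reporting. The semaphore size of 100 is arbitrary.

b.Run("Put", func(b *testing.B) {
	b.SetBytes(int64(len(items)))
	sem := make(chan struct{}, 100) // at most 100 Puts in flight
	for k := 0; k < b.N; k++ {
		var wg sync.WaitGroup
		for _, item := range items {
			key, value := item.Key, item.Value
			wg.Add(1)
			sem <- struct{}{} // acquire
			go func() {
				defer wg.Done()
				defer func() { <-sem }() // release
				if err := store.Put(key, value); err != nil {
					b.Error("store.Put err:", err)
				}
			}()
		}
		wg.Wait()
	}
})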

View File

@ -5,6 +5,7 @@ package testsuite
import (
"strconv"
"sync"
"testing"
"storj.io/storj/storage"
@ -35,11 +36,20 @@ func testConstraints(t *testing.T, store storage.KeyValueStore) {
})
}
var wg sync.WaitGroup
for _, item := range items {
if err := store.Put(item.Key, item.Value); err != nil {
t.Fatal(err)
}
key := item.Key
value := item.Value
wg.Add(1)
go func() {
defer wg.Done()
err := store.Put(key, value)
if err != nil {
t.Fatal("store.Put err:", err)
}
}()
}
wg.Wait()
defer cleanupItems(store, items)
t.Run("Put Empty", func(t *testing.T) {