Make teststore thread-safe (#689)
This commit is contained in:
parent
4734a7447b
commit
eb07715d53
@ -7,6 +7,7 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
@ -15,6 +16,8 @@ var errInternal = errors.New("internal error")
|
||||
|
||||
// Client implements in-memory key value store
|
||||
type Client struct {
|
||||
mu sync.Mutex
|
||||
|
||||
Items []storage.ListItem
|
||||
ForceError int
|
||||
|
||||
@ -47,6 +50,11 @@ func (store *Client) indexOf(key storage.Key) (int, bool) {
|
||||
return i, store.Items[i].Key.Equal(key)
|
||||
}
|
||||
|
||||
func (store *Client) locked() func() {
|
||||
store.mu.Lock()
|
||||
return store.mu.Unlock
|
||||
}
|
||||
|
||||
func (store *Client) forcedError() bool {
|
||||
if store.ForceError > 0 {
|
||||
store.ForceError--
|
||||
@ -57,6 +65,8 @@ func (store *Client) forcedError() bool {
|
||||
|
||||
// Put adds a value to store
|
||||
func (store *Client) Put(key storage.Key, value storage.Value) error {
|
||||
defer store.locked()()
|
||||
|
||||
store.version++
|
||||
store.CallCount.Put++
|
||||
if store.forcedError() {
|
||||
@ -86,6 +96,8 @@ func (store *Client) Put(key storage.Key, value storage.Value) error {
|
||||
|
||||
// Get gets a value to store
|
||||
func (store *Client) Get(key storage.Key) (storage.Value, error) {
|
||||
defer store.locked()()
|
||||
|
||||
store.CallCount.Get++
|
||||
|
||||
if store.forcedError() {
|
||||
@ -106,6 +118,8 @@ func (store *Client) Get(key storage.Key) (storage.Value, error) {
|
||||
|
||||
// GetAll gets all values from the store
|
||||
func (store *Client) GetAll(keys storage.Keys) (storage.Values, error) {
|
||||
defer store.locked()()
|
||||
|
||||
store.CallCount.GetAll++
|
||||
if len(keys) > storage.LookupLimit {
|
||||
return nil, storage.ErrLimitExceeded
|
||||
@ -129,6 +143,8 @@ func (store *Client) GetAll(keys storage.Keys) (storage.Values, error) {
|
||||
|
||||
// Delete deletes key and the value
|
||||
func (store *Client) Delete(key storage.Key) error {
|
||||
defer store.locked()()
|
||||
|
||||
store.version++
|
||||
store.CallCount.Delete++
|
||||
|
||||
@ -152,24 +168,32 @@ func (store *Client) Delete(key storage.Key) error {
|
||||
|
||||
// List lists all keys starting from start and upto limit items
|
||||
func (store *Client) List(first storage.Key, limit int) (storage.Keys, error) {
|
||||
store.mu.Lock()
|
||||
store.CallCount.List++
|
||||
if store.forcedError() {
|
||||
store.mu.Unlock()
|
||||
return nil, errors.New("internal error")
|
||||
}
|
||||
store.mu.Unlock()
|
||||
return storage.ListKeys(store, first, limit)
|
||||
}
|
||||
|
||||
// ReverseList lists all keys in revers order
|
||||
func (store *Client) ReverseList(first storage.Key, limit int) (storage.Keys, error) {
|
||||
store.mu.Lock()
|
||||
store.CallCount.ReverseList++
|
||||
if store.forcedError() {
|
||||
store.mu.Unlock()
|
||||
return nil, errors.New("internal error")
|
||||
}
|
||||
store.mu.Unlock()
|
||||
return storage.ReverseListKeys(store, first, limit)
|
||||
}
|
||||
|
||||
// Close closes the store
|
||||
func (store *Client) Close() error {
|
||||
defer store.locked()()
|
||||
|
||||
store.CallCount.Close++
|
||||
if store.forcedError() {
|
||||
return errInternal
|
||||
@ -179,6 +203,8 @@ func (store *Client) Close() error {
|
||||
|
||||
// Iterate iterates over items based on opts
|
||||
func (store *Client) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error {
|
||||
defer store.locked()()
|
||||
|
||||
store.CallCount.Iterate++
|
||||
if store.forcedError() {
|
||||
return errInternal
|
||||
|
@ -22,6 +22,8 @@ func RunTests(t *testing.T, store storage.KeyValueStore) {
|
||||
|
||||
t.Run("List", func(t *testing.T) { testList(t, store) })
|
||||
t.Run("ListV2", func(t *testing.T) { testListV2(t, store) })
|
||||
|
||||
t.Run("Parallel", func(t *testing.T) { testParallel(t, store) })
|
||||
}
|
||||
|
||||
func testConstraints(t *testing.T, store storage.KeyValueStore) {
|
||||
|
74
storage/testsuite/test_parallel.go
Normal file
74
storage/testsuite/test_parallel.go
Normal file
@ -0,0 +1,74 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package testsuite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
func testParallel(t *testing.T, store storage.KeyValueStore) {
|
||||
items := storage.Items{
|
||||
newItem("a", "1", false),
|
||||
newItem("b", "2", false),
|
||||
newItem("c", "3", false),
|
||||
}
|
||||
rand.Shuffle(len(items), items.Swap)
|
||||
defer cleanupItems(store, items)
|
||||
|
||||
for i := range items {
|
||||
item := items[i]
|
||||
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Put
|
||||
err := store.Put(item.Key, item.Value)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to put %q = %v: %v", item.Key, item.Value, err)
|
||||
}
|
||||
|
||||
// Get
|
||||
value, err := store.Get(item.Key)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get %q = %v: %v", item.Key, item.Value, err)
|
||||
}
|
||||
if !bytes.Equal([]byte(value), []byte(item.Value)) {
|
||||
t.Fatalf("invalid value for %q = %v: got %v", item.Key, item.Value, value)
|
||||
}
|
||||
|
||||
// GetAll
|
||||
values, err := store.GetAll([]storage.Key{item.Key})
|
||||
if len(values) != 1 {
|
||||
t.Fatalf("failed to GetAll: %v", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal([]byte(values[0]), []byte(item.Value)) {
|
||||
t.Fatalf("invalid GetAll %q = %v: got %v", item.Key, item.Value, values[i])
|
||||
}
|
||||
|
||||
// Update value
|
||||
nextValue := storage.Value(string(item.Value) + "X")
|
||||
err = store.Put(item.Key, nextValue)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to update %q = %v: %v", item.Key, nextValue, err)
|
||||
}
|
||||
|
||||
value, err = store.Get(item.Key)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get %q = %v: %v", item.Key, nextValue, err)
|
||||
}
|
||||
if !bytes.Equal([]byte(value), []byte(nextValue)) {
|
||||
t.Fatalf("invalid updated value for %q = %v: got %v", item.Key, nextValue, value)
|
||||
}
|
||||
|
||||
err = store.Delete(item.Key)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to delete %v: %v", item.Key, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user