From ea4a9e61c3ce1e09e41cccceab4c6c900d5361bc Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Thu, 6 Apr 2023 14:44:23 +0300 Subject: [PATCH] storage: delete unused code Change-Id: Ic85a09fa31ff0bf3e99bce685a14c03598c2a962 --- storage/boltdb/client.go | 194 +-------- storage/boltdb/client_test.go | 67 --- storage/common.go | 47 +-- storage/listkeys.go | 34 -- storage/listv2.go | 118 ------ storage/redis/client.go | 214 +--------- storage/redis/client_test.go | 1 - storage/redis/util.go | 105 ----- storage/redis/util_test.go | 28 -- storage/storelogger/logger.go | 80 ---- storage/storelogger/logger_test.go | 1 - storage/teststore/store.go | 284 +------------ storage/teststore/store_test.go | 4 +- storage/testsuite/bench.go | 13 - storage/testsuite/long_bench.go | 577 -------------------------- storage/testsuite/test.go | 206 +-------- storage/testsuite/test_crud.go | 84 +--- storage/testsuite/test_iterate.go | 132 ------ storage/testsuite/test_iterate_all.go | 209 ---------- storage/testsuite/test_list.go | 75 ---- storage/testsuite/test_listv2.go | 142 ------- storage/testsuite/test_parallel.go | 26 +- storage/testsuite/test_prefix.go | 63 --- storage/testsuite/utils.go | 83 +--- storage/util.go | 12 - 25 files changed, 23 insertions(+), 2776 deletions(-) delete mode 100644 storage/listkeys.go delete mode 100644 storage/listv2.go delete mode 100644 storage/redis/util.go delete mode 100644 storage/redis/util_test.go delete mode 100644 storage/testsuite/long_bench.go delete mode 100644 storage/testsuite/test_iterate.go delete mode 100644 storage/testsuite/test_iterate_all.go delete mode 100644 storage/testsuite/test_list.go delete mode 100644 storage/testsuite/test_listv2.go delete mode 100644 storage/testsuite/test_prefix.go diff --git a/storage/boltdb/client.go b/storage/boltdb/client.go index 21f35eb54..ef59483aa 100644 --- a/storage/boltdb/client.go +++ b/storage/boltdb/client.go @@ -4,7 +4,6 @@ package boltdb import ( - "bytes" "context" "sync/atomic" "time" @@ -28,7 +27,6 @@ type Client struct { Bucket []byte referenceCount *int32 - lookupLimit int } const ( @@ -63,16 +61,9 @@ func New(path, bucket string) (*Client, error) { referenceCount: refCount, Path: path, Bucket: []byte(bucket), - lookupLimit: storage.DefaultLookupLimit, }, nil } -// SetLookupLimit sets the lookup limit. -func (client *Client) SetLookupLimit(v int) { client.lookupLimit = v } - -// LookupLimit returns the maximum limit that is allowed. -func (client *Client) LookupLimit() int { return client.lookupLimit } - func (client *Client) update(fn func(*bbolt.Bucket) error) error { return Error.Wrap(client.db.Update(func(tx *bbolt.Tx) error { return fn(tx.Bucket(client.Bucket)) @@ -94,7 +85,7 @@ func (client *Client) view(fn func(*bbolt.Bucket) error) error { // Put adds a key/value to boltDB in a batch, where boltDB commits the batch to disk every // 1000 operations or 10ms, whichever is first. The MaxBatchDelay are using default settings. // Ref: https://github.com/boltdb/bolt/blob/master/db.go#L160 -// Note: when using this method, check if it need to be executed asynchronously +// Note: when using this method, check if it needs to be executed asynchronously // since it blocks for the duration db.MaxBatchDelay. func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) { defer mon.Task()(&ctx)(&err) @@ -153,41 +144,6 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) { }) } -// DeleteMultiple deletes keys ignoring missing keys. -func (client *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) { - defer mon.Task()(&ctx, len(keys))(&err) - - var items storage.Items - err = client.update(func(bucket *bbolt.Bucket) error { - for _, key := range keys { - value := bucket.Get(key) - if len(value) == 0 { - continue - } - - items = append(items, storage.ListItem{ - Key: key, - Value: value, - }) - - err := bucket.Delete(key) - if err != nil { - return err - } - } - return nil - }) - - return items, err -} - -// List returns either a list of keys for which boltdb has values or an error. -func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) { - defer mon.Task()(&ctx)(&err) - rv, err := storage.ListKeys(ctx, client, first, limit) - return rv, Error.Wrap(err) -} - // Close closes a BoltDB client. func (client *Client) Close() (err error) { if atomic.AddInt32(client.referenceCount, -1) == 0 { @@ -196,96 +152,6 @@ func (client *Client) Close() (err error) { return nil } -// GetAll finds all values for the provided keys (up to LookupLimit). -// If more keys are provided than the maximum, an error will be returned. -func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) { - defer mon.Task()(&ctx)(&err) - if len(keys) > client.lookupLimit { - return nil, storage.ErrLimitExceeded.New("lookup limit exceeded") - } - - vals := make(storage.Values, 0, len(keys)) - err = client.view(func(bucket *bbolt.Bucket) error { - for _, key := range keys { - val := bucket.Get([]byte(key)) - if val == nil { - vals = append(vals, nil) - continue - } - vals = append(vals, storage.CloneValue(storage.Value(val))) - } - return nil - }) - return vals, err -} - -// Iterate iterates over items based on opts. -func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - - if opts.Limit <= 0 || opts.Limit > client.lookupLimit { - opts.Limit = client.lookupLimit - } - - return client.IterateWithoutLookupLimit(ctx, opts, fn) -} - -// IterateWithoutLookupLimit calls the callback with an iterator over the keys, but doesn't enforce default limit on opts. -func (client *Client) IterateWithoutLookupLimit(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - - return client.view(func(bucket *bbolt.Bucket) error { - var cursor advancer = forward{bucket.Cursor()} - - start := true - lastPrefix := []byte{} - wasPrefix := false - - return fn(ctx, storage.IteratorFunc(func(ctx context.Context, item *storage.ListItem) bool { - var key, value []byte - if start { - key, value = cursor.PositionToFirst(opts.Prefix, opts.First) - start = false - } else { - key, value = cursor.Advance() - } - - if !opts.Recurse { - // when non-recursive skip all items that have the same prefix - if wasPrefix && bytes.HasPrefix(key, lastPrefix) { - key, value = cursor.SkipPrefix(lastPrefix) - wasPrefix = false - } - } - - if len(key) == 0 || !bytes.HasPrefix(key, opts.Prefix) { - return false - } - - if !opts.Recurse { - // check whether the entry is a proper prefix - if p := bytes.IndexByte(key[len(opts.Prefix):], storage.Delimiter); p >= 0 { - key = key[:len(opts.Prefix)+p+1] - lastPrefix = append(lastPrefix[:0], key...) - - item.Key = append(item.Key[:0], storage.Key(lastPrefix)...) - item.Value = item.Value[:0] - item.IsPrefix = true - - wasPrefix = true - return true - } - } - - item.Key = append(item.Key[:0], storage.Key(key)...) - item.Value = append(item.Value[:0], storage.Value(value)...) - item.IsPrefix = false - - return true - })) - }) -} - // Range iterates over all items in unspecified order. func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { defer mon.Task()(&ctx)(&err) @@ -296,61 +162,3 @@ func (client *Client) Range(ctx context.Context, fn func(context.Context, storag }) }) } - -type advancer interface { - PositionToFirst(prefix, first storage.Key) (key, value []byte) - SkipPrefix(prefix storage.Key) (key, value []byte) - Advance() (key, value []byte) -} - -type forward struct { - *bbolt.Cursor -} - -func (cursor forward) PositionToFirst(prefix, first storage.Key) (key, value []byte) { - if first.IsZero() || first.Less(prefix) { - return cursor.Seek([]byte(prefix)) - } - return cursor.Seek([]byte(first)) -} - -func (cursor forward) SkipPrefix(prefix storage.Key) (key, value []byte) { - return cursor.Seek(storage.AfterPrefix(prefix)) -} - -func (cursor forward) Advance() (key, value []byte) { - return cursor.Next() -} - -// CompareAndSwap atomically compares and swaps oldValue with newValue. -func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldValue, newValue storage.Value) (err error) { - defer mon.Task()(&ctx)(&err) - if key.IsZero() { - return storage.ErrEmptyKey.New("") - } - - return client.update(func(bucket *bbolt.Bucket) error { - data := bucket.Get([]byte(key)) - if len(data) == 0 { - if oldValue != nil { - return storage.ErrKeyNotFound.New("%q", key) - } - - if newValue == nil { - return nil - } - - return Error.Wrap(bucket.Put(key, newValue)) - } - - if !bytes.Equal(storage.Value(data), oldValue) { - return storage.ErrValueChanged.New("%q", key) - } - - if newValue == nil { - return Error.Wrap(bucket.Delete(key)) - } - - return Error.Wrap(bucket.Put(key, newValue)) - }) -} diff --git a/storage/boltdb/client_test.go b/storage/boltdb/client_test.go index 991b43c9b..0a7f519fe 100644 --- a/storage/boltdb/client_test.go +++ b/storage/boltdb/client_test.go @@ -4,15 +4,10 @@ package boltdb import ( - "context" - "fmt" "os" "path/filepath" "testing" - "github.com/zeebo/errs" - - "storj.io/storj/storage" "storj.io/storj/storage/testsuite" ) @@ -34,7 +29,6 @@ func TestSuite(t *testing.T) { } }() - store.SetLookupLimit(500) testsuite.RunTests(t, store) } @@ -58,64 +52,3 @@ func BenchmarkSuite(b *testing.B) { testsuite.RunBenchmarks(b, store) } - -type boltLongBenchmarkStore struct { - *Client - dirPath string -} - -func (store *boltLongBenchmarkStore) BulkImport(ctx context.Context, iter storage.Iterator) (err error) { - // turn off syncing during import - oldval := store.db.NoSync - store.db.NoSync = true - defer func() { store.db.NoSync = oldval }() - - var item storage.ListItem - for iter.Next(ctx, &item) { - if err := store.Put(ctx, item.Key, item.Value); err != nil { - return fmt.Errorf("Failed to insert data (%q, %q): %w", item.Key, item.Value, err) - } - } - - return store.db.Sync() -} - -func (store *boltLongBenchmarkStore) BulkDeleteAll(ctx context.Context) error { - // do nothing here; everything will be cleaned up later after the test completes. it's not - // worth it to wait for BoltDB to remove every key, one by one, and we can't just - // os.RemoveAll() the whole test directory at this point because those files are still open - // and unremoveable on Windows. - return nil -} - -var _ testsuite.BulkImporter = &boltLongBenchmarkStore{} -var _ testsuite.BulkCleaner = &boltLongBenchmarkStore{} - -func BenchmarkSuiteLong(b *testing.B) { - tempdir, err := os.MkdirTemp("", "storj-bolt") - if err != nil { - b.Fatal(err) - } - defer func() { - if err := os.RemoveAll(tempdir); err != nil { - b.Fatal(err) - } - }() - - dbname := filepath.Join(tempdir, "bolt.db") - store, err := New(dbname, "bucket") - if err != nil { - b.Fatalf("failed to create db: %v", err) - } - defer func() { - if err := errs.Combine(store.Close(), os.RemoveAll(tempdir)); err != nil { - b.Fatalf("failed to close db: %v", err) - } - }() - - longStore := &boltLongBenchmarkStore{ - Client: store, - dirPath: tempdir, - } - testsuite.BenchmarkPathOperationsInLargeDb(b, longStore) -} diff --git a/storage/common.go b/storage/common.go index d4a6a427f..e33597da0 100644 --- a/storage/common.go +++ b/storage/common.go @@ -46,9 +46,6 @@ type Values []Value // Items keeps all ListItem. type Items []ListItem -// DefaultLookupLimit is the default lookup limit for storage implementations. -const DefaultLookupLimit = 500 - // ListItem returns Key, Value, IsPrefix. type ListItem struct { Key Key @@ -62,61 +59,21 @@ type KeyValueStore interface { Put(context.Context, Key, Value) error // Get gets a value to store. Get(context.Context, Key) (Value, error) - // GetAll gets all values from the store. - GetAll(context.Context, Keys) (Values, error) // Delete deletes key and the value. Delete(context.Context, Key) error - // DeleteMultiple deletes keys and returns nil for. - DeleteMultiple(context.Context, []Key) (Items, error) // Range iterates over all items in unspecified order. // The Key and Value are valid only for the duration of callback. Range(ctx context.Context, fn func(context.Context, Key, Value) error) error - // List lists all keys starting from start and upto limit items. - List(ctx context.Context, start Key, limit int) (Keys, error) - // Iterate iterates over items based on opts. - Iterate(ctx context.Context, opts IterateOptions, fn func(context.Context, Iterator) error) error - // IterateWithoutLookupLimit calls the callback with an iterator over the keys, but doesn't enforce default limit on opts. - IterateWithoutLookupLimit(ctx context.Context, opts IterateOptions, fn func(context.Context, Iterator) error) error - // CompareAndSwap atomically compares and swaps oldValue with newValue. - CompareAndSwap(ctx context.Context, key Key, oldValue, newValue Value) error // Close closes the store. Close() error - - // LookupLimit returns the maximum limit that is allowed. - LookupLimit() int } -// IterateOptions contains options for iterator. -type IterateOptions struct { - // Prefix ensure. - Prefix Key - // First will be the first item iterator returns or the next item (previous when reverse). - First Key - // Recurse, do not collapse items based on Delimiter. - Recurse bool - // The maximum number of elements to be returned. - Limit int -} - -// Iterator iterates over a sequence of ListItems. -type Iterator interface { - // Next prepares the next list item. - // It returns true on success, or false if there is no next result row or an error happened while preparing it. - Next(ctx context.Context, item *ListItem) bool -} - -// IteratorFunc implements basic iterator. -type IteratorFunc func(ctx context.Context, item *ListItem) bool - -// Next returns the next item. -func (next IteratorFunc) Next(ctx context.Context, item *ListItem) bool { return next(ctx, item) } - -// IsZero returns true if the value struct is it's zero value. +// IsZero returns true if the value struct is a zero value. func (value Value) IsZero() bool { return len(value) == 0 } -// IsZero returns true if the key struct is it's zero value. +// IsZero returns true if the key struct is a zero value. func (key Key) IsZero() bool { return len(key) == 0 } diff --git a/storage/listkeys.go b/storage/listkeys.go deleted file mode 100644 index 3942576ce..000000000 --- a/storage/listkeys.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package storage - -import ( - "context" -) - -// ListKeys returns keys starting from first and upto limit. -// limit is capped to LookupLimit. -func ListKeys(ctx context.Context, store KeyValueStore, first Key, limit int) (_ Keys, err error) { - defer mon.Task()(&ctx)(&err) - if limit <= 0 || limit > store.LookupLimit() { - limit = store.LookupLimit() - } - - keys := make(Keys, 0, limit) - err = store.Iterate(ctx, IterateOptions{ - First: first, - Recurse: true, - }, func(ctx context.Context, it Iterator) error { - var item ListItem - for ; limit > 0 && it.Next(ctx, &item); limit-- { - if item.Key == nil { - panic("nil key") - } - keys = append(keys, CloneKey(item.Key)) - } - return nil - }) - - return keys, err -} diff --git a/storage/listv2.go b/storage/listv2.go deleted file mode 100644 index 3a05e88bf..000000000 --- a/storage/listv2.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package storage - -import ( - "context" -) - -// ListOptions are items that are optional for the LIST method. -type ListOptions struct { - Prefix Key - StartAfter Key // StartAfter is relative to Prefix - Recursive bool - IncludeValue bool - Limit int -} - -// ListV2 lists all keys corresponding to ListOptions. -// limit is capped to LookupLimit. -// -// more indicates if the result was truncated. If false -// then the result []ListItem includes all requested keys. -// If true then the caller must call List again to get more -// results by setting `StartAfter` appropriately. -func ListV2(ctx context.Context, store KeyValueStore, opts ListOptions) (result Items, more bool, err error) { - more, err = ListV2Iterate(ctx, store, opts, func(ctx context.Context, item *ListItem) error { - if opts.IncludeValue { - result = append(result, ListItem{ - Key: CloneKey(item.Key), - Value: CloneValue(item.Value), - IsPrefix: item.IsPrefix, - }) - } else { - result = append(result, ListItem{ - Key: CloneKey(item.Key), - IsPrefix: item.IsPrefix, - }) - } - return nil - }) - return result, more, err -} - -// ListV2Iterate lists all keys corresponding to ListOptions. -// limit is capped to LookupLimit. -// -// more indicates if the result was truncated. If false -// then the result []ListItem includes all requested keys. -// If true then the caller must call List again to get more -// results by setting `StartAfter` appropriately. -// -// The opts.IncludeValue is ignored for this func. -// The callback item will be reused for next calls. -// If the user needs the preserve the value, it must call storage.CloneValue or storage.CloneKey. -func ListV2Iterate(ctx context.Context, store KeyValueStore, opts ListOptions, fn func(context.Context, *ListItem) error) (more bool, err error) { - defer mon.Task()(&ctx)(&err) - - limit := opts.Limit - if limit <= 0 || limit > store.LookupLimit() { - limit = store.LookupLimit() - } - - more = true - - first := opts.StartAfter - iterate := func(ctx context.Context, it Iterator) error { - var item ListItem - skipFirst := true - for ; limit > 0; limit-- { - if !it.Next(ctx, &item) { - more = false - return nil - } - - relativeKey := item.Key[len(opts.Prefix):] - if skipFirst { - skipFirst = false - if relativeKey.Equal(first) { - // skip the first element in iteration - // if it matches the search key - limit++ - continue - } - } - - task := mon.TaskNamed("handling_item")(nil) - item.Key = relativeKey - err := fn(ctx, &item) - task(nil) - - if err != nil { - return err - } - } - - // we still need to consume one item for the more flag - more = it.Next(ctx, &item) - return nil - } - - var firstFull Key - if !opts.StartAfter.IsZero() { - firstFull = joinKey(opts.Prefix, opts.StartAfter) - } - err = store.Iterate(ctx, IterateOptions{ - Prefix: opts.Prefix, - First: firstFull, - Recurse: opts.Recursive, - Limit: limit, - }, iterate) - - return more, err -} - -func joinKey(a, b Key) Key { - return append(append(Key{}, a...), b...) -} diff --git a/storage/redis/client.go b/storage/redis/client.go index 7496a3a77..efe0185d2 100644 --- a/storage/redis/client.go +++ b/storage/redis/client.go @@ -4,11 +4,9 @@ package redis import ( - "bytes" "context" "errors" "net/url" - "sort" "strconv" "time" @@ -34,8 +32,6 @@ const defaultNodeExpiration = 0 * time.Minute type Client struct { db *redis.Client TTL time.Duration - - lookupLimit int } // OpenClient returns a configured Client instance, verifying a successful connection to redis. @@ -46,8 +42,7 @@ func OpenClient(ctx context.Context, address, password string, db int) (*Client, Password: password, DB: db, }), - TTL: defaultNodeExpiration, - lookupLimit: storage.DefaultLookupLimit, + TTL: defaultNodeExpiration, } // ping here to verify we are able to connect to redis with the initialized client. @@ -79,12 +74,6 @@ func OpenClientFrom(ctx context.Context, address string) (*Client, error) { return OpenClient(ctx, redisurl.Host, q.Get("password"), db) } -// SetLookupLimit sets the lookup limit. -func (client *Client) SetLookupLimit(v int) { client.lookupLimit = v } - -// LookupLimit returns the maximum limit that is allowed. -func (client *Client) LookupLimit() int { return client.lookupLimit } - // Get looks up the provided key from redis returning either an error or the result. func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) { defer mon.Task()(&ctx)(&err) @@ -120,12 +109,6 @@ func (client *Client) Eval(ctx context.Context, script string, keys []string) (e return eval(ctx, client.db, script, keys) } -// List returns either a list of keys for which boltdb has values or an error. -func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) { - defer mon.Task()(&ctx)(&err) - return storage.ListKeys(ctx, client, first, limit) -} - // Delete deletes a key/value pair from redis, for a given the key. func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) { defer mon.Task()(&ctx)(&err) @@ -135,10 +118,10 @@ func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) { return delete(ctx, client.db, key) } -// DeleteMultiple deletes keys ignoring missing keys. -func (client *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) { - defer mon.Task()(&ctx, len(keys))(&err) - return deleteMultiple(ctx, client.db, keys) +// FlushDB deletes all keys in the currently selected DB. +func (client *Client) FlushDB(ctx context.Context) error { + _, err := client.db.FlushDB(ctx).Result() + return err } // Close closes a redis client. @@ -146,43 +129,6 @@ func (client *Client) Close() error { return client.db.Close() } -// GetAll is the bulk method for gets from the redis data store. -// The maximum keys returned will be LookupLimit. If more than that -// is requested, an error will be returned. -func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) { - defer mon.Task()(&ctx)(&err) - if len(keys) == 0 { - return nil, nil - } - if len(keys) > client.lookupLimit { - return nil, storage.ErrLimitExceeded.New("lookup limit exceeded") - } - - keyStrings := make([]string, len(keys)) - for i, v := range keys { - keyStrings[i] = v.String() - } - - results, err := client.db.MGet(ctx, keyStrings...).Result() - if err != nil { - return nil, err - } - - values := []storage.Value{} - for _, result := range results { - if result == nil { - values = append(values, nil) - } else { - s, ok := result.(string) - if !ok { - return nil, Error.New("invalid result type %T", result) - } - values = append(values, storage.Value(s)) - } - } - return values, nil -} - // Range iterates over all items in unspecified order. func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { defer mon.Task()(&ctx)(&err) @@ -212,127 +158,6 @@ func (client *Client) Range(ctx context.Context, fn func(context.Context, storag return Error.Wrap(it.Err()) } -// Iterate iterates over items based on opts. -func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - - if opts.Limit <= 0 || opts.Limit > client.lookupLimit { - opts.Limit = client.lookupLimit - } - return client.IterateWithoutLookupLimit(ctx, opts, fn) -} - -// IterateWithoutLookupLimit calls the callback with an iterator over the keys, but doesn't enforce default limit on opts. -func (client *Client) IterateWithoutLookupLimit(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - - all, err := client.allPrefixedItems(ctx, opts.Prefix, opts.First, nil, opts.Limit) - if err != nil { - return err - } - - if !opts.Recurse { - all = sortAndCollapse(all, opts.Prefix) - } - - return fn(ctx, &StaticIterator{ - Items: all, - }) -} - -// FlushDB deletes all keys in the currently selected DB. -func (client *Client) FlushDB(ctx context.Context) error { - _, err := client.db.FlushDB(ctx).Result() - return err -} - -func (client *Client) allPrefixedItems(ctx context.Context, prefix, first, last storage.Key, limit int) (storage.Items, error) { - var all storage.Items - seen := map[string]struct{}{} - - match := string(escapeMatch([]byte(prefix))) + "*" - it := client.db.Scan(ctx, 0, match, 0).Iterator() - for it.Next(ctx) { - key := it.Val() - if !first.IsZero() && storage.Key(key).Less(first) { - continue - } - if !last.IsZero() && last.Less(storage.Key(key)) { - continue - } - - if _, ok := seen[key]; ok { - continue - } - seen[key] = struct{}{} - - value, err := client.db.Get(ctx, key).Bytes() - if err != nil { - return nil, err - } - - all = append(all, storage.ListItem{ - Key: storage.Key(key), - Value: storage.Value(value), - IsPrefix: false, - }) - } - - sort.Sort(all) - - return all, nil -} - -// CompareAndSwap atomically compares and swaps oldValue with newValue. -func (client *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldValue, newValue storage.Value) (err error) { - defer mon.Task()(&ctx)(&err) - if key.IsZero() { - return storage.ErrEmptyKey.New("") - } - - txf := func(tx *redis.Tx) error { - value, err := get(ctx, tx, key) - if storage.ErrKeyNotFound.Has(err) { - if oldValue != nil { - return storage.ErrKeyNotFound.New("%q", key) - } - - if newValue == nil { - return nil - } - - // runs only if the watched keys remain unchanged - _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - return put(ctx, pipe, key, newValue, client.TTL) - }) - return err - } - if err != nil { - return err - } - - if !bytes.Equal(value, oldValue) { - return storage.ErrValueChanged.New("%q", key) - } - - // runs only if the watched keys remain unchanged - _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - if newValue == nil { - return delete(ctx, pipe, key) - } - return put(ctx, pipe, key, newValue, client.TTL) - }) - - return err - } - - err = client.db.Watch(ctx, txf, key.String()) - if errors.Is(err, redis.TxFailedErr) { - return storage.ErrValueChanged.New("%q", key) - } - return Error.Wrap(err) -} - func get(ctx context.Context, cmdable redis.Cmdable, key storage.Key) (_ storage.Value, err error) { defer mon.Task()(&ctx)(&err) value, err := cmdable.Get(ctx, string(key)).Bytes() @@ -371,32 +196,3 @@ func eval(ctx context.Context, cmdable redis.Cmdable, script string, keys []stri } return errs.Wrap(err) } - -func deleteMultiple(ctx context.Context, cmdable redis.Cmdable, keys []storage.Key) (_ storage.Items, err error) { - defer mon.Task()(&ctx, len(keys))(&err) - - var items storage.Items - for _, key := range keys { - value, err := get(ctx, cmdable, key) - if err != nil { - if errors.Is(err, redis.Nil) || storage.ErrKeyNotFound.Has(err) { - continue - } - return items, err - } - - err = delete(ctx, cmdable, key) - if err != nil { - if errors.Is(err, redis.Nil) || storage.ErrKeyNotFound.Has(err) { - continue - } - return items, err - } - items = append(items, storage.ListItem{ - Key: key, - Value: value, - }) - } - - return items, nil -} diff --git a/storage/redis/client_test.go b/storage/redis/client_test.go index b90efd231..23fabeea3 100644 --- a/storage/redis/client_test.go +++ b/storage/redis/client_test.go @@ -29,7 +29,6 @@ func TestSuite(t *testing.T) { t.Fatal(err) } - client.SetLookupLimit(500) testsuite.RunTests(t, client) } diff --git a/storage/redis/util.go b/storage/redis/util.go deleted file mode 100644 index 88f7f8bbb..000000000 --- a/storage/redis/util.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package redis - -import ( - "bytes" - "context" - "sort" - - "github.com/redis/go-redis/v9" - - "storj.io/storj/storage" -) - -func escapeMatch(match []byte) []byte { - start := 0 - escaped := []byte{} - for i, b := range match { - switch b { - case '?', '*', '[', ']', '\\': - escaped = append(escaped, match[start:i]...) - escaped = append(escaped, '\\', b) - start = i + 1 - } - } - if start == 0 { - return match - } - - return append(escaped, match[start:]...) -} - -// sortAndCollapse sorts items and combines elements based on Delimiter. -// items will be reused and modified. -func sortAndCollapse(items storage.Items, prefix []byte) storage.Items { - sort.Sort(items) - result := items[:0] - - var currentPrefix []byte - var prefixed bool - for _, item := range items { - if prefixed { - if bytes.HasPrefix(item.Key, currentPrefix) { - continue - } - prefixed = false - } - - if p := bytes.IndexByte(item.Key[len(prefix):], storage.Delimiter); p >= 0 { - currentPrefix = item.Key[:len(prefix)+p+1] - prefixed = true - result = append(result, storage.ListItem{ - Key: currentPrefix, - IsPrefix: true, - }) - } else { - result = append(result, item) - } - } - - return result -} - -// StaticIterator implements an iterator over list of items. -type StaticIterator struct { - Items storage.Items - Index int -} - -// Next returns the next item from the iterator. -func (it *StaticIterator) Next(ctx context.Context, item *storage.ListItem) bool { - if it.Index >= len(it.Items) { - return false - } - *item = it.Items[it.Index] - it.Index++ - return true -} - -// ScanIterator iterates over scan command items. -type ScanIterator struct { - db *redis.Client - it *redis.ScanIterator -} - -// Next returns the next item from the iterator. -func (it *ScanIterator) Next(ctx context.Context, item *storage.ListItem) bool { - ok := it.it.Next(ctx) - if !ok { - return false - } - - key := it.it.Val() - value, err := it.db.Get(ctx, key).Bytes() - if err != nil { - return false - } - - item.Key = storage.Key(key) - item.Value = storage.Value(value) - item.IsPrefix = false - - return true -} diff --git a/storage/redis/util_test.go b/storage/redis/util_test.go deleted file mode 100644 index 4bf33258f..000000000 --- a/storage/redis/util_test.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package redis - -import ( - "bytes" - "testing" -) - -func TestEscapeMatch(t *testing.T) { - type escaped struct{ unescaped, escaped string } - var examples = []escaped{ - {`h?llo`, `h\?llo`}, - {`h*llo`, `h\*llo`}, - {`h[ae]llo`, `h\[ae\]llo`}, - {`h[^e]llo`, `h\[^e\]llo`}, - {`h[a-b]llo`, `h\[a-b\]llo`}, - {`h\[a-b\]llo`, `h\\\[a-b\\\]llo`}, - } - - for _, example := range examples { - got := escapeMatch([]byte(example.unescaped)) - if !bytes.Equal(got, []byte(example.escaped)) { - t.Errorf("fail %q got %q expected %q", example.unescaped, got, example.escaped) - } - } -} diff --git a/storage/storelogger/logger.go b/storage/storelogger/logger.go index 33b567590..0acad0de1 100644 --- a/storage/storelogger/logger.go +++ b/storage/storelogger/logger.go @@ -31,9 +31,6 @@ func New(log *zap.Logger, store storage.KeyValueStore) *Logger { return &Logger{log.Named(name), store} } -// LookupLimit returns the maximum limit that is allowed. -func (store *Logger) LookupLimit() int { return store.store.LookupLimit() } - // Put adds a value to store. func (store *Logger) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) { defer mon.Task()(&ctx)(&err) @@ -48,13 +45,6 @@ func (store *Logger) Get(ctx context.Context, key storage.Key) (_ storage.Value, return store.store.Get(ctx, key) } -// GetAll gets all values from the store corresponding to keys. -func (store *Logger) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) { - defer mon.Task()(&ctx)(&err) - store.log.Debug("GetAll", zap.Any("keys", keys)) - return store.store.GetAll(ctx, keys) -} - // Delete deletes key and the value. func (store *Logger) Delete(ctx context.Context, key storage.Key) (err error) { defer mon.Task()(&ctx)(&err) @@ -62,21 +52,6 @@ func (store *Logger) Delete(ctx context.Context, key storage.Key) (err error) { return store.store.Delete(ctx, key) } -// DeleteMultiple deletes keys ignoring missing keys. -func (store *Logger) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) { - defer mon.Task()(&ctx, len(keys))(&err) - store.log.Debug("DeleteMultiple", zap.Any("keys", keys)) - return store.store.DeleteMultiple(ctx, keys) -} - -// List lists all keys starting from first and upto limit items. -func (store *Logger) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) { - defer mon.Task()(&ctx)(&err) - keys, err := store.store.List(ctx, first, limit) - store.log.Debug("List", zap.ByteString("first", first), zap.Int("limit", limit), zap.Strings("keys", keys.Strings())) - return keys, err -} - // Range iterates over all items in unspecified order. func (store *Logger) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { defer mon.Task()(&ctx)(&err) @@ -91,67 +66,12 @@ func (store *Logger) Range(ctx context.Context, fn func(context.Context, storage }) } -// Iterate iterates over items based on opts. -func (store *Logger) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - store.log.Debug("Iterate", - zap.ByteString("prefix", opts.Prefix), - zap.ByteString("first", opts.First), - zap.Bool("recurse", opts.Recurse), - ) - return store.store.Iterate(ctx, opts, func(ctx context.Context, it storage.Iterator) error { - return fn(ctx, storage.IteratorFunc(func(ctx context.Context, item *storage.ListItem) bool { - ok := it.Next(ctx, item) - if ok { - store.log.Debug(" ", - zap.ByteString("key", item.Key), - zap.Int("value length", len(item.Value)), - zap.Binary("truncated value", truncate(item.Value)), - ) - } - return ok - })) - }) -} - -// IterateWithoutLookupLimit calls the callback with an iterator over the keys, but doesn't enforce default limit on opts. -func (store *Logger) IterateWithoutLookupLimit(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - store.log.Debug("IterateWithoutLookupLimit", - zap.ByteString("prefix", opts.Prefix), - zap.ByteString("first", opts.First), - zap.Bool("recurse", opts.Recurse), - ) - return store.store.IterateWithoutLookupLimit(ctx, opts, func(ctx context.Context, it storage.Iterator) error { - return fn(ctx, storage.IteratorFunc(func(ctx context.Context, item *storage.ListItem) bool { - ok := it.Next(ctx, item) - if ok { - store.log.Debug(" ", - zap.ByteString("key", item.Key), - zap.Int("value length", len(item.Value)), - zap.Binary("truncated value", truncate(item.Value)), - ) - } - return ok - })) - }) -} - // Close closes the store. func (store *Logger) Close() error { store.log.Debug("Close") return store.store.Close() } -// CompareAndSwap atomically compares and swaps oldValue with newValue. -func (store *Logger) CompareAndSwap(ctx context.Context, key storage.Key, oldValue, newValue storage.Value) (err error) { - defer mon.Task()(&ctx)(&err) - store.log.Debug("CompareAndSwap", zap.ByteString("key", key), - zap.Int("old value length", len(oldValue)), zap.Int("new value length", len(newValue)), - zap.Binary("truncated old value", truncate(oldValue)), zap.Binary("truncated new value", truncate(newValue))) - return store.store.CompareAndSwap(ctx, key, oldValue, newValue) -} - func truncate(v storage.Value) (t []byte) { if len(v)-1 < 10 { t = []byte(v) diff --git a/storage/storelogger/logger_test.go b/storage/storelogger/logger_test.go index ed78c22c7..067fef625 100644 --- a/storage/storelogger/logger_test.go +++ b/storage/storelogger/logger_test.go @@ -14,7 +14,6 @@ import ( func TestSuite(t *testing.T) { store := teststore.New() - store.SetLookupLimit(500) logged := New(zap.NewNop(), store) testsuite.RunTests(t, logged) } diff --git a/storage/teststore/store.go b/storage/teststore/store.go index 194b0dd0b..e90ba12ea 100644 --- a/storage/teststore/store.go +++ b/storage/teststore/store.go @@ -4,7 +4,6 @@ package teststore import ( - "bytes" "context" "errors" "sort" @@ -20,40 +19,28 @@ var mon = monkit.Package() // Client implements in-memory key value store. type Client struct { - lookupLimit int - mu sync.Mutex Items []storage.ListItem ForceError int CallCount struct { - Get int - Put int - List int - GetAll int - Delete int - Close int - Range int - Iterate int - CompareAndSwap int + Get int + Put int + Delete int + Close int + Range int } version int } // New creates a new in-memory key-value store. -func New() *Client { return &Client{lookupLimit: storage.DefaultLookupLimit} } +func New() *Client { return &Client{} } // MigrateToLatest pretends to migrate to latest db schema version. func (store *Client) MigrateToLatest(ctx context.Context) error { return nil } -// SetLookupLimit sets the lookup limit. -func (store *Client) SetLookupLimit(v int) { store.lookupLimit = v } - -// LookupLimit returns the maximum limit that is allowed. -func (store *Client) LookupLimit() int { return store.lookupLimit } - // indexOf finds index of key or where it could be inserted. func (store *Client) indexOf(key storage.Key) (int, bool) { i := sort.Search(len(store.Items), func(k int) bool { @@ -128,32 +115,6 @@ func (store *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, return storage.CloneValue(store.Items[keyIndex].Value), nil } -// GetAll gets all values from the store. -func (store *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) { - defer mon.Task()(&ctx)(&err) - defer store.locked()() - - store.CallCount.GetAll++ - if len(keys) > store.lookupLimit { - return nil, storage.ErrLimitExceeded.New("lookup limit exceeded") - } - - if store.forcedError() { - return nil, errors.New("internal error") - } - - values := storage.Values{} - for _, key := range keys { - keyIndex, found := store.indexOf(key) - if !found { - values = append(values, nil) - continue - } - values = append(values, storage.CloneValue(store.Items[keyIndex].Value)) - } - return values, nil -} - // Delete deletes key and the value. func (store *Client) Delete(ctx context.Context, key storage.Key) (err error) { defer mon.Task()(&ctx)(&err) @@ -179,48 +140,6 @@ func (store *Client) Delete(ctx context.Context, key storage.Key) (err error) { return nil } -// DeleteMultiple deletes keys ignoring missing keys. -func (store *Client) DeleteMultiple(ctx context.Context, keys []storage.Key) (_ storage.Items, err error) { - defer mon.Task()(&ctx, len(keys))(&err) - defer store.locked()() - - store.version++ - store.CallCount.Delete++ - - if store.forcedError() { - return nil, errInternal - } - - var items storage.Items - for _, key := range keys { - keyIndex, found := store.indexOf(key) - if !found { - continue - } - e := store.Items[keyIndex] - items = append(items, storage.ListItem{ - Key: e.Key, - Value: e.Value, - }) - store.delete(keyIndex) - } - - return items, nil -} - -// List lists all keys starting from start and upto limit items. -func (store *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) { - defer mon.Task()(&ctx)(&err) - store.mu.Lock() - store.CallCount.List++ - if store.forcedError() { - store.mu.Unlock() - return nil, errors.New("internal error") - } - store.mu.Unlock() - return storage.ListKeys(ctx, store, first, limit) -} - // Close closes the store. func (store *Client) Close() error { defer store.locked()() @@ -251,197 +170,6 @@ func (store *Client) Range(ctx context.Context, fn func(context.Context, storage return nil } -// Iterate iterates over items based on opts. -func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - return store.IterateWithoutLookupLimit(ctx, opts, fn) -} - -// IterateWithoutLookupLimit calls the callback with an iterator over the keys, but doesn't enforce default limit on opts. -func (store *Client) IterateWithoutLookupLimit(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { - defer mon.Task()(&ctx)(&err) - - store.mu.Lock() - store.CallCount.Iterate++ - if store.forcedError() { - store.mu.Unlock() - return errInternal - } - store.mu.Unlock() - - var cursor advancer = &forward{newCursor(store)} - - cursor.PositionToFirst(opts.Prefix, opts.First) - var lastPrefix storage.Key - var wasPrefix bool - - return fn(ctx, storage.IteratorFunc( - func(ctx context.Context, item *storage.ListItem) bool { - next, ok := cursor.Advance() - if !ok { - return false - } - - if !opts.Recurse { - if wasPrefix && bytes.HasPrefix(next.Key, lastPrefix) { - next, ok = cursor.SkipPrefix(lastPrefix) - - if !ok { - return false - } - wasPrefix = false - } - } - - if !bytes.HasPrefix(next.Key, opts.Prefix) { - cursor.close() - return false - } - - if !opts.Recurse { - if p := bytes.IndexByte([]byte(next.Key[len(opts.Prefix):]), storage.Delimiter); p >= 0 { - lastPrefix = append(lastPrefix[:0], next.Key[:len(opts.Prefix)+p+1]...) - - item.Key = append(item.Key[:0], lastPrefix...) - item.Value = item.Value[:0] - item.IsPrefix = true - - wasPrefix = true - return true - } - } - - item.Key = append(item.Key[:0], next.Key...) - item.Value = append(item.Value[:0], next.Value...) - item.IsPrefix = false - - return true - })) -} - -type advancer interface { - close() - PositionToFirst(prefix, first storage.Key) - SkipPrefix(prefix storage.Key) (*storage.ListItem, bool) - Advance() (*storage.ListItem, bool) -} - -type forward struct{ cursor } - -func (cursor *forward) PositionToFirst(prefix, first storage.Key) { - if first.IsZero() || first.Less(prefix) { - cursor.positionForward(prefix) - } else { - cursor.positionForward(first) - } -} - -func (cursor *forward) SkipPrefix(prefix storage.Key) (*storage.ListItem, bool) { - cursor.positionForward(storage.AfterPrefix(prefix)) - return cursor.next() -} - -func (cursor *forward) Advance() (*storage.ListItem, bool) { - return cursor.next() -} - -// cursor implements iterating over items with basic repositioning when the items change. -type cursor struct { - store *Client - done bool - nextIndex int - version int - lastKey storage.Key -} - -func newCursor(store *Client) cursor { return cursor{store: store} } - -func (cursor *cursor) close() { - cursor.store = nil - cursor.done = true -} - -// positionForward positions at key or the next item. -func (cursor *cursor) positionForward(key storage.Key) { - store := cursor.store - store.mu.Lock() - cursor.version = store.version - cursor.nextIndex, _ = store.indexOf(key) - store.mu.Unlock() - cursor.lastKey = storage.CloneKey(key) -} - -func (cursor *cursor) next() (*storage.ListItem, bool) { - store := cursor.store - if cursor.done { - return nil, false - } - defer store.locked()() - - if cursor.version != store.version { - cursor.version = store.version - var ok bool - cursor.nextIndex, ok = store.indexOf(cursor.lastKey) - if ok { - cursor.nextIndex++ - } - } - - if cursor.nextIndex >= len(store.Items) { - cursor.close() - return nil, false - } - - item := &store.Items[cursor.nextIndex] - cursor.lastKey = item.Key - cursor.nextIndex++ - return item, true -} - -// CompareAndSwap atomically compares and swaps oldValue with newValue. -func (store *Client) CompareAndSwap(ctx context.Context, key storage.Key, oldValue, newValue storage.Value) (err error) { - defer mon.Task()(&ctx)(&err) - defer store.locked()() - - store.version++ - store.CallCount.CompareAndSwap++ - if store.forcedError() { - return errInternal - } - - if key.IsZero() { - return storage.ErrEmptyKey.New("") - } - - keyIndex, found := store.indexOf(key) - if !found { - if oldValue != nil { - return storage.ErrKeyNotFound.New("%q", key) - } - - if newValue == nil { - return nil - } - - store.put(keyIndex, key, newValue) - return nil - } - - kv := &store.Items[keyIndex] - if !bytes.Equal(kv.Value, oldValue) { - return storage.ErrValueChanged.New("%q", key) - } - - if newValue == nil { - store.delete(keyIndex) - return nil - } - - kv.Value = storage.CloneValue(newValue) - - return nil -} - func (store *Client) put(keyIndex int, key storage.Key, value storage.Value) { store.Items = append(store.Items, storage.ListItem{}) copy(store.Items[keyIndex+1:], store.Items[keyIndex:]) diff --git a/storage/teststore/store_test.go b/storage/teststore/store_test.go index 5a02f6118..df2a54734 100644 --- a/storage/teststore/store_test.go +++ b/storage/teststore/store_test.go @@ -10,9 +10,7 @@ import ( ) func TestSuite(t *testing.T) { - store := New() - store.SetLookupLimit(500) - testsuite.RunTests(t, store) + testsuite.RunTests(t, New()) } func BenchmarkSuite(b *testing.B) { testsuite.RunBenchmarks(b, New()) diff --git a/storage/testsuite/bench.go b/storage/testsuite/bench.go index 34ff619e9..807c1ca1c 100644 --- a/storage/testsuite/bench.go +++ b/storage/testsuite/bench.go @@ -78,17 +78,4 @@ func RunBenchmarks(b *testing.B, store storage.KeyValueStore) { } } }) - - b.Run("ListV2 5", func(b *testing.B) { - b.SetBytes(int64(len(items))) - for k := 0; k < b.N; k++ { - _, _, err := storage.ListV2(ctx, store, storage.ListOptions{ - StartAfter: storage.Key("gamma"), - Limit: 5, - }) - if err != nil { - b.Fatal(err) - } - } - }) } diff --git a/storage/testsuite/long_bench.go b/storage/testsuite/long_bench.go deleted file mode 100644 index 47fc1f60f..000000000 --- a/storage/testsuite/long_bench.go +++ /dev/null @@ -1,577 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package testsuite - -import ( - "bufio" - "bytes" - "compress/gzip" - "context" - "flag" - "fmt" - "io" - "os" - "strconv" - "strings" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/zeebo/errs" - - "storj.io/common/testcontext" - "storj.io/storj/storage" -) - -const ( - maxProblems = 10 - - // the largest and deepest level-2 directory in the dataset. - largestLevel2Directory = "Peronosporales/hateless/" - - // the directory in the dataset with the most immediate children. - largestSingleDirectory = "Peronosporales/hateless/tod/unricht/sniveling/Puyallup/" -) - -var ( - // see https://github.com/storj/test-path-corpus - longBenchmarksData = flag.String("test-bench-long", "", "Run the long benchmark suite against eligible KeyValueStores using the given paths dataset") - - noInitDb = flag.Bool("test-bench-long-noinit", false, "Don't import the large dataset for the long benchmarks; assume it is already loaded") - noCleanDb = flag.Bool("test-bench-long-noclean", false, "Don't clean the long benchmarks KeyValueStore after running, for debug purposes") -) - -func interpolateInput(input []byte) ([]byte, error) { - output := make([]byte, 0, len(input)) - var bytesConsumed int - var next byte - - for pos := 0; pos < len(input); pos += bytesConsumed { - if input[pos] == '\\' { - bytesConsumed = 2 - if pos+1 >= len(input) { - return output, errs.New("encoding error in input: escape at end-of-string") - } - switch input[pos+1] { - case 'x': - if pos+3 >= len(input) { - return output, errs.New("encoding error in input: incomplete \\x escape") - } - nextVal, err := strconv.ParseUint(string(input[pos+2:pos+4]), 16, 8) - if err != nil { - return output, errs.New("encoding error in input: invalid \\x escape: %v", err) - } - next = byte(nextVal) - bytesConsumed = 4 - case 't': - next = '\t' - case 'n': - next = '\n' - case 'r': - next = '\r' - case '\\': - next = '\\' - default: - next = input[pos+1] - } - } else { - next = input[pos] - bytesConsumed = 1 - } - output = append(output, next) - } - return output, nil -} - -// KVInputIterator is passed to the BulkImport method on BulkImporter-satisfying objects. It will -// iterate over a fairly large list of paths that should be imported for testing purposes. -type KVInputIterator struct { - itemNo int - scanner *bufio.Scanner - fileName string - err error - reachedEnd bool - closeFunc func() error -} - -func newKVInputIterator(pathToFile string) (*KVInputIterator, error) { - kvi := &KVInputIterator{fileName: pathToFile} - pathData, err := os.Open(pathToFile) - if err != nil { - return nil, errs.New("Failed to open file with test data (expected at %q): %v", pathToFile, err) - } - var reader io.Reader = pathData - if strings.HasSuffix(pathToFile, ".gz") { - gzReader, err := gzip.NewReader(pathData) - if err != nil { - return nil, errs.Combine( - errs.New("Failed to create gzip reader: %v", err), - pathData.Close()) - } - kvi.closeFunc = func() error { return errs.Combine(gzReader.Close(), pathData.Close()) } - reader = gzReader - } else { - kvi.closeFunc = pathData.Close - } - kvi.scanner = bufio.NewScanner(reader) - return kvi, nil -} - -// Next should be called by BulkImporter instances in order to advance the iterator. It fills in -// a storage.ListItem instance, and returns a boolean indicating whether to continue. When false is -// returned, iteration should stop and nothing is expected to be changed in item. -func (kvi *KVInputIterator) Next(ctx context.Context, item *storage.ListItem) bool { - if !kvi.scanner.Scan() { - kvi.reachedEnd = true - kvi.err = kvi.scanner.Err() - return false - } - if kvi.err != nil { - return false - } - kvi.itemNo++ - parts := bytes.Split(kvi.scanner.Bytes(), []byte("\t")) - if len(parts) != 3 { - kvi.err = errs.New("Invalid data in %q on line %d: has %d fields", kvi.fileName, kvi.itemNo, len(parts)) - return false - } - k, err := interpolateInput(parts[1]) - if err != nil { - kvi.err = errs.New("Failed to read key data from %q on line %d: %v", kvi.fileName, kvi.itemNo, err) - return false - } - v, err := interpolateInput(parts[2]) - if err != nil { - kvi.err = errs.New("Failed to read value data from %q on line %d: %v", kvi.fileName, kvi.itemNo, err) - return false - } - item.Key = storage.Key(k) - item.Value = storage.Value(v) - item.IsPrefix = false - return true -} - -// Error() returns the last error encountered while iterating over the input file. This must be -// checked after iteration completes, at least. -func (kvi *KVInputIterator) Error() error { - return kvi.err -} - -func openTestData(tb testing.TB) *KVInputIterator { - tb.Helper() - inputIter, err := newKVInputIterator(*longBenchmarksData) - if err != nil { - tb.Fatal(err) - } - return inputIter -} - -// BenchmarkPathOperationsInLargeDb runs the "long benchmarks" suite for KeyValueStore instances. -func BenchmarkPathOperationsInLargeDb(b *testing.B, store storage.KeyValueStore) { - if *longBenchmarksData == "" { - b.Skip("Long benchmarks not enabled.") - } - - ctx := testcontext.New(b) - defer ctx.Cleanup() - - initStore(b, ctx, store) - - doTest := func(name string, testFunc func(*testing.B, *testcontext.Context, storage.KeyValueStore)) { - b.Run(name, func(bb *testing.B) { - for i := 0; i < bb.N; i++ { - testFunc(bb, ctx, store) - } - }) - } - - doTest("DeepRecursive", deepRecursive) - doTest("DeepNonRecursive", deepNonRecursive) - doTest("ShallowRecursive", shallowRecursive) - doTest("ShallowNonRecursive", shallowNonRecursive) - doTest("TopRecursiveLimit", topRecursiveLimit) - doTest("TopRecursiveStartAt", topRecursiveStartAt) - doTest("TopNonRecursive", topNonRecursive) - - cleanupStore(b, ctx, store) -} - -func importBigPathset(tb testing.TB, ctx *testcontext.Context, store storage.KeyValueStore) { - // make sure this is an empty db, or else refuse to run - if !isEmptyKVStore(tb, ctx, store) { - tb.Fatal("Provided KeyValueStore is not empty. The long benchmarks are destructive. Not running!") - } - - inputIter := openTestData(tb) - defer func() { - if err := inputIter.closeFunc(); err != nil { - tb.Logf("Failed to close test data stream: %v", err) - } - }() - - importer, ok := store.(BulkImporter) - if ok { - tb.Log("Performing bulk import...") - err := importer.BulkImport(ctx, inputIter) - - if err != nil { - errStr := "Provided KeyValueStore failed to import data" - if inputIter.reachedEnd { - errStr += " after iterating over all input data" - } else { - errStr += fmt.Sprintf(" after iterating over %d lines of input data", inputIter.itemNo) - } - tb.Fatalf("%s: %v", errStr, err) - } - } else { - tb.Log("Performing manual import...") - - var item storage.ListItem - for inputIter.Next(ctx, &item) { - if err := store.Put(ctx, item.Key, item.Value); err != nil { - tb.Fatalf("Provided KeyValueStore failed to insert data (%q, %q): %v", item.Key, item.Value, err) - } - } - } - if err := inputIter.Error(); err != nil { - tb.Fatalf("Failed to iterate over input data during import. Error was %v", err) - } - if !inputIter.reachedEnd { - tb.Fatal("Provided KeyValueStore failed to exhaust input iterator") - } -} - -func initStore(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - b.Helper() - - if !*noInitDb { - // can't find a way to run the import and cleanup as sub-benchmarks, while still requiring - // that they be run once and only once, and aborting the whole benchmark if import fails. - // we don't want the time it takes to count against the first sub-benchmark only, so we - // stop the timer. however, we do care about the time that import and cleanup take, though, - // so we'll at least log it. - b.StopTimer() - tStart := time.Now() - importBigPathset(b, ctx, store) - b.Logf("importing took %s", time.Since(tStart).String()) - b.StartTimer() - } -} - -func cleanupStore(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - b.Helper() - if !*noCleanDb { - tStart := time.Now() - cleanupBigPathset(b, ctx, store) - b.Logf("cleanup took %s", time.Since(tStart).String()) - } -} - -type verifyOpts struct { - iterateOpts storage.IterateOptions - doIterations int - batchSize int - expectCount int - expectLastKey storage.Key -} - -func benchAndVerifyIteration(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore, opts *verifyOpts) { - problems := 0 - iteration := 0 - - errMsg := func(tmpl string, args ...interface{}) string { - errMsg1 := fmt.Sprintf(tmpl, args...) - return fmt.Sprintf("[on iteration %d/%d, with opts %+v]: %s", iteration, opts.doIterations, opts.iterateOpts, errMsg1) - } - - errorf := func(tmpl string, args ...interface{}) { - b.Error(errMsg(tmpl, args...)) - problems++ - if problems > maxProblems { - b.Fatal("Too many problems") - } - } - - fatalf := func(tmpl string, args ...interface{}) { - b.Fatal(errMsg(tmpl, args...)) - } - - expectRemaining := opts.expectCount - totalFound := 0 - var lastKey storage.Key - var bytesTotal int64 - lookupSize := opts.batchSize - - for iteration = 1; iteration <= opts.doIterations; iteration++ { - results, err := iterateItems(ctx, store, opts.iterateOpts, lookupSize) - if err != nil { - fatalf("Failed to call iterateItems(): %v", err) - } - if len(results) == 0 { - // we can't continue to iterate - fatalf("iterateItems() got 0 items") - } - if len(results) > lookupSize { - fatalf("iterateItems() returned _more_ items than limit: %d>%d", len(results), lookupSize) - } - if iteration > 0 && results[0].Key.Equal(lastKey) { - // fine and normal - results = results[1:] - } - expectRemaining -= len(results) - if len(results) != opts.batchSize && expectRemaining != 0 { - errorf("iterateItems read %d items instead of %d", len(results), opts.batchSize) - } - for n, result := range results { - totalFound++ - bytesTotal += int64(len(result.Key)) + int64(len(result.Value)) - if result.Key.IsZero() { - errorf("got an empty key among the results at n=%d!", n) - continue - } - if result.Key.Equal(lastKey) { - errorf("got the same key (%q) twice in a row, not on a lookup boundary!", lastKey) - } - if result.Key.Less(lastKey) { - errorf("KeyValueStore returned items out of order! %q < %q", result.Key, lastKey) - } - if result.IsPrefix { - if !result.Value.IsZero() { - errorf("Expected no metadata for IsPrefix item %q, but got %q", result.Key, result.Value) - } - if result.Key[len(result.Key)-1] != byte('/') { - errorf("Expected key for IsPrefix item %q to end in /, but it does not", result.Key) - } - } else { - valAsNum, err := strconv.ParseUint(string(result.Value), 10, 32) - if err != nil { - errorf("Expected metadata for key %q to hold a decimal integer, but it has %q", result.Key, result.Value) - } else if int(valAsNum) != len(result.Key) { - errorf("Expected metadata for key %q to be %d, but it has %q", result.Key, len(result.Key), result.Value) - } - } - lastKey = result.Key - } - if len(results) > 0 { - opts.iterateOpts.First = results[len(results)-1].Key - } - lookupSize = opts.batchSize + 1 // subsequent queries will start with the last element previously returned - } - b.SetBytes(bytesTotal) - - if totalFound != opts.expectCount { - b.Fatalf("Expected to read %d items in total, but got %d", opts.expectCount, totalFound) - } - if !opts.expectLastKey.IsZero() { - if diff := cmp.Diff(opts.expectLastKey.String(), lastKey.String()); diff != "" { - b.Fatalf("KeyValueStore got wrong last item: (-want +got)\n%s", diff) - } - } -} - -func deepRecursive(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - opts := &verifyOpts{ - iterateOpts: storage.IterateOptions{ - Prefix: storage.Key(largestLevel2Directory), - Recurse: true, - }, - } - - // these are not expected to exhaust all available items - opts.doIterations = 500 - opts.batchSize = store.LookupLimit() - opts.expectCount = opts.doIterations * opts.batchSize - - // verify with: - // select encode(fullpath, 'escape') from ( - // select rank() over (order by fullpath), fullpath from pathdata where fullpath > $1::bytea - // ) x where rank = ($2 * $3); - // where $1 = largestLevel2Directory, $2 = doIterations, and $3 = batchSize - opts.expectLastKey = storage.Key("Peronosporales/hateless/tod/extrastate/firewood/renomination/cletch/herotheism/aluminiferous/nub") - - benchAndVerifyIteration(b, ctx, store, opts) -} - -func deepNonRecursive(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - opts := &verifyOpts{ - iterateOpts: storage.IterateOptions{ - Prefix: storage.Key(largestLevel2Directory), - Recurse: false, - }, - doIterations: 1, - batchSize: 10000, - } - - // verify with: - // select count(*) from list_directory(''::bytea, $1::bytea) ld(fp, md); - // where $1 is largestLevel2Directory - opts.expectCount = 119 - - // verify with: - // select encode(fp, 'escape') from ( - // select * from list_directory(''::bytea, $1::bytea) ld(fp, md) - // ) x order by fp desc limit 1; - // where $1 is largestLevel2Directory - opts.expectLastKey = storage.Key("Peronosporales/hateless/xerophily/") - - benchAndVerifyIteration(b, ctx, store, opts) -} - -func shallowRecursive(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - opts := &verifyOpts{ - iterateOpts: storage.IterateOptions{ - Prefix: storage.Key(largestSingleDirectory), - Recurse: true, - }, - } - - // verify with: - // select count(*) from pathdata - // where fullpath > $1::bytea and fullpath < bytea_increment($1::bytea); - // where $1 = largestSingleDirectory - opts.expectCount = 18574 - - // verify with: - // select convert_from(fullpath, 'UTF8') from pathdata - // where fullpath > $1::bytea and fullpath < bytea_increment($1::bytea) - // order by fullpath desc limit 1; - // where $1 = largestSingleDirectory - opts.expectLastKey = storage.Key("Peronosporales/hateless/tod/unricht/sniveling/Puyallup/élite") - - // i didn't plan it this way, but expectedCount happens to have some nicely-sized factors for - // our purposes with no messy remainder. 74 * 251 = 18574 - opts.doIterations = 74 - opts.batchSize = 251 - - benchAndVerifyIteration(b, ctx, store, opts) -} - -func shallowNonRecursive(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - opts := &verifyOpts{ - iterateOpts: storage.IterateOptions{ - Prefix: storage.Key(largestSingleDirectory), - Recurse: false, - }, - doIterations: 2, - batchSize: 10000, - } - - // verify with: - // select count(*) from list_directory(''::bytea, $1::bytea) ld(fp, md); - // where $1 is largestSingleDirectory - opts.expectCount = 18574 - - // verify with: - // select encode(fp, 'escape') from ( - // select * from list_directory(''::bytea, $1::bytea) ld(fp, md) - // ) x order by fp desc limit 1; - // where $1 = largestSingleDirectory - opts.expectLastKey = storage.Key("Peronosporales/hateless/tod/unricht/sniveling/Puyallup/élite") - - benchAndVerifyIteration(b, ctx, store, opts) -} - -func topRecursiveLimit(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - opts := &verifyOpts{ - iterateOpts: storage.IterateOptions{ - Recurse: true, - }, - doIterations: 100, - batchSize: 10000, - } - - // not expected to exhaust items - opts.expectCount = opts.doIterations * opts.batchSize - - // verify with: - // select encode(fullpath, 'escape') from ( - // select rank() over (order by fullpath), fullpath from pathdata - // ) x where rank = $1; - // where $1 = expectCount - opts.expectLastKey = storage.Key("nonresuscitation/synchronically/bechern/hemangiomatosis") - - benchAndVerifyIteration(b, ctx, store, opts) -} - -func topRecursiveStartAt(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - opts := &verifyOpts{ - iterateOpts: storage.IterateOptions{ - Recurse: true, - }, - doIterations: 100, - batchSize: 10000, - } - - // this is pretty arbitrary. just the key 100 positions before the end of the Peronosporales/hateless/ dir. - opts.iterateOpts.First = storage.Key("Peronosporales/hateless/warrener/anthropomancy/geisotherm/wickerwork") - - // not expected to exhaust items - opts.expectCount = opts.doIterations * opts.batchSize - - // verify with: - // select encode(fullpath, 'escape') from ( - // select fullpath from pathdata where fullpath >= $1::bytea order by fullpath limit $2 - // ) x order by fullpath desc limit 1; - // where $1 = iterateOpts.First and $2 = expectCount - opts.expectLastKey = storage.Key("raptured/heathbird/histrionism/vermifugous/barefaced/beechdrops/lamber/phlegmatic/blended/Gershon/scallop/burglarproof/incompensated/allanite/alehouse/embroilment/lienotoxin/monotonically/cumbersomeness") - - benchAndVerifyIteration(b, ctx, store, opts) -} - -func topNonRecursive(b *testing.B, ctx *testcontext.Context, store storage.KeyValueStore) { - opts := &verifyOpts{ - iterateOpts: storage.IterateOptions{ - Recurse: false, - }, - doIterations: 1, - batchSize: 10000, - } - - // verify with: - // select count(*) from list_directory(''::bytea, ''::bytea); - opts.expectCount = 21 - - // verify with: - // select encode(fp, 'escape') from ( - // select * from list_directory(''::bytea, ''::bytea) ld(fp, md) - // ) x order by fp desc limit 1; - opts.expectLastKey = storage.Key("vejoces") - - benchAndVerifyIteration(b, ctx, store, opts) -} - -func cleanupBigPathset(tb testing.TB, ctx *testcontext.Context, store storage.KeyValueStore) { - if *noCleanDb { - tb.Skip("Instructed not to clean up this KeyValueStore after long benchmarks are complete.") - } - - cleaner, ok := store.(BulkCleaner) - if ok { - tb.Log("Performing bulk cleanup...") - err := cleaner.BulkDeleteAll(ctx) - - if err != nil { - tb.Fatalf("Provided KeyValueStore failed to perform bulk delete: %v", err) - } - } else { - inputIter := openTestData(tb) - defer func() { - if err := inputIter.closeFunc(); err != nil { - tb.Logf("Failed to close input data stream: %v", err) - } - }() - - tb.Log("Performing manual cleanup...") - - var item storage.ListItem - for inputIter.Next(ctx, &item) { - if err := store.Delete(ctx, item.Key); err != nil { - tb.Fatalf("Provided KeyValueStore failed to delete item %q during cleanup: %v", item.Key, err) - } - } - if err := inputIter.Error(); err != nil { - tb.Fatalf("Failed to iterate over input data: %v", err) - } - } -} diff --git a/storage/testsuite/test.go b/storage/testsuite/test.go index b592c8595..e7300b464 100644 --- a/storage/testsuite/test.go +++ b/storage/testsuite/test.go @@ -4,14 +4,9 @@ package testsuite import ( - "bytes" - "encoding/gob" - "fmt" "strconv" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "storj.io/common/testcontext" @@ -26,21 +21,12 @@ func RunTests(t *testing.T, store storage.KeyValueStore) { t.Run("CRUD", func(t *testing.T) { testCRUD(t, ctx, store) }) t.Run("Constraints", func(t *testing.T) { testConstraints(t, ctx, store) }) t.Run("Range", func(t *testing.T) { testRange(t, ctx, store) }) - t.Run("Iterate", func(t *testing.T) { testIterate(t, ctx, store) }) - t.Run("IterateAll", func(t *testing.T) { testIterateAll(t, ctx, store) }) - t.Run("Prefix", func(t *testing.T) { testPrefix(t, ctx, store) }) - - t.Run("List", func(t *testing.T) { testList(t, ctx, store) }) - t.Run("ListV2", func(t *testing.T) { testListV2(t, ctx, store) }) - t.Run("Parallel", func(t *testing.T) { testParallel(t, ctx, store) }) } func testConstraints(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { - lookupLimit := store.LookupLimit() - var items storage.Items - for i := 0; i < lookupLimit+5; i++ { + for i := 0; i < 10; i++ { items = append(items, storage.ListItem{ Key: storage.Key("test-" + strconv.Itoa(i)), Value: storage.Value("xyz"), @@ -70,194 +56,4 @@ func testConstraints(t *testing.T, ctx *testcontext.Context, store storage.KeyVa t.Fatal("putting empty key should fail") } }) - - t.Run("GetAll limit", func(t *testing.T) { - _, err := store.GetAll(ctx, items[:lookupLimit].GetKeys()) - if err != nil { - t.Fatalf("GetAll LookupLimit should succeed: %v", err) - } - - _, err = store.GetAll(ctx, items[:lookupLimit+1].GetKeys()) - if !storage.ErrLimitExceeded.Has(err) { - t.Fatalf("GetAll LookupLimit+1 should fail: %v", err) - } - }) - - t.Run("List limit", func(t *testing.T) { - keys, err := store.List(ctx, nil, lookupLimit) - if err != nil || len(keys) != lookupLimit { - t.Fatalf("List LookupLimit should succeed: %v / got %d", err, len(keys)) - } - _, err = store.List(ctx, nil, lookupLimit+1) - if err != nil || len(keys) != lookupLimit { - t.Fatalf("List LookupLimit+1 shouldn't fail: %v / got %d", err, len(keys)) - } - }) - - t.Run("CompareAndSwap Empty Key", func(t *testing.T) { - var key storage.Key - var val storage.Value - - err := store.CompareAndSwap(ctx, key, val, val) - require.Error(t, err, "putting empty key should fail") - }) - - t.Run("CompareAndSwap Empty Old Value", func(t *testing.T) { - key := storage.Key("test-key") - val := storage.Value("test-value") - defer func() { _ = store.Delete(ctx, key) }() - - err := store.CompareAndSwap(ctx, key, nil, val) - require.NoError(t, err, "failed to update %q: %v -> %v: %+v", key, nil, val, err) - - value, err := store.Get(ctx, key) - require.NoError(t, err, "failed to get %q = %v: %+v", key, val, err) - require.Equal(t, value, val, "invalid value for %q = %v: got %v", key, val, value) - }) - - t.Run("CompareAndSwap Empty New Value", func(t *testing.T) { - key := storage.Key("test-key") - val := storage.Value("test-value") - defer func() { _ = store.Delete(ctx, key) }() - - err := store.Put(ctx, key, val) - require.NoError(t, err, "failed to put %q = %v: %+v", key, val, err) - - err = store.CompareAndSwap(ctx, key, val, nil) - require.NoError(t, err, "failed to update %q: %v -> %v: %+v", key, val, nil, err) - - value, err := store.Get(ctx, key) - require.Error(t, err, "got deleted value %q = %v", key, value) - }) - - t.Run("CompareAndSwap Empty Both Empty Values", func(t *testing.T) { - key := storage.Key("test-key") - - err := store.CompareAndSwap(ctx, key, nil, nil) - require.NoError(t, err, "failed to update %q: %v -> %v: %+v", key, nil, nil, err) - - value, err := store.Get(ctx, key) - require.Error(t, err, "got unexpected value %q = %v", key, value) - }) - - t.Run("CompareAndSwap Missing Key", func(t *testing.T) { - for i, tt := range []struct { - old, new storage.Value - }{ - {storage.Value("old-value"), nil}, - {storage.Value("old-value"), storage.Value("new-value")}, - } { - errTag := fmt.Sprintf("%d. %+v", i, tt) - key := storage.Key("test-key") - - err := store.CompareAndSwap(ctx, key, tt.old, tt.new) - assert.True(t, storage.ErrKeyNotFound.Has(err), "%s: unexpected error: %+v", errTag, err) - } - }) - - t.Run("CompareAndSwap Value Changed", func(t *testing.T) { - for i, tt := range []struct { - old, new storage.Value - }{ - {nil, nil}, - {nil, storage.Value("new-value")}, - {storage.Value("old-value"), nil}, - {storage.Value("old-value"), storage.Value("new-value")}, - } { - func() { - errTag := fmt.Sprintf("%d. %+v", i, tt) - key := storage.Key("test-key") - val := storage.Value("test-value") - defer func() { _ = store.Delete(ctx, key) }() - - err := store.Put(ctx, key, val) - require.NoError(t, err, errTag) - - err = store.CompareAndSwap(ctx, key, tt.old, tt.new) - assert.True(t, storage.ErrValueChanged.Has(err), "%s: unexpected error: %+v", errTag, err) - }() - } - }) - - t.Run("CompareAndSwap Concurrent", func(t *testing.T) { - const count = 100 - - key := storage.Key("test-key") - defer func() { _ = store.Delete(ctx, key) }() - - // Add concurrently all numbers from 1 to `count` in a set under test-key - var group errgroup.Group - for i := 0; i < count; i++ { - i := i - group.Go(func() error { - for { - set := make(map[int]bool) - - oldValue, err := store.Get(ctx, key) - if !storage.ErrKeyNotFound.Has(err) { - if err != nil { - return err - } - - set, err = decodeSet(oldValue) - if err != nil { - return err - } - } - - set[i] = true - newValue, err := encodeSet(set) - if err != nil { - return err - } - - err = store.CompareAndSwap(ctx, key, oldValue, storage.Value(newValue)) - if storage.ErrValueChanged.Has(err) { - // Another goroutine was faster. Make a new attempt. - continue - } - - return err - } - }) - } - err := group.Wait() - require.NoError(t, err) - - // Check that all numbers were added in the set - value, err := store.Get(ctx, key) - require.NoError(t, err) - - set, err := decodeSet(value) - require.NoError(t, err) - - for i := 0; i < count; i++ { - assert.Contains(t, set, i) - } - }) -} - -func encodeSet(set map[int]bool) ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - - err := enc.Encode(set) - if err != nil { - return nil, err - } - - return buf.Bytes(), nil -} - -func decodeSet(b []byte) (map[int]bool, error) { - buf := bytes.NewBuffer(b) - dec := gob.NewDecoder(buf) - - var set map[int]bool - err := dec.Decode(&set) - if err != nil { - return nil, err - } - - return set, nil } diff --git a/storage/testsuite/test_crud.go b/storage/testsuite/test_crud.go index 7b26c168d..e8551078e 100644 --- a/storage/testsuite/test_crud.go +++ b/storage/testsuite/test_crud.go @@ -6,11 +6,8 @@ package testsuite import ( "bytes" "math/rand" - "sort" "testing" - "github.com/stretchr/testify/require" - "storj.io/common/testcontext" "storj.io/storj/storage" ) @@ -54,89 +51,16 @@ func testCRUD(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStor } }) - t.Run("GetAll", func(t *testing.T) { - subset := items[:len(items)/2] - keys := subset.GetKeys() - values, err := store.GetAll(ctx, keys) - if err != nil { - t.Fatalf("failed to GetAll %q: %v", keys, err) - } - if len(values) != len(keys) { - t.Fatalf("failed to GetAll %q: got %q", keys, values) - } - for i, item := range subset { - if !bytes.Equal([]byte(values[i]), []byte(item.Value)) { - t.Fatalf("invalid GetAll %q = %v: got %v", item.Key, item.Value, values[i]) - } - } - }) - - t.Run("Update", func(t *testing.T) { - for i, item := range items { - next := items[(i+1)%len(items)] - err := store.CompareAndSwap(ctx, item.Key, item.Value, next.Value) - if err != nil { - t.Fatalf("failed to update %q: %v -> %v: %v", item.Key, item.Value, next.Value, err) - } - } - - for i, item := range items { - next := items[(i+1)%len(items)] - value, err := store.Get(ctx, item.Key) - if err != nil { - t.Fatalf("failed to get updated %q = %v: %v", item.Key, next.Value, err) - } - if !bytes.Equal([]byte(value), []byte(next.Value)) { - t.Fatalf("invalid updated value for %q = %v: got %v", item.Key, next.Value, value) - } - } - }) - t.Run("Delete", func(t *testing.T) { - k := len(items) / 2 - batch, nonbatch := items[:k], items[k:] - - var list []storage.Key - for _, item := range batch { - list = append(list, item.Key) - } - - var expected storage.Items - for _, item := range batch { - value, err := store.Get(ctx, item.Key) + for _, item := range items { + _, err := store.Get(ctx, item.Key) if err != nil { - t.Fatalf("failed to get %v: %v", item.Key, value) + t.Fatalf("failed to get %v", item.Key) } - expected = append(expected, storage.ListItem{ - Key: item.Key, - Value: value, - }) - } - - deleted, err := store.DeleteMultiple(ctx, list) - if err != nil { - t.Fatalf("failed to batch delete: %v", err) - } - - sort.Slice(expected, func(i, k int) bool { - return expected[i].Key.Less(expected[k].Key) - }) - sort.Slice(deleted, func(i, k int) bool { - return deleted[i].Key.Less(deleted[k].Key) - }) - require.Equal(t, expected, deleted) - - // Duplicate delete should also be fine. - retry, err := store.DeleteMultiple(ctx, list) - if err != nil { - t.Fatalf("failed to batch delete: %v", err) - } - if len(retry) != 0 { - t.Fatalf("expected delete to return nothing: %v", len(retry)) } // individual deletes - for _, item := range nonbatch { + for _, item := range items { err := store.Delete(ctx, item.Key) if err != nil { t.Fatalf("failed to delete %v: %v", item.Key, err) diff --git a/storage/testsuite/test_iterate.go b/storage/testsuite/test_iterate.go deleted file mode 100644 index fd7b062ff..000000000 --- a/storage/testsuite/test_iterate.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package testsuite - -import ( - "math/rand" - "testing" - - "storj.io/common/testcontext" - "storj.io/storj/storage" -) - -func testIterate(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { - items := storage.Items{ - newItem("a", "a", false), - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - } - rand.Shuffle(len(items), items.Swap) - defer cleanupItems(t, ctx, store, items) - - if err := storage.PutAll(ctx, store, items...); err != nil { - t.Fatalf("failed to setup: %v", err) - } - - testIterations(t, ctx, store, []iterationTest{ - {"no limits", - storage.IterateOptions{}, storage.Items{ - newItem("a", "a", false), - newItem("b/", "", true), - newItem("c", "c", false), - newItem("c/", "", true), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"at a", - storage.IterateOptions{ - First: storage.Key("a"), - }, storage.Items{ - newItem("a", "a", false), - newItem("b/", "", true), - newItem("c", "c", false), - newItem("c/", "", true), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"after a", - storage.IterateOptions{ - First: storage.NextKey(storage.Key("a")), - }, storage.Items{ - newItem("b/", "", true), - newItem("c", "c", false), - newItem("c/", "", true), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"at b", - storage.IterateOptions{ - First: storage.Key("b"), - }, storage.Items{ - newItem("b/", "", true), - newItem("c", "c", false), - newItem("c/", "", true), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"after b", - storage.IterateOptions{ - First: storage.NextKey(storage.Key("b")), - }, storage.Items{ - newItem("b/", "", true), - newItem("c", "c", false), - newItem("c/", "", true), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"after c", - storage.IterateOptions{ - First: storage.NextKey(storage.Key("c")), - }, storage.Items{ - newItem("c/", "", true), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"at e", - storage.IterateOptions{ - First: storage.Key("e"), - }, storage.Items{ - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"after e", - storage.IterateOptions{ - First: storage.NextKey(storage.Key("e")), - }, storage.Items{ - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"prefix b slash", - storage.IterateOptions{ - Prefix: storage.Key("b/"), - }, storage.Items{ - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - }}, - {"prefix c slash", - storage.IterateOptions{ - Prefix: storage.Key("c/"), - }, storage.Items{ - newItem("c/", "c/", false), - newItem("c//", "", true), - newItem("c/1", "c/1", false), - }}, - {"prefix c slash slash", - storage.IterateOptions{ - Prefix: storage.Key("c//"), - }, storage.Items{ - newItem("c//", "c//", false), - }}, - }) -} diff --git a/storage/testsuite/test_iterate_all.go b/storage/testsuite/test_iterate_all.go deleted file mode 100644 index 21ff6e770..000000000 --- a/storage/testsuite/test_iterate_all.go +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package testsuite - -import ( - "math/rand" - "testing" - - "storj.io/common/testcontext" - "storj.io/storj/storage" -) - -func testIterateAll(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { - items := storage.Items{ - newItem("a", "a", false), - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - } - rand.Shuffle(len(items), items.Swap) - defer cleanupItems(t, ctx, store, items) - - if err := storage.PutAll(ctx, store, items...); err != nil { - t.Fatalf("failed to setup: %v", err) - } - - testIterations(t, ctx, store, []iterationTest{ - {"no limits", - storage.IterateOptions{ - Recurse: true, - }, storage.Items{ - newItem("a", "a", false), - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"no limits with non-nil first", - storage.IterateOptions{ - Recurse: true, - First: storage.Key(""), - }, storage.Items{ - newItem("a", "a", false), - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"at a", - storage.IterateOptions{ - First: storage.Key("a"), - Recurse: true, - }, storage.Items{ - newItem("a", "a", false), - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"after a", - storage.IterateOptions{ - First: storage.NextKey(storage.Key("a")), - Recurse: true, - }, storage.Items{ - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"at b", - storage.IterateOptions{ - First: storage.Key("b"), - Recurse: true, - }, storage.Items{ - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"after b", - storage.IterateOptions{ - First: storage.NextKey(storage.Key("b")), - Recurse: true, - }, storage.Items{ - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"at c", - storage.IterateOptions{ - First: storage.Key("c"), - Recurse: true, - }, storage.Items{ - newItem("c", "c", false), - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - {"after c", - storage.IterateOptions{ - First: storage.NextKey(storage.Key("c")), - Recurse: true, - }, storage.Items{ - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"at e", - storage.IterateOptions{ - First: storage.Key("e"), - Recurse: true, - }, storage.Items{ - newItem("g", "g", false), - newItem("h", "h", false), - }}, - - {"prefix b slash", - storage.IterateOptions{ - Prefix: storage.Key("b/"), - Recurse: true, - }, storage.Items{ - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - }}, - {"prefix b slash at a", - storage.IterateOptions{ - Prefix: storage.Key("b/"), First: storage.Key("a"), - Recurse: true, - }, storage.Items{ - newItem("b/1", "b/1", false), - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - }}, - {"prefix b slash at b slash 2", - storage.IterateOptions{ - Prefix: storage.Key("b/"), First: storage.Key("b/2"), - Recurse: true, - }, storage.Items{ - newItem("b/2", "b/2", false), - newItem("b/3", "b/3", false), - }}, - - {"prefix c slash", - storage.IterateOptions{ - Prefix: storage.Key("c/"), - Recurse: true, - }, storage.Items{ - newItem("c/", "c/", false), - newItem("c//", "c//", false), - newItem("c/1", "c/1", false), - }}, - - {"prefix c slash slash", - storage.IterateOptions{ - Prefix: storage.Key("c//"), - Recurse: true, - }, storage.Items{ - newItem("c//", "c//", false), - }}, - }) -} diff --git a/storage/testsuite/test_list.go b/storage/testsuite/test_list.go deleted file mode 100644 index 0bb64765b..000000000 --- a/storage/testsuite/test_list.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package testsuite - -import ( - "math/rand" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - - "storj.io/common/testcontext" - "storj.io/storj/storage" -) - -func testList(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { - items := storage.Items{ - newItem("path/0", "\x00\xFF\x00", false), - newItem("path/1", "\x01\xFF\x01", false), - newItem("path/2", "\x02\xFF\x02", false), - newItem("path/3", "\x03\xFF\x03", false), - newItem("path/4", "\x04\xFF\x04", false), - newItem("path/5", "\x05\xFF\x05", false), - } - rand.Shuffle(len(items), items.Swap) - defer cleanupItems(t, ctx, store, items) - - if err := storage.PutAll(ctx, store, items...); err != nil { - t.Fatalf("failed to setup: %v", err) - } - - type Test struct { - Name string - First storage.Key - Limit int - Expected storage.Keys - } - - newKeys := func(xs ...string) storage.Keys { - var keys storage.Keys - for _, x := range xs { - keys = append(keys, storage.Key(x)) - } - return keys - } - - tests := []Test{ - {"without key", - nil, 3, - newKeys("path/0", "path/1", "path/2")}, - {"without key, limit 0", - nil, 0, - newKeys("path/0", "path/1", "path/2", "path/3", "path/4", "path/5")}, - {"with key", - storage.Key("path/2"), 3, - newKeys("path/2", "path/3", "path/4")}, - {"without key 100", - nil, 100, - newKeys("path/0", "path/1", "path/2", "path/3", "path/4", "path/5")}, - } - - for _, test := range tests { - var keys storage.Keys - var err error - keys, err = store.List(ctx, test.First, test.Limit) - if err != nil { - t.Errorf("%s: %s", test.Name, err) - continue - } - if diff := cmp.Diff(test.Expected, keys, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("%s: (-want +got)\n%s", test.Name, diff) - } - } -} diff --git a/storage/testsuite/test_listv2.go b/storage/testsuite/test_listv2.go deleted file mode 100644 index 78cdd4c19..000000000 --- a/storage/testsuite/test_listv2.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package testsuite - -import ( - "math/rand" - "sort" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - - "storj.io/common/testcontext" - "storj.io/storj/storage" -) - -func testListV2(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { - items := storage.Items{ - newItem("music/a-song1.mp3", "1", false), - newItem("music/a-song2.mp3", "2", false), - newItem("music/my-album/song3.mp3", "3", false), - newItem("music/my-album/song4.mp3", "4", false), - newItem("music/z-song5.mp3", "5", false), - newItem("sample.jpg", "6", false), - newItem("videos/movie.mkv", "7", false), - } - rand.Shuffle(len(items), items.Swap) - defer cleanupItems(t, ctx, store, items) - - if err := storage.PutAll(ctx, store, items...); err != nil { - t.Fatalf("failed to setup: %v", err) - } - - sort.Sort(items) - - type Test struct { - Name string - Options storage.ListOptions - More bool - Expected storage.Items - } - - tests := []Test{ - {"all", - storage.ListOptions{ - Recursive: true, - IncludeValue: true, - }, - false, items, - }, - {"music", - storage.ListOptions{ - Prefix: storage.Key("music/"), - }, - false, storage.Items{ - newItem("a-song1.mp3", "", false), - newItem("a-song2.mp3", "", false), - newItem("my-album/", "", true), - newItem("z-song5.mp3", "", false), - }, - }, - {"music recursive", - storage.ListOptions{ - Recursive: true, - Prefix: storage.Key("music/"), - }, - false, storage.Items{ - newItem("a-song1.mp3", "", false), - newItem("a-song2.mp3", "", false), - newItem("my-album/song3.mp3", "", false), - newItem("my-album/song4.mp3", "", false), - newItem("z-song5.mp3", "", false), - }, - }, - {"all non-recursive without value (default)", - storage.ListOptions{}, - false, storage.Items{ - newItem("music/", "", true), - newItem("sample.jpg", "", false), - newItem("videos/", "", true), - }, - }, - {"all non-recursive", - storage.ListOptions{ - IncludeValue: true, - }, - false, storage.Items{ - newItem("music/", "", true), - newItem("sample.jpg", "6", false), - newItem("videos/", "", true), - }, - }, - {"start after 2 recursive", - storage.ListOptions{ - Recursive: true, - StartAfter: storage.Key("music/a-song1.mp3"), - Limit: 2, - }, - true, storage.Items{ - newItem("music/a-song2.mp3", "", false), - newItem("music/my-album/song3.mp3", "", false), - }, - }, - {"start after non-existing 2 recursive", - storage.ListOptions{ - Recursive: true, - StartAfter: storage.Key("music/a-song15.mp3"), - Limit: 2, - }, - true, storage.Items{ - newItem("music/a-song2.mp3", "", false), - newItem("music/my-album/song3.mp3", "", false), - }, - }, - {"start after 2", - storage.ListOptions{ - Prefix: storage.Key("music/"), - StartAfter: storage.Key("a-song1.mp3"), - Limit: 2, - }, - true, storage.Items{ - newItem("a-song2.mp3", "", false), - newItem("my-album/", "", true), - }, - }, - } - - for _, test := range tests { - got, more, err := storage.ListV2(ctx, store, test.Options) - if err != nil { - t.Errorf("%v: %v", test.Name, err) - continue - } - if more != test.More { - t.Errorf("%v: more %v expected %v", test.Name, more, test.More) - } - if diff := cmp.Diff(test.Expected, got, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("%s: (-want +got)\n%s", test.Name, diff) - } - } -} diff --git a/storage/testsuite/test_parallel.go b/storage/testsuite/test_parallel.go index d0ec660cb..9f1fe07b2 100644 --- a/storage/testsuite/test_parallel.go +++ b/storage/testsuite/test_parallel.go @@ -42,31 +42,7 @@ func testParallel(t *testing.T, ctx *testcontext.Context, store storage.KeyValue t.Fatalf("invalid value for %q = %v: got %v", item.Key, item.Value, value) } - // GetAll - values, err := store.GetAll(ctx, []storage.Key{item.Key}) - if len(values) != 1 { - t.Fatalf("failed to GetAll: %v", err) - } - - if !bytes.Equal([]byte(values[0]), []byte(item.Value)) { - t.Fatalf("invalid GetAll %q = %v: got %v", item.Key, item.Value, values[i]) - } - - // Update value - nextValue := storage.Value(string(item.Value) + "X") - err = store.CompareAndSwap(ctx, item.Key, item.Value, nextValue) - if err != nil { - t.Fatalf("failed to update %q = %v: %v", item.Key, nextValue, err) - } - - value, err = store.Get(ctx, item.Key) - if err != nil { - t.Fatalf("failed to get %q = %v: %v", item.Key, nextValue, err) - } - if !bytes.Equal([]byte(value), []byte(nextValue)) { - t.Fatalf("invalid updated value for %q = %v: got %v", item.Key, nextValue, value) - } - + // Delete err = store.Delete(ctx, item.Key) if err != nil { t.Fatalf("failed to delete %v: %v", item.Key, err) diff --git a/storage/testsuite/test_prefix.go b/storage/testsuite/test_prefix.go deleted file mode 100644 index 6e76daa68..000000000 --- a/storage/testsuite/test_prefix.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package testsuite - -import ( - "math/rand" - "testing" - - "storj.io/common/testcontext" - "storj.io/storj/storage" -) - -func testPrefix(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { - items := storage.Items{ - newItem("x-a", "a", false), - newItem("x-b/1", "b/1", false), - newItem("x-b/2", "b/2", false), - newItem("x-b/3", "b/3", false), - newItem("y-c", "c", false), - newItem("y-c/", "c/", false), - newItem("y-c//", "c//", false), - newItem("y-c/1", "c/1", false), - newItem("y-g", "g", false), - newItem("y-h", "h", false), - } - rand.Shuffle(len(items), items.Swap) - defer cleanupItems(t, ctx, store, items) - - if err := storage.PutAll(ctx, store, items...); err != nil { - t.Fatalf("failed to setup: %v", err) - } - - testIterations(t, ctx, store, []iterationTest{ - {"prefix x dash b slash", - storage.IterateOptions{ - Prefix: storage.Key("x-"), First: storage.Key("x-b"), - Recurse: true, - }, storage.Items{ - newItem("x-b/1", "b/1", false), - newItem("x-b/2", "b/2", false), - newItem("x-b/3", "b/3", false), - }}, - {"prefix x dash b slash", - storage.IterateOptions{ - Prefix: storage.Key("x-"), First: storage.Key("x-b"), - }, storage.Items{ - newItem("x-b/", "", true), - }}, - {"prefix y- slash", - storage.IterateOptions{ - Prefix: storage.Key("y-"), - Recurse: true, - }, storage.Items{ - newItem("y-c", "c", false), - newItem("y-c/", "c/", false), - newItem("y-c//", "c//", false), - newItem("y-c/1", "c/1", false), - newItem("y-g", "g", false), - newItem("y-h", "h", false), - }}, - }) -} diff --git a/storage/testsuite/utils.go b/storage/testsuite/utils.go index 39996c95d..f2cbe08d6 100644 --- a/storage/testsuite/utils.go +++ b/storage/testsuite/utils.go @@ -4,12 +4,8 @@ package testsuite import ( - "context" "testing" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "storj.io/common/testcontext" "storj.io/storj/storage" ) @@ -23,82 +19,7 @@ func newItem(key, value string, isPrefix bool) storage.ListItem { } func cleanupItems(t testing.TB, ctx *testcontext.Context, store storage.KeyValueStore, items storage.Items) { - bulkDeleter, ok := store.(BulkDeleter) - if ok { - err := bulkDeleter.BulkDelete(ctx, items) - if err != nil { - t.Fatalf("could not do bulk cleanup of items: %v", err) - } - } else { - for _, item := range items { - _ = store.Delete(ctx, item.Key) - } + for _, item := range items { + _ = store.Delete(ctx, item.Key) } } - -// BulkImporter identifies KV storage facilities that can do bulk importing of items more -// efficiently than inserting one-by-one. -type BulkImporter interface { - BulkImport(context.Context, storage.Iterator) error -} - -// BulkDeleter identifies KV storage facilities that can delete multiple items efficiently. -type BulkDeleter interface { - BulkDelete(context.Context, storage.Items) error -} - -// BulkCleaner identifies KV storage facilities that can delete all items efficiently. -type BulkCleaner interface { - BulkDeleteAll(ctx context.Context) error -} - -type iterationTest struct { - Name string - Options storage.IterateOptions - Expected storage.Items -} - -func testIterations(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore, tests []iterationTest) { - t.Helper() - for _, test := range tests { - items, err := iterateItems(ctx, store, test.Options, -1) - if err != nil { - t.Errorf("%s: %v", test.Name, err) - continue - } - if diff := cmp.Diff(test.Expected, items, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("%s: (-want +got)\n%s", test.Name, diff) - } - } -} - -func isEmptyKVStore(tb testing.TB, ctx *testcontext.Context, store storage.KeyValueStore) bool { - tb.Helper() - keys, err := store.List(ctx, storage.Key(""), 1) - if err != nil { - tb.Fatalf("Failed to check if KeyValueStore is empty: %v", err) - } - return len(keys) == 0 -} - -type collector struct { - Items storage.Items - Limit int -} - -func (collect *collector) include(ctx context.Context, it storage.Iterator) error { - var item storage.ListItem - for (collect.Limit < 0 || len(collect.Items) < collect.Limit) && it.Next(ctx, &item) { - collect.Items = append(collect.Items, storage.CloneItem(item)) - } - return nil -} - -func iterateItems(ctx *testcontext.Context, store storage.KeyValueStore, opts storage.IterateOptions, limit int) (storage.Items, error) { - collect := &collector{Limit: limit} - err := store.Iterate(ctx, opts, collect.include) - if err != nil { - return nil, err - } - return collect.Items, nil -} diff --git a/storage/util.go b/storage/util.go index 0aeff0679..143fe6bba 100644 --- a/storage/util.go +++ b/storage/util.go @@ -8,18 +8,6 @@ import ( "fmt" ) -// NextKey returns the successive key. -func NextKey(key Key) Key { - return append(CloneKey(key), 0) -} - -// AfterPrefix returns the key after prefix. -func AfterPrefix(key Key) Key { - after := CloneKey(key) - after[len(after)-1]++ - return after -} - // CloneKey creates a copy of key. func CloneKey(key Key) Key { return append(Key{}, key...) }