diff --git a/certificate/authorization/db.go b/certificate/authorization/db.go index 540b2efd6..4bfc6ada5 100644 --- a/certificate/authorization/db.go +++ b/certificate/authorization/db.go @@ -148,36 +148,33 @@ func (authDB *DB) Get(ctx context.Context, userID string) (_ Group, err error) { // UserIDs returns a list of all userIDs present in the authorization database. func (authDB *DB) UserIDs(ctx context.Context) (userIDs []string, err error) { defer mon.Task()(&ctx)(&err) - err = authDB.db.Iterate(ctx, storage.IterateOptions{ - Recurse: true, - }, func(ctx context.Context, iterator storage.Iterator) error { - var listItem storage.ListItem - for iterator.Next(ctx, &listItem) { - userIDs = append(userIDs, listItem.Key.String()) - } - return nil - }) + + err = authDB.db.Range(ctx, + func(ctx context.Context, key storage.Key, _ storage.Value) error { + userIDs = append(userIDs, key.String()) + return nil + }) return userIDs, ErrDBInternal.Wrap(err) } // List returns all authorizations in the database. func (authDB *DB) List(ctx context.Context) (auths Group, err error) { defer mon.Task()(&ctx)(&err) - err = authDB.db.Iterate(ctx, storage.IterateOptions{ - Recurse: true, - }, func(ctx context.Context, iterator storage.Iterator) error { - var listErrs errs.Group - var listItem storage.ListItem - for iterator.Next(ctx, &listItem) { - var nextAuths Group - if err := nextAuths.Unmarshal(listItem.Value); err != nil { - listErrs.Add(err) + + var errs errs.Group + err = authDB.db.Range(ctx, + func(ctx context.Context, key storage.Key, value storage.Value) error { + var group Group + err := group.Unmarshal(value) + if err != nil { + errs.Add(err) + return nil } - auths = append(auths, nextAuths...) - } - return ErrDBInternal.Wrap(listErrs.Err()) - }) - return auths, ErrDBInternal.Wrap(err) + auths = append(auths, group...) + return nil + }) + errs.Add(err) + return auths, ErrDBInternal.Wrap(errs.Err()) } // Claim marks an authorization as claimed and records claim information. diff --git a/private/revocation/db.go b/private/revocation/db.go index 2d15caf30..96fa5d56a 100644 --- a/private/revocation/db.go +++ b/private/revocation/db.go @@ -108,25 +108,16 @@ func (db *DB) List(ctx context.Context) (revs []*extensions.Revocation, err erro return nil, nil } - keys, err := db.store.List(ctx, []byte{}, 0) - if err != nil { - return nil, extensions.ErrRevocationDB.Wrap(err) - } - - marshaledRevs, err := db.store.GetAll(ctx, keys) - if err != nil { - return nil, extensions.ErrRevocationDB.Wrap(err) - } - - for _, revBytes := range marshaledRevs { + err = db.store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { rev := new(extensions.Revocation) - if err := rev.Unmarshal(revBytes); err != nil { - return nil, extensions.ErrRevocationDB.Wrap(err) + if err := rev.Unmarshal(value); err != nil { + return extensions.ErrRevocationDB.Wrap(err) } revs = append(revs, rev) - } - return revs, nil + return nil + }) + return revs, extensions.ErrRevocationDB.Wrap(err) } // TestGetStore returns the internal store for testing. diff --git a/storage/boltdb/client.go b/storage/boltdb/client.go index 7b3d49c9e..21f35eb54 100644 --- a/storage/boltdb/client.go +++ b/storage/boltdb/client.go @@ -286,6 +286,17 @@ func (client *Client) IterateWithoutLookupLimit(ctx context.Context, opts storag }) } +// Range iterates over all items in unspecified order. +func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { + defer mon.Task()(&ctx)(&err) + + return client.view(func(bucket *bbolt.Bucket) error { + return bucket.ForEach(func(k, v []byte) error { + return fn(ctx, storage.Key(k), storage.Value(v)) + }) + }) +} + type advancer interface { PositionToFirst(prefix, first storage.Key) (key, value []byte) SkipPrefix(prefix storage.Key) (key, value []byte) diff --git a/storage/common.go b/storage/common.go index 4522248a8..d4a6a427f 100644 --- a/storage/common.go +++ b/storage/common.go @@ -68,6 +68,9 @@ type KeyValueStore interface { Delete(context.Context, Key) error // DeleteMultiple deletes keys and returns nil for. DeleteMultiple(context.Context, []Key) (Items, error) + // Range iterates over all items in unspecified order. + // The Key and Value are valid only for the duration of callback. + Range(ctx context.Context, fn func(context.Context, Key, Value) error) error // List lists all keys starting from start and upto limit items. List(ctx context.Context, start Key, limit int) (Keys, error) // Iterate iterates over items based on opts. diff --git a/storage/redis/client.go b/storage/redis/client.go index 2e3940dd4..7496a3a77 100644 --- a/storage/redis/client.go +++ b/storage/redis/client.go @@ -183,6 +183,35 @@ func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage. return values, nil } +// Range iterates over all items in unspecified order. +func (client *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { + defer mon.Task()(&ctx)(&err) + + it := client.db.Scan(ctx, 0, "", 0).Iterator() + + var lastKey string + var lastOk bool + for it.Next(ctx) { + key := it.Val() + // redis may return duplicates + if lastOk && key == lastKey { + continue + } + lastKey, lastOk = key, true + + value, err := get(ctx, client.db, storage.Key(key)) + if err != nil { + return Error.Wrap(err) + } + + if err := fn(ctx, storage.Key(key), value); err != nil { + return err + } + } + + return Error.Wrap(it.Err()) +} + // Iterate iterates over items based on opts. func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/storage/storelogger/logger.go b/storage/storelogger/logger.go index b132161d9..33b567590 100644 --- a/storage/storelogger/logger.go +++ b/storage/storelogger/logger.go @@ -77,6 +77,20 @@ func (store *Logger) List(ctx context.Context, first storage.Key, limit int) (_ return keys, err } +// Range iterates over all items in unspecified order. +func (store *Logger) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) (err error) { + defer mon.Task()(&ctx)(&err) + store.log.Debug("Range") + return store.store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { + store.log.Debug(" ", + zap.ByteString("key", key), + zap.Int("value length", len(value)), + zap.Binary("truncated value", truncate(value)), + ) + return fn(ctx, key, value) + }) +} + // Iterate iterates over items based on opts. func (store *Logger) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/storage/teststore/store.go b/storage/teststore/store.go index 41d77d4a8..194b0dd0b 100644 --- a/storage/teststore/store.go +++ b/storage/teststore/store.go @@ -34,6 +34,7 @@ type Client struct { GetAll int Delete int Close int + Range int Iterate int CompareAndSwap int } @@ -231,6 +232,25 @@ func (store *Client) Close() error { return nil } +// Range iterates over all items in unspecified order. +func (store *Client) Range(ctx context.Context, fn func(context.Context, storage.Key, storage.Value) error) error { + store.mu.Lock() + store.CallCount.Range++ + if store.forcedError() { + store.mu.Unlock() + return errors.New("internal error") + } + items := append([]storage.ListItem{}, store.Items...) + store.mu.Unlock() + + for _, item := range items { + if err := fn(ctx, item.Key, item.Value); err != nil { + return err + } + } + return nil +} + // Iterate iterates over items based on opts. func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) diff --git a/storage/testsuite/test.go b/storage/testsuite/test.go index f817728c9..b592c8595 100644 --- a/storage/testsuite/test.go +++ b/storage/testsuite/test.go @@ -25,6 +25,7 @@ func RunTests(t *testing.T, store storage.KeyValueStore) { defer ctx.Cleanup() t.Run("CRUD", func(t *testing.T) { testCRUD(t, ctx, store) }) t.Run("Constraints", func(t *testing.T) { testConstraints(t, ctx, store) }) + t.Run("Range", func(t *testing.T) { testRange(t, ctx, store) }) t.Run("Iterate", func(t *testing.T) { testIterate(t, ctx, store) }) t.Run("IterateAll", func(t *testing.T) { testIterateAll(t, ctx, store) }) t.Run("Prefix", func(t *testing.T) { testPrefix(t, ctx, store) }) diff --git a/storage/testsuite/test_range.go b/storage/testsuite/test_range.go new file mode 100644 index 000000000..3be9e36bb --- /dev/null +++ b/storage/testsuite/test_range.go @@ -0,0 +1,59 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package testsuite + +import ( + "context" + "errors" + "math/rand" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/testcontext" + "storj.io/storj/storage" +) + +func testRange(t *testing.T, ctx *testcontext.Context, store storage.KeyValueStore) { + err := store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { + return errors.New("empty store") + }) + require.NoError(t, err) + + items := storage.Items{ + newItem("a", "a", false), + newItem("b/1", "b/1", false), + newItem("b/2", "b/2", false), + newItem("b/3", "b/3", false), + newItem("c", "c", false), + newItem("c/", "c/", false), + newItem("c//", "c//", false), + newItem("c/1", "c/1", false), + newItem("g", "g", false), + newItem("h", "h", false), + } + rand.Shuffle(len(items), items.Swap) + defer cleanupItems(t, ctx, store, items) + + if err := storage.PutAll(ctx, store, items...); err != nil { + t.Fatalf("failed to setup: %v", err) + } + + var output storage.Items + err = store.Range(ctx, func(ctx context.Context, key storage.Key, value storage.Value) error { + output = append(output, storage.ListItem{ + Key: append([]byte{}, key...), + Value: append([]byte{}, value...), + }) + return nil + }) + require.NoError(t, err) + + expected := storage.CloneItems(items) + sort.Sort(expected) + sort.Sort(output) + + require.EqualValues(t, expected, output) +}