Enforce lookup limit for storage (#312)

This commit is contained in:
Egon Elbre 2018-09-07 12:00:00 +03:00 committed by GitHub
parent ff57fa5c00
commit 79354bf0ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 78 additions and 35 deletions

View File

@ -27,11 +27,6 @@ var (
segmentError = errs.Class("segment error") segmentError = errs.Class("segment error")
) )
// ListPageLimit is the maximum number of items that will be returned by a list
// request.
// TODO(kaloyan): make it configurable
const ListPageLimit = 1000
// Server implements the network state RPC service // Server implements the network state RPC service
type Server struct { type Server struct {
DB storage.KeyValueStore DB storage.KeyValueStore
@ -137,10 +132,6 @@ func (s *Server) List(ctx context.Context, req *pb.ListRequest) (resp *pb.ListRe
s.logger.Debug("entering pointerdb list") s.logger.Debug("entering pointerdb list")
limit := int(req.GetLimit()) limit := int(req.GetLimit())
if limit <= 0 || limit > ListPageLimit {
limit = ListPageLimit
}
if err = s.validateAuth(req.GetAPIKey()); err != nil { if err = s.validateAuth(req.GetAPIKey()); err != nil {
return nil, err return nil, err
} }

View File

@ -5,7 +5,6 @@ package boltdb
import ( import (
"bytes" "bytes"
"fmt"
"time" "time"
"storj.io/storj/pkg/utils" "storj.io/storj/pkg/utils"
@ -24,7 +23,6 @@ type Client struct {
const ( const (
// fileMode sets permissions so owner can read and write // fileMode sets permissions so owner can read and write
fileMode = 0600 fileMode = 0600
maxKeyLookup = 100
defaultTimeout = 1 * time.Second defaultTimeout = 1 * time.Second
) )
@ -116,12 +114,11 @@ func (client *Client) Close() error {
// GetAll finds all values for the provided keys up to 100 keys // GetAll finds all values for the provided keys up to 100 keys
// if more keys are provided than the maximum an error will be returned. // if more keys are provided than the maximum an error will be returned.
func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) { func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
lk := len(keys) if len(keys) > storage.LookupLimit {
if lk > maxKeyLookup { return nil, storage.ErrLimitExceeded
return nil, Error.New(fmt.Sprintf("requested %d keys, maximum is %d", lk, maxKeyLookup))
} }
vals := make(storage.Values, 0, lk) vals := make(storage.Values, 0, len(keys))
err := client.view(func(bucket *bolt.Bucket) error { err := client.view(func(bucket *bolt.Bucket) error {
for _, key := range keys { for _, key := range keys {
val := bucket.Get([]byte(key)) val := bucket.Get([]byte(key))

View File

@ -19,6 +19,9 @@ var ErrKeyNotFound = errs.Class("key not found")
// ErrEmptyKey is returned when an empty key is used in Put // ErrEmptyKey is returned when an empty key is used in Put
var ErrEmptyKey = errors.New("empty key") var ErrEmptyKey = errors.New("empty key")
// ErrLimitExceeded is returned when request limit is exceeded
var ErrLimitExceeded = errors.New("limit exceeded")
// Key is the type for the keys in a `KeyValueStore` // Key is the type for the keys in a `KeyValueStore`
type Key []byte type Key []byte
@ -37,6 +40,9 @@ type Limit int
// Items keeps all ListItem // Items keeps all ListItem
type Items []ListItem type Items []ListItem
// LookupLimit is enforced by storage implementations
const LookupLimit = 1000
// ListItem returns Key, Value, IsPrefix // ListItem returns Key, Value, IsPrefix
type ListItem struct { type ListItem struct {
Key Key Key Key

View File

@ -4,15 +4,13 @@
package storage package storage
// ListKeys returns keys starting from first and upto limit // ListKeys returns keys starting from first and upto limit
// limit is capped to LookupLimit
func ListKeys(store KeyValueStore, first Key, limit Limit) (Keys, error) { func ListKeys(store KeyValueStore, first Key, limit Limit) (Keys, error) {
const unlimited = Limit(1 << 31) if limit <= 0 || limit > LookupLimit {
limit = LookupLimit
keys := make(Keys, 0, limit)
if limit == 0 {
// TODO: this shouldn't be probably the case
limit = unlimited
} }
keys := make(Keys, 0, limit)
err := store.Iterate(IterateOptions{ err := store.Iterate(IterateOptions{
First: first, First: first,
Recurse: true, Recurse: true,
@ -31,15 +29,13 @@ func ListKeys(store KeyValueStore, first Key, limit Limit) (Keys, error) {
} }
// ReverseListKeys returns keys starting from first and upto limit in reverse order // ReverseListKeys returns keys starting from first and upto limit in reverse order
// limit is capped to LookupLimit
func ReverseListKeys(store KeyValueStore, first Key, limit Limit) (Keys, error) { func ReverseListKeys(store KeyValueStore, first Key, limit Limit) (Keys, error) {
const unlimited = Limit(1 << 31) if limit <= 0 || limit > LookupLimit {
limit = LookupLimit
keys := make(Keys, 0, limit)
if limit == 0 {
// TODO: this shouldn't be probably the case
limit = unlimited
} }
keys := make(Keys, 0, limit)
err := store.Iterate(IterateOptions{ err := store.Iterate(IterateOptions{
First: first, First: first,
Recurse: true, Recurse: true,

View File

@ -22,7 +22,12 @@ type ListOptions struct {
} }
// ListV2 lists all keys corresponding to ListOptions // ListV2 lists all keys corresponding to ListOptions
// limit is capped to LookupLimit
func ListV2(store KeyValueStore, opts ListOptions) (result Items, more More, err error) { func ListV2(store KeyValueStore, opts ListOptions) (result Items, more More, err error) {
if opts.Limit <= 0 || opts.Limit > LookupLimit {
opts.Limit = LookupLimit
}
if opts.StartAfter != nil && opts.EndBefore != nil { if opts.StartAfter != nil && opts.EndBefore != nil {
return nil, false, errors.New("start-after and end-before cannot be combined") return nil, false, errors.New("start-after and end-before cannot be combined")
} }

View File

@ -4,7 +4,6 @@
package redis package redis
import ( import (
"fmt"
"sort" "sort"
"time" "time"
@ -18,10 +17,7 @@ var (
Error = errs.Class("redis error") Error = errs.Class("redis error")
) )
const ( const defaultNodeExpiration = 61 * time.Minute
defaultNodeExpiration = 61 * time.Minute
maxKeyLookup = 100
)
// Client is the entrypoint into Redis // Client is the entrypoint into Redis
type Client struct { type Client struct {
@ -101,8 +97,8 @@ func (client *Client) Close() error {
// The maximum keys returned will be 100. If more than that is requested an // The maximum keys returned will be 100. If more than that is requested an
// error will be returned // error will be returned
func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) { func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
if len(keys) > maxKeyLookup { if len(keys) > storage.LookupLimit {
return nil, Error.New(fmt.Sprintf("requested %d keys, maximum is %d", len(keys), maxKeyLookup)) return nil, storage.ErrLimitExceeded
} }
keyStrings := make([]string, len(keys)) keyStrings := make([]string, len(keys))

View File

@ -88,6 +88,9 @@ func (store *Client) Get(key storage.Key) (storage.Value, error) {
// GetAll gets all values from the store // GetAll gets all values from the store
func (store *Client) GetAll(keys storage.Keys) (storage.Values, error) { func (store *Client) GetAll(keys storage.Keys) (storage.Values, error) {
store.CallCount.GetAll++ store.CallCount.GetAll++
if len(keys) > storage.LookupLimit {
return nil, storage.ErrLimitExceeded
}
values := storage.Values{} values := storage.Values{}
for _, key := range keys { for _, key := range keys {

View File

@ -4,6 +4,7 @@
package testsuite package testsuite
import ( import (
"strconv"
"testing" "testing"
"storj.io/storj/storage" "storj.io/storj/storage"
@ -24,6 +25,21 @@ func RunTests(t *testing.T, store storage.KeyValueStore) {
} }
func testConstraints(t *testing.T, store storage.KeyValueStore) { func testConstraints(t *testing.T, store storage.KeyValueStore) {
var items storage.Items
for i := 0; i < storage.LookupLimit+5; i++ {
items = append(items, storage.ListItem{
Key: storage.Key("test-" + strconv.Itoa(i)),
Value: storage.Value("xyz"),
})
}
for _, item := range items {
if err := store.Put(item.Key, item.Value); err != nil {
t.Fatal(err)
}
}
defer cleanupItems(store, items)
t.Run("Put Empty", func(t *testing.T) { t.Run("Put Empty", func(t *testing.T) {
var key storage.Key var key storage.Key
var val storage.Value var val storage.Value
@ -34,4 +50,36 @@ func testConstraints(t *testing.T, store storage.KeyValueStore) {
t.Fatal("putting empty key should fail") t.Fatal("putting empty key should fail")
} }
}) })
t.Run("GetAll limit", func(t *testing.T) {
_, err := store.GetAll(items[:storage.LookupLimit].GetKeys())
if err != nil {
t.Fatalf("GetAll LookupLimit should succeed: %v", err)
}
_, err = store.GetAll(items[:storage.LookupLimit+1].GetKeys())
if err == nil && err == storage.ErrLimitExceeded {
t.Fatalf("GetAll LookupLimit+1 should fail: %v", err)
}
})
t.Run("List limit", func(t *testing.T) {
keys, err := store.List(nil, storage.LookupLimit)
if err != nil || len(keys) != storage.LookupLimit {
t.Fatalf("List LookupLimit should succeed: %v / got %d", err, len(keys))
}
keys, err = store.ReverseList(nil, storage.LookupLimit)
if err != nil || len(keys) != storage.LookupLimit {
t.Fatalf("ReverseList LookupLimit should succeed: %v / got %d", err, len(keys))
}
_, err = store.List(nil, storage.LookupLimit+1)
if err != nil || len(keys) != storage.LookupLimit {
t.Fatalf("List LookupLimit+1 shouldn't fail: %v / got %d", err, len(keys))
}
_, err = store.ReverseList(nil, storage.LookupLimit+1)
if err != nil || len(keys) != storage.LookupLimit {
t.Fatalf("ReverseList LookupLimit+1 shouldn't fail: %v / got %d", err, len(keys))
}
})
} }

View File

@ -10,6 +10,7 @@ import (
"storj.io/storj/storage" "storj.io/storj/storage"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
) )
func testList(t *testing.T, store storage.KeyValueStore) { func testList(t *testing.T, store storage.KeyValueStore) {
@ -83,7 +84,7 @@ func testList(t *testing.T, store storage.KeyValueStore) {
t.Errorf("%s: %s", test.Name, err) t.Errorf("%s: %s", test.Name, err)
continue continue
} }
if diff := cmp.Diff(test.Expected, keys); diff != "" { if diff := cmp.Diff(test.Expected, keys, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s: (-want +got)\n%s", test.Name, diff) t.Errorf("%s: (-want +got)\n%s", test.Name, diff)
} }
} }