storj/storage/redis/client.go

220 lines
5.2 KiB
Go
Raw Normal View History

2018-04-18 17:55:28 +01:00
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
2018-04-18 16:34:15 +01:00
package redis
import (
"net/url"
"sort"
"strconv"
2018-04-18 16:34:15 +01:00
"time"
"github.com/go-redis/redis"
"github.com/zeebo/errs"
"storj.io/storj/storage"
2018-04-18 16:34:15 +01:00
)
// Error is the error class used for failures reported by this redis client.
var Error = errs.Class("redis error")
// defaultNodeExpiration is the TTL that NewClient assigns to new clients.
const defaultNodeExpiration = time.Hour + time.Minute
2018-04-18 16:34:15 +01:00
// Client is the entrypoint into Redis
type Client struct {
db *redis.Client
TTL time.Duration
2018-04-18 16:34:15 +01:00
}
// NewClient returns a configured Client instance, verifying a successful connection to redis
func NewClient(address, password string, db int) (*Client, error) {
client := &Client{
db: redis.NewClient(&redis.Options{
2018-04-18 16:34:15 +01:00
Addr: address,
Password: password,
DB: db,
}),
TTL: defaultNodeExpiration,
2018-04-18 16:34:15 +01:00
}
// ping here to verify we are able to connect to redis with the initialized client.
if err := client.db.Ping().Err(); err != nil {
all: fix govet warnings (#255) Fixes go1.11 vet warnings. Cancel on WithTimeout must always be called to avoid memory leak: pkg/provider/provider.go:73: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak Range over non-copyable things: pkg/pool/connection_pool_test.go:32: range var v copies lock: struct{pool pool.ConnectionPool; key string; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:56: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:83: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected interface{}; expectedError error} contains pool.ConnectionPool contains sync.RWMutex zeebo/errs package always requires formatting directives: pkg/peertls/peertls.go:50: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:47: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:87: Class.New call has arguments but no formatting directives pkg/overlay/cache.go:94: Class.New call has arguments but no formatting directives pkg/provider/certificate_authority.go:98: New call has arguments but no formatting directives pkg/provider/identity.go:96: New call has arguments but no formatting directives pkg/provider/utils.go:124: New call needs 1 arg but has 2 args pkg/provider/utils.go:136: New call needs 1 arg but has 2 args storage/redis/client.go:44: Class.New call has arguments but no formatting directives storage/redis/client.go:64: Class.New call has arguments but no formatting directives storage/redis/client.go:75: Class.New call has arguments but no formatting directives storage/redis/client.go:80: Class.New call has arguments but no formatting directives 
storage/redis/client.go:92: Class.New call has arguments but no formatting directives storage/redis/client.go:96: Class.New call has arguments but no formatting directives storage/redis/client.go:102: Class.New call has arguments but no formatting directives storage/redis/client.go:126: Class.New call has arguments but no formatting directives
2018-08-22 07:39:57 +01:00
return nil, Error.New("ping failed: %v", err)
2018-04-18 16:34:15 +01:00
}
return client, nil
2018-04-18 16:34:15 +01:00
}
// NewClientFrom builds a Client from a redis:// URL and verifies the
// connection via NewClient. The host part of the URL is used as the server
// address; the "db" and "password" query parameters supply the database
// number and the password. A "db" value that is not an integer (including a
// missing one) is reported as the strconv error.
func NewClientFrom(address string) (*Client, error) {
	parsed, err := url.Parse(address)
	switch {
	case err != nil:
		return nil, err
	case parsed.Scheme != "redis":
		return nil, Error.New("not a redis:// formatted address")
	}

	params := parsed.Query()
	dbNumber, err := strconv.Atoi(params.Get("db"))
	if err != nil {
		return nil, err
	}

	return NewClient(parsed.Host, params.Get("password"), dbNumber)
}
// Get looks up the provided key from redis returning either an error or the result.
func (client *Client) Get(key storage.Key) (storage.Value, error) {
2018-11-15 15:31:33 +00:00
if key.IsZero() {
return nil, storage.ErrEmptyKey.New("")
}
value, err := client.db.Get(string(key)).Bytes()
if err == redis.Nil {
return nil, storage.ErrKeyNotFound.New(key.String())
}
if err != nil {
all: fix govet warnings (#255) Fixes go1.11 vet warnings. Cancel on WithTimeout must always be called to avoid memory leak: pkg/provider/provider.go:73: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak Range over non-copyable things: pkg/pool/connection_pool_test.go:32: range var v copies lock: struct{pool pool.ConnectionPool; key string; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:56: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:83: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected interface{}; expectedError error} contains pool.ConnectionPool contains sync.RWMutex zeebo/errs package always requires formatting directives: pkg/peertls/peertls.go:50: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:47: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:87: Class.New call has arguments but no formatting directives pkg/overlay/cache.go:94: Class.New call has arguments but no formatting directives pkg/provider/certificate_authority.go:98: New call has arguments but no formatting directives pkg/provider/identity.go:96: New call has arguments but no formatting directives pkg/provider/utils.go:124: New call needs 1 arg but has 2 args pkg/provider/utils.go:136: New call needs 1 arg but has 2 args storage/redis/client.go:44: Class.New call has arguments but no formatting directives storage/redis/client.go:64: Class.New call has arguments but no formatting directives storage/redis/client.go:75: Class.New call has arguments but no formatting directives storage/redis/client.go:80: Class.New call has arguments but no formatting directives 
storage/redis/client.go:92: Class.New call has arguments but no formatting directives storage/redis/client.go:96: Class.New call has arguments but no formatting directives storage/redis/client.go:102: Class.New call has arguments but no formatting directives storage/redis/client.go:126: Class.New call has arguments but no formatting directives
2018-08-22 07:39:57 +01:00
return nil, Error.New("get error: %v", err)
}
return value, nil
}
// Put adds a value to the provided key in redis, returning an error on failure.
func (client *Client) Put(key storage.Key, value storage.Value) error {
2018-11-15 15:31:33 +00:00
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
2018-11-15 15:31:33 +00:00
err := client.db.Set(key.String(), []byte(value), client.TTL).Err()
if err != nil {
all: fix govet warnings (#255) Fixes go1.11 vet warnings. Cancel on WithTimeout must always be called to avoid memory leak: pkg/provider/provider.go:73: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak Range over non-copyable things: pkg/pool/connection_pool_test.go:32: range var v copies lock: struct{pool pool.ConnectionPool; key string; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:56: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:83: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected interface{}; expectedError error} contains pool.ConnectionPool contains sync.RWMutex zeebo/errs package always requires formatting directives: pkg/peertls/peertls.go:50: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:47: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:87: Class.New call has arguments but no formatting directives pkg/overlay/cache.go:94: Class.New call has arguments but no formatting directives pkg/provider/certificate_authority.go:98: New call has arguments but no formatting directives pkg/provider/identity.go:96: New call has arguments but no formatting directives pkg/provider/utils.go:124: New call needs 1 arg but has 2 args pkg/provider/utils.go:136: New call needs 1 arg but has 2 args storage/redis/client.go:44: Class.New call has arguments but no formatting directives storage/redis/client.go:64: Class.New call has arguments but no formatting directives storage/redis/client.go:75: Class.New call has arguments but no formatting directives storage/redis/client.go:80: Class.New call has arguments but no formatting directives 
storage/redis/client.go:92: Class.New call has arguments but no formatting directives storage/redis/client.go:96: Class.New call has arguments but no formatting directives storage/redis/client.go:102: Class.New call has arguments but no formatting directives storage/redis/client.go:126: Class.New call has arguments but no formatting directives
2018-08-22 07:39:57 +01:00
return Error.New("put error: %v", err)
}
return nil
2018-04-18 16:34:15 +01:00
}
// List returns either a list of keys for which redis has values or an error.
// It delegates to storage.ListKeys with this client as the store; first and
// limit are passed through unchanged (presumably the starting key and the
// maximum number of keys — see storage.ListKeys).
func (client *Client) List(first storage.Key, limit int) (storage.Keys, error) {
return storage.ListKeys(client, first, limit)
}
// ReverseList returns either a list of keys for which redis has values or an error.
// Starts from first and iterates backwards.
// It delegates to storage.ReverseListKeys with this client as the store;
// first and limit are passed through unchanged.
func (client *Client) ReverseList(first storage.Key, limit int) (storage.Keys, error) {
return storage.ReverseListKeys(client, first, limit)
}
// Delete deletes a key/value pair from redis, for a given the key
func (client *Client) Delete(key storage.Key) error {
2018-11-15 15:31:33 +00:00
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
err := client.db.Del(key.String()).Err()
if err != nil {
all: fix govet warnings (#255) Fixes go1.11 vet warnings. Cancel on WithTimeout must always be called to avoid memory leak: pkg/provider/provider.go:73: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak Range over non-copyable things: pkg/pool/connection_pool_test.go:32: range var v copies lock: struct{pool pool.ConnectionPool; key string; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:56: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected pool.TestFoo; expectedError error} contains pool.ConnectionPool contains sync.RWMutex pkg/pool/connection_pool_test.go:83: range var v copies lock: struct{pool pool.ConnectionPool; key string; value pool.TestFoo; expected interface{}; expectedError error} contains pool.ConnectionPool contains sync.RWMutex zeebo/errs package always requires formatting directives: pkg/peertls/peertls.go:50: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:47: Class.New call has arguments but no formatting directives pkg/peertls/utils.go:87: Class.New call has arguments but no formatting directives pkg/overlay/cache.go:94: Class.New call has arguments but no formatting directives pkg/provider/certificate_authority.go:98: New call has arguments but no formatting directives pkg/provider/identity.go:96: New call has arguments but no formatting directives pkg/provider/utils.go:124: New call needs 1 arg but has 2 args pkg/provider/utils.go:136: New call needs 1 arg but has 2 args storage/redis/client.go:44: Class.New call has arguments but no formatting directives storage/redis/client.go:64: Class.New call has arguments but no formatting directives storage/redis/client.go:75: Class.New call has arguments but no formatting directives storage/redis/client.go:80: Class.New call has arguments but no formatting directives 
storage/redis/client.go:92: Class.New call has arguments but no formatting directives storage/redis/client.go:96: Class.New call has arguments but no formatting directives storage/redis/client.go:102: Class.New call has arguments but no formatting directives storage/redis/client.go:126: Class.New call has arguments but no formatting directives
2018-08-22 07:39:57 +01:00
return Error.New("delete error: %v", err)
}
return nil
2018-04-18 16:34:15 +01:00
}
// Close closes a redis client
func (client *Client) Close() error {
return client.db.Close()
2018-04-18 16:34:15 +01:00
}
// GetAll is the bulk method for gets from the redis data store.
// The maximum keys returned will be storage.LookupLimit. If more than that
// is requested, storage.ErrLimitExceeded is returned.
//
// The result has one entry per requested key, in the order redis returns
// them; a key redis has no value for produces a nil entry. An empty keys
// list yields an empty result without contacting redis.
func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
	if len(keys) == 0 {
		// MGET needs at least one key; sending it with none makes redis
		// answer with a "wrong number of arguments" error.
		return []storage.Value{}, nil
	}
	if len(keys) > storage.LookupLimit {
		return nil, storage.ErrLimitExceeded
	}

	keyStrings := make([]string, len(keys))
	for i, key := range keys {
		keyStrings[i] = key.String()
	}

	results, err := client.db.MGet(keyStrings...).Result()
	if err != nil {
		return nil, err
	}

	values := make([]storage.Value, 0, len(results))
	for _, result := range results {
		if result == nil {
			// Missing key: keep the slot so positions still line up with keys.
			values = append(values, nil)
			continue
		}
		s, ok := result.(string)
		if !ok {
			return nil, Error.New("invalid result type %T", result)
		}
		values = append(values, storage.Value(s))
	}
	return values, nil
}
// Iterate collects the items selected by opts and hands them to fn wrapped in
// a storage.StaticIterator, returning fn's error.
//
// All items under opts.Prefix are loaded up front. opts.First bounds them
// from below when iterating forwards and from above when opts.Reverse is set.
// Unless opts.Recurse is set the items are passed through
// storage.SortAndCollapse; with opts.Reverse they are then reversed.
func (client *Client) Iterate(opts storage.IterateOptions, fn func(it storage.Iterator) error) error {
	// Only one of the two bounds is ever set, depending on direction.
	var lower, upper storage.Key
	if opts.Reverse {
		upper = opts.First
	} else {
		lower = opts.First
	}

	items, err := client.allPrefixedItems(opts.Prefix, lower, upper)
	if err != nil {
		return err
	}

	if !opts.Recurse {
		items = storage.SortAndCollapse(items, opts.Prefix)
	}
	if opts.Reverse {
		items = storage.ReverseItems(items)
	}

	iterator := &storage.StaticIterator{Items: items}
	return fn(iterator)
}
// allPrefixedItems returns every key/value pair in redis whose key starts
// with prefix, sorted with sort.Sort. A non-zero first drops keys that are
// less than first; a non-zero last drops keys that last is less than.
//
// Keys are discovered with SCAN and each value is fetched with a separate
// GET. A key that no longer exists by the time it is fetched is skipped.
// Any other GET failure, or a failure of the scan itself, aborts the listing
// and is returned.
func (client *Client) allPrefixedItems(prefix, first, last storage.Key) (storage.Items, error) {
	var all storage.Items
	seen := map[string]struct{}{}

	match := string(escapeMatch([]byte(prefix))) + "*"
	it := client.db.Scan(0, match, 0).Iterator()
	for it.Next() {
		key := it.Val()
		if !first.IsZero() && storage.Key(key).Less(first) {
			continue
		}
		if !last.IsZero() && last.Less(storage.Key(key)) {
			continue
		}

		// Skip keys the iterator yields more than once.
		if _, ok := seen[key]; ok {
			continue
		}
		seen[key] = struct{}{}

		value, err := client.db.Get(key).Bytes()
		if err == redis.Nil {
			// The key expired or was deleted between SCAN and GET; it is
			// simply not part of the listing any more.
			continue
		}
		if err != nil {
			return nil, err
		}

		all = append(all, storage.ListItem{
			Key:      storage.Key(key),
			Value:    storage.Value(value),
			IsPrefix: false,
		})
	}
	// Next reports false both when the scan is exhausted and when it failed;
	// without this check a failed scan would return partial results as success.
	if err := it.Err(); err != nil {
		return nil, err
	}

	sort.Sort(all)
	return all, nil
}