2019-01-24 20:15:10 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
2018-04-18 17:55:28 +01:00
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
2018-04-18 16:34:15 +01:00
|
|
|
package redis
|
|
|
|
|
|
|
|
import (
|
2019-06-05 15:23:10 +01:00
|
|
|
"context"
|
2018-12-12 13:15:34 +00:00
|
|
|
"net/url"
|
2018-09-05 17:10:35 +01:00
|
|
|
"sort"
|
2018-10-05 16:58:07 +01:00
|
|
|
"strconv"
|
2018-04-18 16:34:15 +01:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/go-redis/redis"
|
2018-06-13 19:22:32 +01:00
|
|
|
"github.com/zeebo/errs"
|
2019-06-05 15:23:10 +01:00
|
|
|
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
2018-12-20 18:29:05 +00:00
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
"storj.io/storj/storage"
|
2018-04-18 16:34:15 +01:00
|
|
|
)
|
|
|
|
|
2018-06-19 19:03:46 +01:00
|
|
|
var (
|
|
|
|
// Error is a redis error
|
|
|
|
Error = errs.Class("redis error")
|
2019-06-05 15:23:10 +01:00
|
|
|
|
|
|
|
mon = monkit.Package()
|
2018-06-19 19:03:46 +01:00
|
|
|
)
|
|
|
|
|
2018-12-17 20:16:28 +00:00
|
|
|
// TODO(coyle): this should be set to 61 * time.Minute after we implement Ping and Refresh on Overlay Cache
|
|
|
|
// This disables the TTL since the Set command only includes a TTL if it is greater than 0
|
|
|
|
const defaultNodeExpiration = 0 * time.Minute
|
2018-04-18 16:34:15 +01:00
|
|
|
|
2018-08-01 15:15:38 +01:00
|
|
|
// Client is the entrypoint into Redis
|
|
|
|
type Client struct {
|
2018-06-13 19:22:32 +01:00
|
|
|
db *redis.Client
|
|
|
|
TTL time.Duration
|
2018-04-18 16:34:15 +01:00
|
|
|
}
|
|
|
|
|
2018-08-27 18:28:16 +01:00
|
|
|
// NewClient returns a configured Client instance, verifying a successful connection to redis
|
2018-08-01 15:15:38 +01:00
|
|
|
func NewClient(address, password string, db int) (*Client, error) {
|
2018-09-05 17:10:35 +01:00
|
|
|
client := &Client{
|
2018-06-13 19:22:32 +01:00
|
|
|
db: redis.NewClient(&redis.Options{
|
2018-04-18 16:34:15 +01:00
|
|
|
Addr: address,
|
|
|
|
Password: password,
|
|
|
|
DB: db,
|
|
|
|
}),
|
2018-06-13 19:22:32 +01:00
|
|
|
TTL: defaultNodeExpiration,
|
2018-04-18 16:34:15 +01:00
|
|
|
}
|
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
// ping here to verify we are able to connect to redis with the initialized client.
|
2018-09-05 17:10:35 +01:00
|
|
|
if err := client.db.Ping().Err(); err != nil {
|
2018-08-22 07:39:57 +01:00
|
|
|
return nil, Error.New("ping failed: %v", err)
|
2018-04-18 16:34:15 +01:00
|
|
|
}
|
|
|
|
|
2018-09-05 17:10:35 +01:00
|
|
|
return client, nil
|
2018-04-18 16:34:15 +01:00
|
|
|
}
|
|
|
|
|
2018-10-05 16:58:07 +01:00
|
|
|
// NewClientFrom returns a configured Client instance from a redis address, verifying a successful connection to redis
|
|
|
|
func NewClientFrom(address string) (*Client, error) {
|
2018-12-12 13:15:34 +00:00
|
|
|
redisurl, err := url.Parse(address)
|
2018-10-05 16:58:07 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if redisurl.Scheme != "redis" {
|
|
|
|
return nil, Error.New("not a redis:// formatted address")
|
|
|
|
}
|
|
|
|
|
|
|
|
q := redisurl.Query()
|
|
|
|
|
|
|
|
db, err := strconv.Atoi(q.Get("db"))
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return NewClient(redisurl.Host, q.Get("password"), db)
|
|
|
|
}
|
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
// Get looks up the provided key from redis returning either an error or the result.
|
2019-06-05 15:23:10 +01:00
|
|
|
func (client *Client) Get(ctx context.Context, key storage.Key) (_ storage.Value, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-11-15 15:31:33 +00:00
|
|
|
if key.IsZero() {
|
|
|
|
return nil, storage.ErrEmptyKey.New("")
|
|
|
|
}
|
|
|
|
|
2018-09-05 17:10:35 +01:00
|
|
|
value, err := client.db.Get(string(key)).Bytes()
|
|
|
|
if err == redis.Nil {
|
2018-08-14 16:22:29 +01:00
|
|
|
return nil, storage.ErrKeyNotFound.New(key.String())
|
|
|
|
}
|
2018-06-19 19:03:46 +01:00
|
|
|
if err != nil {
|
2018-08-22 07:39:57 +01:00
|
|
|
return nil, Error.New("get error: %v", err)
|
2018-06-19 19:03:46 +01:00
|
|
|
}
|
2018-09-05 17:10:35 +01:00
|
|
|
return value, nil
|
2018-06-13 19:22:32 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Put adds a value to the provided key in redis, returning an error on failure.
|
2019-06-05 15:23:10 +01:00
|
|
|
func (client *Client) Put(ctx context.Context, key storage.Key, value storage.Value) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-11-15 15:31:33 +00:00
|
|
|
if key.IsZero() {
|
|
|
|
return storage.ErrEmptyKey.New("")
|
2018-06-13 19:22:32 +01:00
|
|
|
}
|
2018-11-15 15:31:33 +00:00
|
|
|
|
2019-06-05 15:23:10 +01:00
|
|
|
err = client.db.Set(key.String(), []byte(value), client.TTL).Err()
|
2018-06-19 19:03:46 +01:00
|
|
|
if err != nil {
|
2018-08-22 07:39:57 +01:00
|
|
|
return Error.New("put error: %v", err)
|
2018-06-19 19:03:46 +01:00
|
|
|
}
|
|
|
|
return nil
|
2018-04-18 16:34:15 +01:00
|
|
|
}
|
|
|
|
|
2018-08-01 15:15:38 +01:00
|
|
|
// List returns either a list of keys for which boltdb has values or an error.
|
2019-06-05 15:23:10 +01:00
|
|
|
func (client *Client) List(ctx context.Context, first storage.Key, limit int) (_ storage.Keys, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
return storage.ListKeys(ctx, client, first, limit)
|
2018-08-26 04:00:49 +01:00
|
|
|
}
|
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
// Delete deletes a key/value pair from redis, for a given the key
|
2019-06-05 15:23:10 +01:00
|
|
|
func (client *Client) Delete(ctx context.Context, key storage.Key) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-11-15 15:31:33 +00:00
|
|
|
if key.IsZero() {
|
|
|
|
return storage.ErrEmptyKey.New("")
|
|
|
|
}
|
|
|
|
|
2019-06-05 15:23:10 +01:00
|
|
|
err = client.db.Del(key.String()).Err()
|
2018-06-19 19:03:46 +01:00
|
|
|
if err != nil {
|
2018-08-22 07:39:57 +01:00
|
|
|
return Error.New("delete error: %v", err)
|
2018-06-19 19:03:46 +01:00
|
|
|
}
|
2018-09-05 17:10:35 +01:00
|
|
|
return nil
|
2018-04-18 16:34:15 +01:00
|
|
|
}
|
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
// Close closes a redis client
|
2018-09-05 17:10:35 +01:00
|
|
|
func (client *Client) Close() error {
|
|
|
|
return client.db.Close()
|
2018-04-18 16:34:15 +01:00
|
|
|
}
|
2018-08-01 15:15:38 +01:00
|
|
|
|
2018-10-25 18:11:28 +01:00
|
|
|
// GetAll is the bulk method for gets from the redis data store.
|
|
|
|
// The maximum keys returned will be storage.LookupLimit. If more than that
|
|
|
|
// is requested, an error will be returned
|
2019-06-05 15:23:10 +01:00
|
|
|
func (client *Client) GetAll(ctx context.Context, keys storage.Keys) (_ storage.Values, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-09-07 10:00:00 +01:00
|
|
|
if len(keys) > storage.LookupLimit {
|
|
|
|
return nil, storage.ErrLimitExceeded
|
2018-08-01 15:15:38 +01:00
|
|
|
}
|
|
|
|
|
2018-09-05 17:10:35 +01:00
|
|
|
keyStrings := make([]string, len(keys))
|
2018-08-01 15:15:38 +01:00
|
|
|
for i, v := range keys {
|
2018-09-05 17:10:35 +01:00
|
|
|
keyStrings[i] = v.String()
|
2018-08-01 15:15:38 +01:00
|
|
|
}
|
|
|
|
|
2018-09-05 17:10:35 +01:00
|
|
|
results, err := client.db.MGet(keyStrings...).Result()
|
2018-08-01 15:15:38 +01:00
|
|
|
if err != nil {
|
2018-09-05 17:10:35 +01:00
|
|
|
return nil, err
|
2018-08-01 15:15:38 +01:00
|
|
|
}
|
|
|
|
values := []storage.Value{}
|
2018-09-05 17:10:35 +01:00
|
|
|
for _, result := range results {
|
2018-09-11 05:52:14 +01:00
|
|
|
if result == nil {
|
|
|
|
values = append(values, nil)
|
|
|
|
} else {
|
|
|
|
s, ok := result.(string)
|
|
|
|
if !ok {
|
|
|
|
return nil, Error.New("invalid result type %T", result)
|
|
|
|
}
|
|
|
|
values = append(values, storage.Value(s))
|
2018-09-05 17:10:35 +01:00
|
|
|
}
|
2018-09-11 05:52:14 +01:00
|
|
|
|
2018-08-01 15:15:38 +01:00
|
|
|
}
|
|
|
|
return values, nil
|
|
|
|
}
|
2018-09-05 17:10:35 +01:00
|
|
|
|
|
|
|
// Iterate iterates over items based on opts
|
2019-06-05 15:23:10 +01:00
|
|
|
func (client *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
2018-09-05 17:10:35 +01:00
|
|
|
var all storage.Items
|
|
|
|
if !opts.Reverse {
|
|
|
|
all, err = client.allPrefixedItems(opts.Prefix, opts.First, nil)
|
|
|
|
} else {
|
|
|
|
all, err = client.allPrefixedItems(opts.Prefix, nil, opts.First)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if !opts.Recurse {
|
|
|
|
all = storage.SortAndCollapse(all, opts.Prefix)
|
|
|
|
}
|
|
|
|
if opts.Reverse {
|
|
|
|
all = storage.ReverseItems(all)
|
|
|
|
}
|
2019-06-05 15:23:10 +01:00
|
|
|
return fn(ctx, &storage.StaticIterator{
|
2018-09-05 17:10:35 +01:00
|
|
|
Items: all,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-12-20 18:29:05 +00:00
|
|
|
// FlushDB deletes all keys in the currently selected DB.
|
|
|
|
func (client *Client) FlushDB() error {
|
|
|
|
_, err := client.db.FlushDB().Result()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-09-05 17:10:35 +01:00
|
|
|
func (client *Client) allPrefixedItems(prefix, first, last storage.Key) (storage.Items, error) {
|
|
|
|
var all storage.Items
|
2018-09-06 21:50:22 +01:00
|
|
|
seen := map[string]struct{}{}
|
2018-09-05 17:10:35 +01:00
|
|
|
|
|
|
|
match := string(escapeMatch([]byte(prefix))) + "*"
|
|
|
|
it := client.db.Scan(0, match, 0).Iterator()
|
|
|
|
for it.Next() {
|
|
|
|
key := it.Val()
|
2018-09-07 15:20:15 +01:00
|
|
|
if !first.IsZero() && storage.Key(key).Less(first) {
|
2018-09-05 17:10:35 +01:00
|
|
|
continue
|
|
|
|
}
|
2018-09-07 15:20:15 +01:00
|
|
|
if !last.IsZero() && last.Less(storage.Key(key)) {
|
2018-09-05 17:10:35 +01:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2018-09-06 21:50:22 +01:00
|
|
|
if _, ok := seen[key]; ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
seen[key] = struct{}{}
|
|
|
|
|
2018-09-05 17:10:35 +01:00
|
|
|
value, err := client.db.Get(key).Bytes()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
all = append(all, storage.ListItem{
|
|
|
|
Key: storage.Key(key),
|
|
|
|
Value: storage.Value(value),
|
|
|
|
IsPrefix: false,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Sort(all)
|
|
|
|
|
|
|
|
return all, nil
|
|
|
|
}
|