Cleanup & bolt-backed cache (#87)

* wip post-demos cleanup

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* Reorganize:

storage
├── common
│   └── common.go `package storage`
├── boltdb
│   └── ...
└── redis
    └── ...

storage
├── common.go `package storage`
├── boltdb
│   └── ...
└── redis
    └── ...

storage
├── common
│   ├── common.go `package storage`
│   └── testing.go `package storage` <--
└── ...

internal
├── app
│   └── cli
├── pkg
│   └── readcloser
└── test
    └── util.go `package test` <--

* remove comment

* add and use goimports

* add test types & fix some lint errors

* better typing

* fixing linter issues/comments

* goimports

* goimports

* more linter fixes; replace panic with NoError assertions in tests

* fix typo/more linter errors

* moar better linter fixes

* even moar better linter fixes

* linter

* add localPort back

* fixing exports, imports, and add comments
This commit is contained in:
Bryan White 2018-06-13 14:22:32 -04:00 committed by Dennis Coyle
parent 7280ae64a6
commit 1ebd66d880
21 changed files with 907 additions and 544 deletions

View File

@ -18,6 +18,10 @@ check-copyrights:
@echo "Running ${@}" @echo "Running ${@}"
@./scripts/check-for-header.sh @./scripts/check-for-header.sh
# Applies goimports to every go file (excluding vendored files)
goimports-fix:
goimports -w $$(find . -type f -name '*.go' -not -path "*/vendor/*")
proto: proto:
@echo "Running ${@}" @echo "Running ${@}"
./scripts/build-protos.sh ./scripts/build-protos.sh

View File

@ -26,7 +26,7 @@ var (
) )
func (s *serv) Process(ctx context.Context) error { func (s *serv) Process(ctx context.Context) error {
bdb, err := boltdb.New(s.logger, *dbPath) bdb, err := boltdb.NewClient(s.logger, *dbPath, boltdb.PointerBucket)
if err != nil { if err != nil {
return err return err

196
internal/test/util.go Normal file
View File

@ -0,0 +1,196 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package test
import (
"crypto/rand"
"encoding/hex"
"flag"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zeebo/errs"
"storj.io/storj/storage"
)
// KvStore is an in-memory, crappy key/value store type for testing
type KvStore map[string]storage.Value
// MockKeyValueStore is a `KeyValueStore` type used for testing (see storj.io/storj/storage/common.go)
type MockKeyValueStore struct {
Data KvStore
GetCalled int
PutCalled int
ListCalled int
DeleteCalled int
CloseCalled int
PingCalled int
}
// RedisDone is a function type that describes the callback returned by `EnsureRedis`
type RedisDone func()
// RedisServer is a struct which holds and manages the state of a `redis-server` process
type RedisServer struct {
cmd *exec.Cmd
started bool
}
var (
// ErrMissingKey is the error returned if a key is not in the mock store
ErrMissingKey = errs.New("missing")
// ErrForced is the error returned when the forced error flag is passed to mock an error
ErrForced = errs.New("error forced by using 'error' key in mock")
redisRefs = map[string]bool{}
testRedis = &RedisServer{
started: false,
}
)
// Get looks up the provided key from the MockKeyValueStore returning either an error or the result.
func (m *MockKeyValueStore) Get(key storage.Key) (storage.Value, error) {
m.GetCalled++
if key.String() == "error" {
return storage.Value{}, ErrForced
}
v, ok := m.Data[key.String()]
if !ok {
return storage.Value{}, ErrMissingKey
}
return v, nil
}
// Put adds a value to the provided key in the MockKeyValueStore, returning an error on failure.
func (m *MockKeyValueStore) Put(key storage.Key, value storage.Value) error {
m.PutCalled++
m.Data[key.String()] = value
return nil
}
// Delete deletes a key/value pair from the MockKeyValueStore, for a given the key
func (m *MockKeyValueStore) Delete(key storage.Key) error {
m.DeleteCalled++
delete(m.Data, key.String())
return nil
}
// List returns either a list of keys for which the MockKeyValueStore has values or an error.
func (m *MockKeyValueStore) List() (_ storage.Keys, _ error) {
m.ListCalled++
keys := storage.Keys{}
for k := range m.Data {
keys = append(keys, storage.Key(k))
}
return keys, nil
}
// Close closes the client
func (m *MockKeyValueStore) Close() error {
m.CloseCalled++
return nil
}
// Ping is called by some redis client code
func (m *MockKeyValueStore) Ping() error {
m.PingCalled++
return nil
}
// NewMockKeyValueStore returns a mocked `KeyValueStore` implementation for testing
func NewMockKeyValueStore(d KvStore) *MockKeyValueStore {
return &MockKeyValueStore{
Data: d,
GetCalled: 0,
PutCalled: 0,
ListCalled: 0,
DeleteCalled: 0,
CloseCalled: 0,
PingCalled: 0,
}
}
// EnsureRedis attempts to start the `redis-server` binary
func EnsureRedis(t *testing.T) (_ RedisDone) {
flag.Set("redisAddress", "127.0.0.1:6379")
index, _ := randomHex(5)
redisRefs[index] = true
if testRedis.started != true {
testRedis.start(t)
}
return func() {
if v := recover(); v != nil {
testRedis.stop()
panic(v)
}
redisRefs[index] = false
if !(redisRefCount() > 0) {
testRedis.stop()
}
}
}
func redisRefCount() (_ int) {
count := 0
for _, ref := range redisRefs {
if ref {
count++
}
}
return count
}
func (r *RedisServer) start(t *testing.T) {
r.cmd = &exec.Cmd{}
cmd := r.cmd
logPath, err := filepath.Abs("test_redis-server.log")
assert.NoError(t, err)
binPath, err := exec.LookPath("redis-server")
assert.NoError(t, err)
log, err := os.Create(logPath)
assert.NoError(t, err)
cmd.Path = binPath
cmd.Stdout = log
go func() {
r.started = true
if err := cmd.Run(); err != nil {
// TODO(bryanchriswhite) error checking
}
}()
time.Sleep(2 * time.Second)
}
func (r *RedisServer) stop() {
r.started = false
r.cmd.Process.Kill()
}
func randomHex(n int) (string, error) {
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}

View File

@ -16,6 +16,8 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"storj.io/storj/internal/test"
pb "storj.io/storj/protos/netstate" pb "storj.io/storj/protos/netstate"
) )
@ -25,9 +27,7 @@ func TestNetStateClient(t *testing.T) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 9000)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 9000))
assert.NoError(t, err) assert.NoError(t, err)
mdb := &MockDB{ mdb := test.NewMockKeyValueStore(test.KvStore{})
timesCalled: 0,
}
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
pb.RegisterNetStateServer(grpcServer, NewServer(mdb, logger)) pb.RegisterNetStateServer(grpcServer, NewServer(mdb, logger))
@ -57,31 +57,23 @@ func TestNetStateClient(t *testing.T) {
APIKey: []byte("abc123"), APIKey: []byte("abc123"),
} }
if mdb.timesCalled != 0 {
t.Error("Expected mockdb to be called 0 times")
}
// Tests Server.Put // Tests Server.Put
_, err = c.Put(ctx, &pr1) _, err = c.Put(ctx, &pr1)
if err != nil || status.Code(err) == codes.Internal { if err != nil || status.Code(err) == codes.Internal {
t.Error("Failed to Put") t.Error("Failed to Put")
} }
if mdb.timesCalled != 1 { if mdb.PutCalled != 1 {
t.Error("Failed to call mockdb correctly") t.Error("Failed to call mockdb correctly")
} }
if !bytes.Equal(mdb.puts[0].Path, pr1.Path) {
t.Error("Expected saved path to equal given path")
}
pointerBytes, err := proto.Marshal(pr1.Pointer) pointerBytes, err := proto.Marshal(pr1.Pointer)
if err != nil { if err != nil {
t.Error("failed to marshal test pointer") t.Error("failed to marshal test pointer")
} }
if !bytes.Equal(mdb.puts[0].Pointer, pointerBytes) { if !bytes.Equal(mdb.Data[string(pr1.Path)], pointerBytes) {
t.Error("Expected saved value to equal given value") t.Error("Expected saved pointer to equal given pointer")
} }
// Tests Server.Get // Tests Server.Get
@ -97,7 +89,7 @@ func TestNetStateClient(t *testing.T) {
t.Error("Expected to get same content that was put") t.Error("Expected to get same content that was put")
} }
if mdb.timesCalled != 2 { if mdb.GetCalled != 1 {
t.Error("Failed to call mockdb correct number of times") t.Error("Failed to call mockdb correct number of times")
} }
@ -120,7 +112,7 @@ func TestNetStateClient(t *testing.T) {
t.Error("Failed to Put") t.Error("Failed to Put")
} }
if mdb.timesCalled != 3 { if mdb.PutCalled != 2 {
t.Error("Failed to call mockdb correct number of times") t.Error("Failed to call mockdb correct number of times")
} }
@ -135,7 +127,7 @@ func TestNetStateClient(t *testing.T) {
t.Error("Failed to delete") t.Error("Failed to delete")
} }
if mdb.timesCalled != 4 { if mdb.DeleteCalled != 1 {
t.Error("Failed to call mockdb correct number of times") t.Error("Failed to call mockdb correct number of times")
} }
@ -157,7 +149,7 @@ func TestNetStateClient(t *testing.T) {
t.Error("Failed to list correct file path") t.Error("Failed to list correct file path")
} }
if mdb.timesCalled != 5 { if mdb.ListCalled != 1 {
t.Error("Failed to call mockdb correct number of times") t.Error("Failed to call mockdb correct number of times")
} }
} }

View File

@ -14,33 +14,29 @@ import (
"storj.io/storj/netstate/auth" "storj.io/storj/netstate/auth"
pb "storj.io/storj/protos/netstate" pb "storj.io/storj/protos/netstate"
"storj.io/storj/storage/boltdb" "storj.io/storj/storage"
) )
// PointerEntry - Path and Pointer are saved as a key/value pair to a `storage.KeyValueStore`.
type PointerEntry struct {
Path []byte
Pointer []byte
}
// Server implements the network state RPC service // Server implements the network state RPC service
type Server struct { type Server struct {
DB DB DB storage.KeyValueStore
logger *zap.Logger logger *zap.Logger
} }
// NewServer creates instance of Server // NewServer creates instance of Server
func NewServer(db DB, logger *zap.Logger) *Server { func NewServer(db storage.KeyValueStore, logger *zap.Logger) *Server {
return &Server{ return &Server{
DB: db, DB: db,
logger: logger, logger: logger,
} }
} }
// DB interface allows more modular unit testing
// and makes it easier in the future to substitute
// db clients other than bolt
type DB interface {
Put(boltdb.PointerEntry) error
Get([]byte) ([]byte, error)
List() ([][]byte, error)
Delete([]byte) error
}
func (s *Server) validateAuth(APIKeyBytes []byte) error { func (s *Server) validateAuth(APIKeyBytes []byte) error {
if !auth.ValidateAPIKey(string(APIKeyBytes)) { if !auth.ValidateAPIKey(string(APIKeyBytes)) {
s.logger.Error("unauthorized request: ", zap.Error(grpc.Errorf(codes.Unauthenticated, "Invalid API credential"))) s.logger.Error("unauthorized request: ", zap.Error(grpc.Errorf(codes.Unauthenticated, "Invalid API credential")))
@ -64,12 +60,12 @@ func (s *Server) Put(ctx context.Context, putReq *pb.PutRequest) (*pb.PutRespons
return nil, status.Errorf(codes.Internal, err.Error()) return nil, status.Errorf(codes.Internal, err.Error())
} }
pe := boltdb.PointerEntry{ pe := PointerEntry{
Path: putReq.Path, Path: putReq.Path,
Pointer: pointerBytes, Pointer: pointerBytes,
} }
if err := s.DB.Put(pe); err != nil { if err := s.DB.Put(pe.Path, pe.Pointer); err != nil {
s.logger.Error("err putting pointer", zap.Error(err)) s.logger.Error("err putting pointer", zap.Error(err))
return nil, status.Errorf(codes.Internal, err.Error()) return nil, status.Errorf(codes.Internal, err.Error())
} }
@ -118,7 +114,7 @@ func (s *Server) List(ctx context.Context, req *pb.ListRequest) (*pb.ListRespons
s.logger.Debug("path keys retrieved") s.logger.Debug("path keys retrieved")
return &pb.ListResponse{ return &pb.ListResponse{
// pathKeys is an array of byte arrays // pathKeys is an array of byte arrays
Paths: pathKeys, Paths: pathKeys.ByteSlices(),
}, nil }, nil
} }

View File

@ -4,13 +4,10 @@
package netstate package netstate
import ( import (
"bytes"
"os" "os"
"testing" "testing"
"github.com/spf13/viper" "github.com/spf13/viper"
"storj.io/storj/storage/boltdb"
) )
const ( const (
@ -23,48 +20,3 @@ func TestMain(m *testing.M) {
viper.AutomaticEnv() viper.AutomaticEnv()
os.Exit(m.Run()) os.Exit(m.Run())
} }
type MockDB struct {
timesCalled int
puts []boltdb.PointerEntry
pathKeys [][]byte
}
func (m *MockDB) Put(f boltdb.PointerEntry) error {
m.timesCalled++
m.puts = append(m.puts, f)
return nil
}
func (m *MockDB) Get(path []byte) ([]byte, error) {
m.timesCalled++
for _, pointerEntry := range m.puts {
if bytes.Equal(path, pointerEntry.Path) {
return pointerEntry.Pointer, nil
}
}
panic("failed to get the given file")
}
func (m *MockDB) List() ([][]byte, error) {
m.timesCalled++
for _, putReq := range m.puts {
m.pathKeys = append(m.pathKeys, putReq.Path)
}
return m.pathKeys, nil
}
func (m *MockDB) Delete(path []byte) error {
m.timesCalled++
for i, pointerEntry := range m.puts {
if bytes.Equal(path, pointerEntry.Path) {
m.puts = append(m.puts[:i], m.puts[i+1:]...)
}
}
return nil
}

View File

@ -1,47 +1,60 @@
// Copyright (C) 2018 Storj Labs, Inc. // Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information. // See LICENSE for copying information.
package redis package overlay
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/kademlia"
"storj.io/storj/protos/overlay" "storj.io/storj/protos/overlay"
"storj.io/storj/storage"
"storj.io/storj/storage/boltdb"
"storj.io/storj/storage/redis"
) )
const defaultNodeExpiration = 61 * time.Minute
// ErrNodeNotFound standardizes errors here // ErrNodeNotFound standardizes errors here
var ErrNodeNotFound = errors.New("Node not found") var ErrNodeNotFound = errs.New("Node not found")
// OverlayClient is used to store overlay data in Redis // Cache is used to store overlay data in Redis
type OverlayClient struct { type Cache struct {
DB Client DB storage.KeyValueStore
DHT kademlia.DHT DHT kademlia.DHT
} }
// NewOverlayClient returns a pointer to a new OverlayClient instance with an initalized connection to Redis. // NewRedisOverlayCache returns a pointer to a new Cache instance with an initalized connection to Redis.
func NewOverlayClient(address, password string, db int, DHT kademlia.DHT) (*OverlayClient, error) { func NewRedisOverlayCache(address, password string, db int, DHT kademlia.DHT) (*Cache, error) {
rc, err := NewRedisClient(address, password, db) rc, err := redis.NewClient(address, password, db)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &OverlayClient{ return &Cache{
DB: rc, DB: rc,
DHT: DHT, DHT: DHT,
}, nil }, nil
} }
// NewBoltOverlayCache returns a pointer to a new Cache instance with an initalized connection to a Bolt db.
func NewBoltOverlayCache(dbPath string, DHT kademlia.DHT) (*Cache, error) {
bc, err := boltdb.NewClient(nil, dbPath, boltdb.OverlayBucket)
if err != nil {
return nil, err
}
return &Cache{
DB: bc,
DHT: DHT,
}, nil
}
// Get looks up the provided nodeID from the redis cache // Get looks up the provided nodeID from the redis cache
func (o *OverlayClient) Get(ctx context.Context, key string) (*overlay.NodeAddress, error) { func (o *Cache) Get(ctx context.Context, key string) (*overlay.NodeAddress, error) {
b, err := o.DB.Get(key) b, err := o.DB.Get([]byte(key))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -54,18 +67,18 @@ func (o *OverlayClient) Get(ctx context.Context, key string) (*overlay.NodeAddre
return na, nil return na, nil
} }
// Set adds a nodeID to the redis cache with a binary representation of proto defined NodeAddress // Put adds a nodeID to the redis cache with a binary representation of proto defined NodeAddress
func (o *OverlayClient) Set(nodeID string, value overlay.NodeAddress) error { func (o *Cache) Put(nodeID string, value overlay.NodeAddress) error {
data, err := proto.Marshal(&value) data, err := proto.Marshal(&value)
if err != nil { if err != nil {
return err return err
} }
return o.DB.Set(nodeID, data, defaultNodeExpiration) return o.DB.Put([]byte(nodeID), []byte(data))
} }
// Bootstrap walks the initialized network and populates the cache // Bootstrap walks the initialized network and populates the cache
func (o *OverlayClient) Bootstrap(ctx context.Context) error { func (o *Cache) Bootstrap(ctx context.Context) error {
fmt.Println("bootstrapping cache") fmt.Println("bootstrapping cache")
nodes, err := o.DHT.GetNodes(ctx, "0", 1280) nodes, err := o.DHT.GetNodes(ctx, "0", 1280)
@ -75,7 +88,7 @@ func (o *OverlayClient) Bootstrap(ctx context.Context) error {
fmt.Println("could not find node in network", err, v.Id) fmt.Println("could not find node in network", err, v.Id)
} }
addr, err := proto.Marshal(found.Address) addr, err := proto.Marshal(found.Address)
o.DB.Set(found.Id, addr, defaultNodeExpiration) o.DB.Put([]byte(found.Id), addr)
} }
// called after kademlia is bootstrapped // called after kademlia is bootstrapped
// needs to take RoutingTable and start to persist it into the cache // needs to take RoutingTable and start to persist it into the cache
@ -96,7 +109,7 @@ func (o *OverlayClient) Bootstrap(ctx context.Context) error {
} }
// Refresh walks the network looking for new nodes and pings existing nodes to eliminate stale addresses // Refresh walks the network looking for new nodes and pings existing nodes to eliminate stale addresses
func (o *OverlayClient) Refresh(ctx context.Context) error { func (o *Cache) Refresh(ctx context.Context) error {
// iterate over all nodes // iterate over all nodes
// compare responses to find new nodes // compare responses to find new nodes
// listen for responses from existing nodes // listen for responses from existing nodes
@ -118,7 +131,7 @@ func (o *OverlayClient) Refresh(ctx context.Context) error {
} }
// Walk iterates over buckets to traverse the network // Walk iterates over buckets to traverse the network
func (o *OverlayClient) Walk(ctx context.Context) error { func (o *Cache) Walk(ctx context.Context) error {
nodes, err := o.DHT.GetNodes(ctx, "0", 128) nodes, err := o.DHT.GetNodes(ctx, "0", 128)
if err != nil { if err != nil {
return err return err

302
pkg/overlay/cache_test.go Normal file
View File

@ -0,0 +1,302 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
"os"
"path/filepath"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zeebo/errs"
"storj.io/storj/internal/test"
"storj.io/storj/pkg/utils"
"storj.io/storj/protos/overlay"
"storj.io/storj/storage"
"storj.io/storj/storage/boltdb"
"storj.io/storj/storage/redis"
)
type dbClient int
type responses map[dbClient]*overlay.NodeAddress
type errors map[dbClient]error
const (
mock dbClient = iota
bolt
_redis
)
var (
getCases = []struct {
testID string
expectedTimesCalled int
key string
expectedResponses responses
expectedErrors errors
data test.KvStore
}{
{
testID: "valid Get",
expectedTimesCalled: 1,
key: "foo",
expectedResponses: func() responses {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
return responses{
mock: na,
bolt: na,
_redis: na,
}
}(),
expectedErrors: errors{
mock: nil,
bolt: nil,
_redis: nil,
},
data: test.KvStore{"foo": func() storage.Value {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
d, err := proto.Marshal(na)
if err != nil {
panic(err)
}
return d
}()},
},
{
testID: "forced get error",
expectedTimesCalled: 1,
key: "error",
expectedResponses: func() responses {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
return responses{
mock: nil,
bolt: na,
_redis: na,
}
}(),
expectedErrors: errors{
mock: test.ErrForced,
bolt: nil,
_redis: nil,
},
data: test.KvStore{"error": func() storage.Value {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
d, err := proto.Marshal(na)
if err != nil {
panic(err)
}
return d
}()},
},
{
testID: "get missing key",
expectedTimesCalled: 1,
key: "bar",
expectedResponses: responses{
mock: nil,
bolt: nil,
_redis: nil,
},
// TODO(bryanchriswhite): compare actual errors
expectedErrors: errors{
mock: test.ErrMissingKey,
bolt: errs.New("boltdb error"),
_redis: errs.New("redis error"),
},
data: test.KvStore{"foo": func() storage.Value {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
d, err := proto.Marshal(na)
if err != nil {
panic(err)
}
return d
}()},
},
}
putCases = []struct {
testID string
expectedTimesCalled int
key string
value overlay.NodeAddress
expectedErrors errors
data test.KvStore
}{
{
testID: "valid Put",
expectedTimesCalled: 1,
key: "foo",
value: overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"},
expectedErrors: errors{
mock: nil,
bolt: nil,
_redis: nil,
},
data: test.KvStore{},
},
}
)
func redisTestClient(t *testing.T, data test.KvStore) storage.KeyValueStore {
client, err := redis.NewClient("127.0.0.1:6379", "", 1)
assert.NoError(t, err)
populateStorage(t, client, data)
return client
}
func boltTestClient(t *testing.T, data test.KvStore) (_ storage.KeyValueStore, _ func()) {
boltPath, err := filepath.Abs("test_bolt.db")
assert.NoError(t, err)
logger, err := utils.NewLogger("dev")
assert.NoError(t, err)
client, err := boltdb.NewClient(logger, boltPath, "testBoltdb")
assert.NoError(t, err)
cleanup := func() {
err := os.Remove(boltPath)
assert.NoError(t, err)
}
populateStorage(t, client, data)
return client, cleanup
}
func populateStorage(t *testing.T, client storage.KeyValueStore, data test.KvStore) {
for k, v := range data {
err := client.Put(storage.Key(k), v)
assert.NoError(t, err)
}
}
func TestRedisGet(t *testing.T) {
done := test.EnsureRedis(t)
defer done()
for _, c := range getCases {
t.Run(c.testID, func(t *testing.T) {
db := redisTestClient(t, c.data)
oc := Cache{DB: db}
resp, err := oc.Get(context.Background(), c.key)
if expectedErr := c.expectedErrors[_redis]; expectedErr != nil {
assert.Error(t, err)
} else {
assert.Equal(t, expectedErr, err)
}
assert.Equal(t, c.expectedResponses[_redis], resp)
})
}
}
func TestRedisPut(t *testing.T) {
done := test.EnsureRedis(t)
defer done()
for _, c := range putCases {
t.Run(c.testID, func(t *testing.T) {
db, cleanup := boltTestClient(t, c.data)
defer cleanup()
oc := Cache{DB: db}
err := oc.Put(c.key, c.value)
assert.Equal(t, c.expectedErrors[_redis], err)
v, err := db.Get([]byte(c.key))
assert.NoError(t, err)
na := &overlay.NodeAddress{}
assert.NoError(t, proto.Unmarshal(v, na))
assert.Equal(t, na, &c.value)
})
}
}
func TestBoltGet(t *testing.T) {
for _, c := range getCases {
t.Run(c.testID, func(t *testing.T) {
db, cleanup := boltTestClient(t, c.data)
defer cleanup()
oc := Cache{DB: db}
resp, err := oc.Get(context.Background(), c.key)
if expectedErr := c.expectedErrors[bolt]; expectedErr != nil {
assert.Error(t, err)
} else {
assert.Equal(t, expectedErr, err)
}
assert.Equal(t, c.expectedResponses[bolt], resp)
})
}
}
func TestBoltPut(t *testing.T) {
for _, c := range putCases {
t.Run(c.testID, func(t *testing.T) {
db, cleanup := boltTestClient(t, c.data)
defer cleanup()
oc := Cache{DB: db}
err := oc.Put(c.key, c.value)
assert.Equal(t, c.expectedErrors[_redis], err)
v, err := db.Get([]byte(c.key))
assert.NoError(t, err)
na := &overlay.NodeAddress{}
assert.NoError(t, proto.Unmarshal(v, na))
assert.Equal(t, na, &c.value)
})
}
}
func TestMockGet(t *testing.T) {
for _, c := range getCases {
t.Run(c.testID, func(t *testing.T) {
db := test.NewMockKeyValueStore(c.data)
oc := Cache{DB: db}
assert.Equal(t, 0, db.GetCalled)
resp, err := oc.Get(context.Background(), c.key)
assert.Equal(t, c.expectedErrors[mock], err)
assert.Equal(t, c.expectedResponses[mock], resp)
assert.Equal(t, c.expectedTimesCalled, db.GetCalled)
})
}
}
func TestMockPut(t *testing.T) {
for _, c := range putCases {
t.Run(c.testID, func(t *testing.T) {
db := test.NewMockKeyValueStore(c.data)
oc := Cache{DB: db}
assert.Equal(t, 0, db.PutCalled)
err := oc.Put(c.key, c.value)
assert.Equal(t, c.expectedErrors[mock], err)
assert.Equal(t, c.expectedTimesCalled, db.PutCalled)
v := db.Data[c.key]
na := &overlay.NodeAddress{}
assert.NoError(t, proto.Unmarshal(v, na))
assert.Equal(t, na, &c.value)
})
}
}

View File

@ -7,23 +7,22 @@ import (
"context" "context"
"go.uber.org/zap" "go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2" "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/kademlia"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
"storj.io/storj/storage/redis"
) )
// Overlay implements our overlay RPC service // Overlay implements our overlay RPC service
type Overlay struct { type Overlay struct {
kad *kademlia.Kademlia kad *kademlia.Kademlia
DB *redis.OverlayClient cache *Cache
logger *zap.Logger logger *zap.Logger
metrics *monkit.Registry metrics *monkit.Registry
} }
// Lookup finds the address of a node in our overlay network // Lookup finds the address of a node in our overlay network
func (o *Overlay) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.LookupResponse, error) { func (o *Overlay) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.LookupResponse, error) {
na, err := o.DB.Get(ctx, req.NodeID) na, err := o.cache.Get(ctx, req.NodeID)
if err != nil { if err != nil {
o.logger.Error("Error looking up node", zap.Error(err), zap.String("nodeID", req.NodeID)) o.logger.Error("Error looking up node", zap.Error(err), zap.String("nodeID", req.NodeID))
return nil, err return nil, err

View File

@ -12,11 +12,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
monkit "gopkg.in/spacemonkeygo/monkit.v2" "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/kademlia"
proto "storj.io/storj/protos/overlay" proto "storj.io/storj/protos/overlay"
"storj.io/storj/storage/redis"
) )
var ( var (
@ -34,15 +33,14 @@ func init() {
flag.StringVar(&bootstrapIP, "bootstrapIP", "", "Optional IP to bootstrap node against") flag.StringVar(&bootstrapIP, "bootstrapIP", "", "Optional IP to bootstrap node against")
flag.StringVar(&bootstrapPort, "bootstrapPort", "", "Optional port of node to bootstrap against") flag.StringVar(&bootstrapPort, "bootstrapPort", "", "Optional port of node to bootstrap against")
flag.StringVar(&localPort, "localPort", "8080", "Specify a different port to listen on locally") flag.StringVar(&localPort, "localPort", "8080", "Specify a different port to listen on locally")
flag.Parse()
} }
// NewServer creates a new Overlay Service Server // NewServer creates a new Overlay Service Server
func NewServer(k *kademlia.Kademlia, db *redis.OverlayClient, l *zap.Logger, m *monkit.Registry) *grpc.Server { func NewServer(k *kademlia.Kademlia, cache *Cache, l *zap.Logger, m *monkit.Registry) *grpc.Server {
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
proto.RegisterOverlayServer(grpcServer, &Overlay{ proto.RegisterOverlayServer(grpcServer, &Overlay{
kad: k, kad: k,
DB: db, cache: cache,
logger: l, logger: l,
metrics: m, metrics: m,
}) })
@ -78,7 +76,7 @@ func (s *Service) Process(ctx context.Context) error {
// TODO(coyle): Should add the ability to pass a configuration to change the bootstrap node // TODO(coyle): Should add the ability to pass a configuration to change the bootstrap node
in := kademlia.GetIntroNode(bootstrapIP, bootstrapPort) in := kademlia.GetIntroNode(bootstrapIP, bootstrapPort)
kad, err := kademlia.NewKademlia([]proto.Node{in}, "bootstrap.storj.io", "8080") kad, err := kademlia.NewKademlia([]proto.Node{in}, "0.0.0.0", localPort)
if err != nil { if err != nil {
s.logger.Error("Failed to instantiate new Kademlia", zap.Error(err)) s.logger.Error("Failed to instantiate new Kademlia", zap.Error(err))
return err return err
@ -95,10 +93,13 @@ func (s *Service) Process(ctx context.Context) error {
} }
// bootstrap cache // bootstrap cache
cache, err := redis.NewOverlayClient(redisAddress, redisPassword, db, kad) var cache *Cache
if err != nil { if redisAddress != "" {
s.logger.Error("Failed to create a new redis overlay client", zap.Error(err)) cache, err = NewRedisOverlayCache(redisAddress, redisPassword, db, kad)
return err if err != nil {
s.logger.Error("Failed to create a new redis overlay client", zap.Error(err))
return err
}
} }
if err := cache.Bootstrap(ctx); err != nil { if err := cache.Bootstrap(ctx); err != nil {
@ -115,18 +116,20 @@ func (s *Service) Process(ctx context.Context) error {
return err return err
} }
grpcServer := grpc.NewServer() grpcServer := NewServer(kad, cache, s.logger, s.metrics)
proto.RegisterOverlayServer(grpcServer, &Overlay{
kad: kad,
DB: cache,
logger: s.logger,
metrics: s.metrics,
})
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") }) http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") })
go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), nil) }() go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), nil) }()
go cache.Walk(ctx) go cache.Walk(ctx)
// If the passed context times out or is cancelled, shutdown the gRPC server
go func() {
if _, ok := <-ctx.Done(); !ok {
grpcServer.GracefulStop()
}
}()
// If `grpcServer.Serve(...)` returns an error, shutdown/cleanup the gRPC server
defer grpcServer.GracefulStop() defer grpcServer.GracefulStop()
return grpcServer.Serve(lis) return grpcServer.Serve(lis)
} }

View File

@ -8,10 +8,12 @@ import (
"fmt" "fmt"
"net" "net"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/grpc" "google.golang.org/grpc"
"storj.io/storj/internal/test"
proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package
) )
@ -45,3 +47,13 @@ func TestNewClient(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, r) assert.NotNil(t, r)
} }
func TestProcess(t *testing.T) {
done := test.EnsureRedis(t)
defer done()
o := Service{}
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
err := o.Process(ctx)
assert.NoError(t, err)
}

View File

@ -13,30 +13,30 @@ import (
) )
func TestStore(t *testing.T) { func TestStore(t *testing.T) {
tests := []struct{ tests := []struct {
it string it string
id string id string
content []byte content []byte
expectedContent []byte expectedContent []byte
err string err string
} { }{
{ {
it: "should successfully store data", it: "should successfully store data",
id: "0123456789ABCDEFGHIJ", id: "0123456789ABCDEFGHIJ",
content: []byte("butts"), content: []byte("butts"),
expectedContent: []byte("butts"), expectedContent: []byte("butts"),
err: "", err: "",
}, },
{ {
it: "should return an error when given an invalid id", it: "should return an error when given an invalid id",
id: "012", id: "012",
content: []byte("butts"), content: []byte("butts"),
expectedContent: []byte("butts"), expectedContent: []byte("butts"),
err: "argError: Invalid id length", err: "argError: Invalid id length",
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.it, func(t *testing.T) { t.Run(tt.it, func(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
storeFile, err := StoreWriter(tt.id, os.TempDir()) storeFile, err := StoreWriter(tt.id, os.TempDir())
@ -79,67 +79,67 @@ func TestStore(t *testing.T) {
return return
} }
}) })
} }
} }
func TestRetrieve(t *testing.T) { func TestRetrieve(t *testing.T) {
tests := []struct{ tests := []struct {
it string it string
id string id string
size int64 size int64
offset int64 offset int64
content []byte content []byte
expectedContent []byte expectedContent []byte
err string err string
} { }{
{ {
it: "should successfully retrieve data", it: "should successfully retrieve data",
id: "0123456789ABCDEFGHIJ", id: "0123456789ABCDEFGHIJ",
size: 5, size: 5,
offset: 0, offset: 0,
content: []byte("butts"), content: []byte("butts"),
expectedContent: []byte("butts"), expectedContent: []byte("butts"),
err: "", err: "",
}, },
{ {
it: "should successfully retrieve data by offset", it: "should successfully retrieve data by offset",
id: "0123456789ABCDEFGHIJ", id: "0123456789ABCDEFGHIJ",
size: 5, size: 5,
offset: 5, offset: 5,
content: []byte("butts"), content: []byte("butts"),
expectedContent: []byte("butts"), expectedContent: []byte("butts"),
err: "", err: "",
}, },
{ {
it: "should successfully retrieve data by chunk", it: "should successfully retrieve data by chunk",
id: "0123456789ABCDEFGHIJ", id: "0123456789ABCDEFGHIJ",
size: 2, size: 2,
offset: 5, offset: 5,
content: []byte("bu"), content: []byte("bu"),
expectedContent: []byte("bu"), expectedContent: []byte("bu"),
err: "", err: "",
}, },
{ {
it: "should return an error when given negative offset", it: "should return an error when given negative offset",
id: "0123456789ABCDEFGHIJ", id: "0123456789ABCDEFGHIJ",
size: 0, size: 0,
offset: -1337, offset: -1337,
content: []byte("butts"), content: []byte("butts"),
expectedContent: []byte(""), expectedContent: []byte(""),
err: "argError: Invalid offset: -1337", err: "argError: Invalid offset: -1337",
}, },
{ {
it: "should successfully retrieve data with negative length", it: "should successfully retrieve data with negative length",
id: "0123456789ABCDEFGHIJ", id: "0123456789ABCDEFGHIJ",
size: -1, size: -1,
offset: 0, offset: 0,
content: []byte("butts"), content: []byte("butts"),
expectedContent: []byte("butts"), expectedContent: []byte("butts"),
err: "", err: "",
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.it, func(t *testing.T) { t.Run(tt.it, func(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
@ -194,31 +194,31 @@ func TestRetrieve(t *testing.T) {
return return
} }
}) })
} }
} }
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
tests := []struct{ tests := []struct {
it string it string
id string id string
err string err string
} { }{
{ {
it: "should successfully delete data", it: "should successfully delete data",
id: "11111111111111111111", id: "11111111111111111111",
err: "", err: "",
}, },
{ {
it: "should return nil-err with non-existent id", it: "should return nil-err with non-existent id",
id: "11111111111111111111", id: "11111111111111111111",
err: "", err: "",
}, },
{ {
it: "should err with invalid id length", it: "should err with invalid id length",
id: "111111", id: "111111",
err: "argError: Invalid id length", err: "argError: Invalid id length",
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.it, func(t *testing.T) { t.Run(tt.it, func(t *testing.T) {

View File

@ -55,25 +55,25 @@ func TestPiece(t *testing.T) {
// set up test cases // set up test cases
tests := []struct { tests := []struct {
id string id string
size int64 size int64
expiration int64 expiration int64
err string err string
}{ }{
{ // should successfully retrieve piece meta-data { // should successfully retrieve piece meta-data
id: testId, id: testId,
size: 5, size: 5,
expiration: testExpiration, expiration: testExpiration,
err: "", err: "",
}, },
{ // server should err with invalid id { // server should err with invalid id
id: "123", id: "123",
size: 5, size: 5,
expiration: testExpiration, expiration: testExpiration,
err: "rpc error: code = Unknown desc = argError: Invalid id length", err: "rpc error: code = Unknown desc = argError: Invalid id length",
}, },
{ // server should err with nonexistent file { // server should err with nonexistent file
id: "22222222222222222222", id: "22222222222222222222",
size: 5, size: 5,
expiration: testExpiration, expiration: testExpiration,
err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(os.TempDir(), "/test-data/3000/22/22/2222222222222222")), err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(os.TempDir(), "/test-data/3000/22/22/2222222222222222")),
@ -83,14 +83,14 @@ func TestPiece(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run("should return expected PieceSummary values", func(t *testing.T) { t.Run("should return expected PieceSummary values", func(t *testing.T) {
// simulate piece TTL entry // simulate piece TTL entry
_, err = db.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, testCreatedDate, testExpiration)) _, err = db.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, testCreatedDate, testExpiration))
if err != nil { if err != nil {
t.Errorf("Error: %v\nCould not make TTL entry", err) t.Errorf("Error: %v\nCould not make TTL entry", err)
return return
} }
defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id))
req := &pb.PieceId{Id: tt.id} req := &pb.PieceId{Id: tt.id}
resp, err := c.Piece(context.Background(), req) resp, err := c.Piece(context.Background(), req)
@ -146,7 +146,7 @@ func TestRetrieve(t *testing.T) {
// set up test cases // set up test cases
tests := []struct { tests := []struct {
id string id string
reqSize int64 reqSize int64
respSize int64 respSize int64
offset int64 offset int64
@ -154,7 +154,7 @@ func TestRetrieve(t *testing.T) {
err string err string
}{ }{
{ // should successfully retrieve data { // should successfully retrieve data
id: testId, id: testId,
reqSize: 5, reqSize: 5,
respSize: 5, respSize: 5,
offset: 0, offset: 0,
@ -162,7 +162,7 @@ func TestRetrieve(t *testing.T) {
err: "", err: "",
}, },
{ // server should err with invalid id { // server should err with invalid id
id: "123", id: "123",
reqSize: 5, reqSize: 5,
respSize: 5, respSize: 5,
offset: 0, offset: 0,
@ -170,7 +170,7 @@ func TestRetrieve(t *testing.T) {
err: "rpc error: code = Unknown desc = argError: Invalid id length", err: "rpc error: code = Unknown desc = argError: Invalid id length",
}, },
{ // server should err with nonexistent file { // server should err with nonexistent file
id: "22222222222222222222", id: "22222222222222222222",
reqSize: 5, reqSize: 5,
respSize: 5, respSize: 5,
offset: 0, offset: 0,
@ -178,7 +178,7 @@ func TestRetrieve(t *testing.T) {
err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(os.TempDir(), "/test-data/3000/22/22/2222222222222222")), err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(os.TempDir(), "/test-data/3000/22/22/2222222222222222")),
}, },
{ // server should return expected content and respSize with offset and excess reqSize { // server should return expected content and respSize with offset and excess reqSize
id: testId, id: testId,
reqSize: 5, reqSize: 5,
respSize: 4, respSize: 4,
offset: 1, offset: 1,
@ -186,7 +186,7 @@ func TestRetrieve(t *testing.T) {
err: "", err: "",
}, },
{ // server should return expected content with reduced reqSize { // server should return expected content with reduced reqSize
id: testId, id: testId,
reqSize: 4, reqSize: 4,
respSize: 4, respSize: 4,
offset: 0, offset: 0,
@ -229,7 +229,7 @@ func TestRetrieve(t *testing.T) {
func TestStore(t *testing.T) { func TestStore(t *testing.T) {
tests := []struct { tests := []struct {
id string id string
size int64 size int64
ttl int64 ttl int64
offset int64 offset int64
@ -239,7 +239,7 @@ func TestStore(t *testing.T) {
err string err string
}{ }{
{ // should successfully store data { // should successfully store data
id: testId, id: testId,
ttl: testExpiration, ttl: testExpiration,
content: []byte("butts"), content: []byte("butts"),
message: "OK", message: "OK",
@ -247,7 +247,7 @@ func TestStore(t *testing.T) {
err: "", err: "",
}, },
{ // should err with invalid id length { // should err with invalid id length
id: "butts", id: "butts",
ttl: testExpiration, ttl: testExpiration,
content: []byte("butts"), content: []byte("butts"),
message: "", message: "",
@ -279,7 +279,7 @@ func TestStore(t *testing.T) {
resp, err := stream.CloseAndRecv() resp, err := stream.CloseAndRecv()
defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id))
if len(tt.err) > 0 { if len(tt.err) > 0 {
if err != nil { if err != nil {
@ -305,22 +305,22 @@ func TestStore(t *testing.T) {
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
// set up test cases // set up test cases
tests := []struct { tests := []struct {
id string id string
message string message string
err string err string
}{ }{
{ // should successfully delete data { // should successfully delete data
id: testId, id: testId,
message: "OK", message: "OK",
err: "", err: "",
}, },
{ // should err with invalid id length { // should err with invalid id length
id: "123", id: "123",
message: "rpc error: code = Unknown desc = argError: Invalid id length", message: "rpc error: code = Unknown desc = argError: Invalid id length",
err: "rpc error: code = Unknown desc = argError: Invalid id length", err: "rpc error: code = Unknown desc = argError: Invalid id length",
}, },
{ // should return OK with nonexistent file { // should return OK with nonexistent file
id: "22222222222222222223", id: "22222222222222222223",
message: "OK", message: "OK",
err: "", err: "",
}, },
@ -351,7 +351,7 @@ func TestDelete(t *testing.T) {
return return
} }
defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id))
defer pstore.Delete(testId, s.PieceStoreDir) defer pstore.Delete(testId, s.PieceStoreDir)

View File

@ -1,3 +1,6 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: storj.io/storj/protos/piecestore (interfaces: PieceStoreRoutesClient,PieceStoreRoutes_RetrieveClient) // Source: storj.io/storj/protos/piecestore (interfaces: PieceStoreRoutesClient,PieceStoreRoutes_RetrieveClient)

View File

@ -8,39 +8,101 @@ import (
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/storj/storage"
)
type boltClient struct {
logger *zap.Logger
db *bolt.DB
Path string
Bucket []byte
}
const (
// fileMode sets permissions so owner can read and write
fileMode = 0600
// PointerBucket is the string representing the bucket used for `PointerEntries`
PointerBucket = "pointers"
// OverlayBucket is the string representing the bucket used for a bolt-backed overlay dht cache
OverlayBucket = "overlay"
) )
var ( var (
defaultTimeout = 1 * time.Second defaultTimeout = 1 * time.Second
) )
const ( // NewClient instantiates a new BoltDB client given a zap logger, db file path, and a bucket name
// fileMode sets permissions so owner can read and write func NewClient(logger *zap.Logger, path, bucket string) (storage.KeyValueStore, error) {
fileMode = 0600
)
// Client is the storage interface for the Bolt database
type Client struct {
logger *zap.Logger
db *bolt.DB
Path string
}
// New instantiates a new BoltDB client
func New(logger *zap.Logger, path string) (*Client, error) {
db, err := bolt.Open(path, fileMode, &bolt.Options{Timeout: defaultTimeout}) db, err := bolt.Open(path, fileMode, &bolt.Options{Timeout: defaultTimeout})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Client{ return &boltClient{
logger: logger, logger: logger,
db: db, db: db,
Path: path, Path: path,
Bucket: []byte(bucket),
}, nil }, nil
} }
// Put adds a value to the provided key in boltdb, returning an error on failure.
func (c *boltClient) Put(key storage.Key, value storage.Value) error {
c.logger.Debug("entering bolt put")
return c.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(c.Bucket)
if err != nil {
return err
}
return b.Put(key, value)
})
}
// Get looks up the provided key from boltdb returning either an error or the result.
func (c *boltClient) Get(pathKey storage.Key) (storage.Value, error) {
c.logger.Debug("entering bolt get: " + string(pathKey))
var pointerBytes []byte
err := c.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(c.Bucket)
v := b.Get(pathKey)
if v == nil {
return Error.New("pointer at %#v not found", string(pathKey))
}
pointerBytes = v
return nil
})
return pointerBytes, err
}
// List returns either a list of keys for which boltdb has values or an error.
func (c *boltClient) List() (storage.Keys, error) {
c.logger.Debug("entering bolt list")
var paths storage.Keys
err := c.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(c.Bucket)
err := b.ForEach(func(key, value []byte) error {
paths = append(paths, key)
return nil
})
return err
})
return paths, err
}
// Delete deletes a key/value pair from boltdb, for a given the key
func (c *boltClient) Delete(pathKey storage.Key) error {
c.logger.Debug("entering bolt delete: " + string(pathKey))
return c.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(c.Bucket).Delete(pathKey)
})
}
// Close closes a BoltDB client // Close closes a BoltDB client
func (c *Client) Close() error { func (c *boltClient) Close() error {
return c.db.Close() return c.db.Close()
} }

View File

@ -10,6 +10,7 @@ import (
"testing" "testing"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/storj/pkg/netstate"
) )
func tempfile() string { func tempfile() string {
@ -27,27 +28,30 @@ func tempfile() string {
func TestNetState(t *testing.T) { func TestNetState(t *testing.T) {
logger, _ := zap.NewDevelopment() logger, _ := zap.NewDevelopment()
c, err := New(logger, tempfile()) c, err := NewClient(logger, tempfile(), "test_bucket")
if err != nil { if err != nil {
t.Error("Failed to create test db") t.Error("Failed to create test db")
} }
defer func() { defer func() {
c.Close() c.Close()
os.Remove(c.Path) switch client := c.(type) {
case *boltClient:
os.Remove(client.Path)
}
}() }()
testEntry1 := PointerEntry{ testEntry1 := netstate.PointerEntry{
Path: []byte(`test/path`), Path: []byte(`test/path`),
Pointer: []byte(`pointer1`), Pointer: []byte(`pointer1`),
} }
testEntry2 := PointerEntry{ testEntry2 := netstate.PointerEntry{
Path: []byte(`test/path2`), Path: []byte(`test/path2`),
Pointer: []byte(`pointer2`), Pointer: []byte(`pointer2`),
} }
// tests Put function // tests Put function
if err := c.Put(testEntry1); err != nil { if err := c.Put(testEntry1.Path, testEntry1.Pointer); err != nil {
t.Error("Failed to save testFile to pointers bucket") t.Error("Failed to save testFile to pointers bucket")
} }
@ -66,7 +70,7 @@ func TestNetState(t *testing.T) {
} }
// tests List function // tests List function
if err := c.Put(testEntry2); err != nil { if err := c.Put(testEntry2.Path, testEntry2.Pointer); err != nil {
t.Error("Failed to put testEntry2 to pointers bucket") t.Error("Failed to put testEntry2 to pointers bucket")
} }
testPaths, err := c.List() testPaths, err := c.List()

View File

@ -1,75 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package boltdb
import (
"github.com/boltdb/bolt"
)
// PointerEntry - Path and Pointer are saved as a kv pair to boltdb.
// The following boltdb methods handle the pointer type (defined in
// the protobuf file) after it has been marshalled into bytes.
type PointerEntry struct {
Path []byte
Pointer []byte
}
const (
pointerBucket = "pointers"
)
// Put saves the Path and Pointer as a kv entry in the "pointers" bucket
func (client *Client) Put(pe PointerEntry) error {
client.logger.Debug("entering bolt put")
return client.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(pointerBucket))
if err != nil {
return err
}
return b.Put(pe.Path, pe.Pointer)
})
}
// Get retrieves the Pointer value stored at the Path key
func (client *Client) Get(pathKey []byte) ([]byte, error) {
client.logger.Debug("entering bolt get: " + string(pathKey))
var pointerBytes []byte
err := client.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(pointerBucket))
v := b.Get(pathKey)
if v == nil {
return Error.New("pointer at %#v not found", string(pathKey))
}
pointerBytes = v
return nil
})
return pointerBytes, err
}
// List creates a byte array of all path keys in in the "pointers" bucket
func (client *Client) List() ([][]byte, error) {
client.logger.Debug("entering bolt list")
var paths [][]byte
err := client.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(pointerBucket))
err := b.ForEach(func(key, value []byte) error {
paths = append(paths, key)
return nil
})
return err
})
return paths, err
}
// Delete deletes a kv pair from the "pointers" bucket, given the Path key
func (client *Client) Delete(pathKey []byte) error {
client.logger.Debug("entering bolt delete: " + string(pathKey))
return client.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket([]byte(pointerBucket)).Delete(pathKey)
})
}

49
storage/common.go Normal file
View File

@ -0,0 +1,49 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package storage
// Key is the type for the keys in a `KeyValueStore`
type Key []byte
// Value is the type for the values in a `ValueValueStore`
type Value []byte
// Keys is the type for a slice of keys in a `KeyValueStore`
type Keys []Key
// KeyValueStore is an interface describing key/value stores like redis and boltdb
type KeyValueStore interface {
// Put adds a value to the provided key in the KeyValueStore, returning an error on failure.
Put(Key, Value) error
Get(Key) (Value, error)
List() (Keys, error)
Delete(Key) error
Close() error
}
// MarshalBinary implements the encoding.BinaryMarshaler interface for the Value type
func (v *Value) MarshalBinary() (_ []byte, _ error) {
return *v, nil
}
// MarshalBinary implements the encoding.BinaryMarshaler interface for the Key type
func (k *Key) MarshalBinary() (_ []byte, _ error) {
return *k, nil
}
// ByteSlices converts a `Keys` struct to a slice of byte-slices (i.e. `[][]byte`)
func (k *Keys) ByteSlices() [][]byte {
result := make([][]byte, len(*k))
for _k, v := range *k {
result[_k] = []byte(v)
}
return result
}
// String implements the Stringer interface
func (k *Key) String() string {
return string(*k)
}

View File

@ -7,50 +7,74 @@ import (
"time" "time"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/zeebo/errs"
"storj.io/storj/storage"
) )
// Client defines the interface for communicating with a Storj redis instance const defaultNodeExpiration = 61 * time.Minute
type Client interface {
Get(key string) ([]byte, error)
Set(key string, value []byte, ttl time.Duration) error
Ping() error
}
// Client is the entrypoint into Redis // redisClient is the entrypoint into Redis
type redisClient struct { type redisClient struct {
DB *redis.Client db *redis.Client
TTL time.Duration
} }
// NewRedisClient returns a configured Client instance, verifying a sucessful connection to redis // NewClient returns a configured Client instance, verifying a sucessful connection to redis
func NewRedisClient(address, password string, db int) (Client, error) { func NewClient(address, password string, db int) (storage.KeyValueStore, error) {
c := &redisClient{ c := &redisClient{
DB: redis.NewClient(&redis.Options{ db: redis.NewClient(&redis.Options{
Addr: address, Addr: address,
Password: password, Password: password,
DB: db, DB: db,
}), }),
TTL: defaultNodeExpiration,
} }
// ping here to verify we are able to connect to the redis instacne with the initialized client. // ping here to verify we are able to connect to redis with the initialized client.
if err := c.DB.Ping().Err(); err != nil { if err := c.db.Ping().Err(); err != nil {
return nil, err return nil, err
} }
return c, nil return c, nil
} }
// Get looks up the provided key from the redis cache returning either an error or the result. // Get looks up the provided key from redis returning either an error or the result.
func (c *redisClient) Get(key string) ([]byte, error) { func (c *redisClient) Get(key storage.Key) (storage.Value, error) {
return c.DB.Get(key).Bytes() return c.db.Get(string(key)).Bytes()
} }
// Set adds a value to the provided key in the Redis cache, returning an error on failure. // Put adds a value to the provided key in redis, returning an error on failure.
func (c *redisClient) Put(key storage.Key, value storage.Value) error {
v, err := value.MarshalBinary()
func (c *redisClient) Set(key string, value []byte, ttl time.Duration) error { if err != nil {
return c.DB.Set(key, value, ttl).Err() return err
}
return c.db.Set(key.String(), v, c.TTL).Err()
} }
// Ping returns an error if pinging the underlying redis server failed // List returns either a list of keys for which boltdb has values or an error.
func (c *redisClient) Ping() error { func (c *redisClient) List() (_ storage.Keys, _ error) {
return c.DB.Ping().Err() results, err := c.db.Keys("*").Result()
if err != nil {
return nil, errs.Wrap(err)
}
keys := make(storage.Keys, len(results))
for i, k := range results {
keys[i] = storage.Key(k)
}
return keys, nil
}
// Delete deletes a key/value pair from redis, for a given the key
func (c *redisClient) Delete(key storage.Key) error {
return c.db.Del(key.String()).Err()
}
// Close closes a redis client
func (c *redisClient) Close() error {
return c.db.Close()
} }

View File

@ -1,55 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package redis
import (
"errors"
"time"
)
type mockRedisClient struct {
data map[string][]byte
getCalled int
setCalled int
pingCalled int
}
// ErrMissingKey is the error returned if a key is not in the mock store
var ErrMissingKey = errors.New("missing")
// ErrForced is the error returned when the forced error flag is passed to mock an error
var ErrForced = errors.New("error forced by using 'error' key in mock")
func (m *mockRedisClient) Get(key string) ([]byte, error) {
m.getCalled++
if key == "error" {
return []byte{}, ErrForced
}
v, ok := m.data[key]
if !ok {
return []byte{}, ErrMissingKey
}
return v, nil
}
func (m *mockRedisClient) Set(key string, value []byte, ttl time.Duration) error {
m.setCalled++
m.data[key] = value
return nil
}
func (m *mockRedisClient) Ping() error {
m.pingCalled++
return nil
}
func newMockRedisClient(d map[string][]byte) *mockRedisClient {
return &mockRedisClient{
data: d,
getCalled: 0,
setCalled: 0,
pingCalled: 0,
}
}

View File

@ -1,118 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package redis
import (
"context"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"storj.io/storj/protos/overlay"
)
func TestGet(t *testing.T) {
cases := []struct {
testID string
expectedTimesCalled int
key string
expectedResponse *overlay.NodeAddress
expectedError error
client *mockRedisClient
}{
{
testID: "valid Get",
expectedTimesCalled: 1,
key: "foo",
expectedResponse: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"},
expectedError: nil,
client: newMockRedisClient(map[string][]byte{"foo": func() []byte {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
d, err := proto.Marshal(na)
assert.NoError(t, err)
return d
}()}),
},
{
testID: "error Get from redis",
expectedTimesCalled: 1,
key: "error",
expectedResponse: nil,
expectedError: ErrForced,
client: newMockRedisClient(map[string][]byte{"error": func() []byte {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
d, err := proto.Marshal(na)
assert.NoError(t, err)
return d
}()}),
},
{
testID: "get missing key",
expectedTimesCalled: 1,
key: "bar",
expectedResponse: nil,
expectedError: ErrMissingKey,
client: newMockRedisClient(map[string][]byte{"foo": func() []byte {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}
d, err := proto.Marshal(na)
assert.NoError(t, err)
return d
}()}),
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
oc := OverlayClient{DB: c.client}
assert.Equal(t, 0, c.client.getCalled)
resp, err := oc.Get(context.Background(), c.key)
assert.Equal(t, c.expectedError, err)
assert.Equal(t, c.expectedResponse, resp)
assert.Equal(t, c.expectedTimesCalled, c.client.getCalled)
})
}
}
func TestSet(t *testing.T) {
cases := []struct {
testID string
expectedTimesCalled int
key string
value overlay.NodeAddress
expectedError error
client *mockRedisClient
}{
{
testID: "valid Set",
expectedTimesCalled: 1,
key: "foo",
value: overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"},
expectedError: nil,
client: newMockRedisClient(map[string][]byte{}),
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
oc := OverlayClient{DB: c.client}
assert.Equal(t, 0, c.client.setCalled)
err := oc.Set(c.key, c.value)
assert.Equal(t, c.expectedError, err)
assert.Equal(t, c.expectedTimesCalled, c.client.setCalled)
v := c.client.data[c.key]
na := &overlay.NodeAddress{}
assert.NoError(t, proto.Unmarshal(v, na))
assert.Equal(t, na, &c.value)
})
}
}