diff --git a/Makefile b/Makefile index 6d702291a..83d0cf5c7 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,10 @@ check-copyrights: @echo "Running ${@}" @./scripts/check-for-header.sh +# Applies goimports to every go file (excluding vendored files) +goimports-fix: + goimports -w $$(find . -type f -name '*.go' -not -path "*/vendor/*") + proto: @echo "Running ${@}" ./scripts/build-protos.sh diff --git a/cmd/netstate/main.go b/cmd/netstate/main.go index 06d7a4777..4d672280f 100644 --- a/cmd/netstate/main.go +++ b/cmd/netstate/main.go @@ -26,7 +26,7 @@ var ( ) func (s *serv) Process(ctx context.Context) error { - bdb, err := boltdb.New(s.logger, *dbPath) + bdb, err := boltdb.NewClient(s.logger, *dbPath, boltdb.PointerBucket) if err != nil { return err diff --git a/internal/test/util.go b/internal/test/util.go new file mode 100644 index 000000000..7c91840f7 --- /dev/null +++ b/internal/test/util.go @@ -0,0 +1,196 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package test + +import ( + "crypto/rand" + "encoding/hex" + "flag" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/zeebo/errs" + + "storj.io/storj/storage" +) + +// KvStore is an in-memory, crappy key/value store type for testing +type KvStore map[string]storage.Value + +// MockKeyValueStore is a `KeyValueStore` type used for testing (see storj.io/storj/storage/common.go) +type MockKeyValueStore struct { + Data KvStore + GetCalled int + PutCalled int + ListCalled int + DeleteCalled int + CloseCalled int + PingCalled int +} + +// RedisDone is a function type that describes the callback returned by `EnsureRedis` +type RedisDone func() + +// RedisServer is a struct which holds and manages the state of a `redis-server` process +type RedisServer struct { + cmd *exec.Cmd + started bool +} + +var ( + // ErrMissingKey is the error returned if a key is not in the mock store + ErrMissingKey = errs.New("missing") + + // ErrForced is the error returned when the forced error flag is passed to mock an error + ErrForced = errs.New("error forced by using 'error' key in mock") + + redisRefs = map[string]bool{} + testRedis = &RedisServer{ + started: false, + } +) + +// Get looks up the provided key from the MockKeyValueStore returning either an error or the result. +func (m *MockKeyValueStore) Get(key storage.Key) (storage.Value, error) { + m.GetCalled++ + if key.String() == "error" { + return storage.Value{}, ErrForced + } + v, ok := m.Data[key.String()] + if !ok { + return storage.Value{}, ErrMissingKey + } + + return v, nil +} + +// Put adds a value to the provided key in the MockKeyValueStore, returning an error on failure. +func (m *MockKeyValueStore) Put(key storage.Key, value storage.Value) error { + m.PutCalled++ + m.Data[key.String()] = value + return nil +} + +// Delete deletes a key/value pair from the MockKeyValueStore, for a given the key +func (m *MockKeyValueStore) Delete(key storage.Key) error { + m.DeleteCalled++ + delete(m.Data, key.String()) + return nil +} + +// List returns either a list of keys for which the MockKeyValueStore has values or an error. +func (m *MockKeyValueStore) List() (_ storage.Keys, _ error) { + m.ListCalled++ + keys := storage.Keys{} + for k := range m.Data { + keys = append(keys, storage.Key(k)) + } + + return keys, nil +} + +// Close closes the client +func (m *MockKeyValueStore) Close() error { + m.CloseCalled++ + return nil +} + +// Ping is called by some redis client code +func (m *MockKeyValueStore) Ping() error { + m.PingCalled++ + return nil +} + +// NewMockKeyValueStore returns a mocked `KeyValueStore` implementation for testing +func NewMockKeyValueStore(d KvStore) *MockKeyValueStore { + return &MockKeyValueStore{ + Data: d, + GetCalled: 0, + PutCalled: 0, + ListCalled: 0, + DeleteCalled: 0, + CloseCalled: 0, + PingCalled: 0, + } +} + +// EnsureRedis attempts to start the `redis-server` binary +func EnsureRedis(t *testing.T) (_ RedisDone) { + flag.Set("redisAddress", "127.0.0.1:6379") + + index, _ := randomHex(5) + redisRefs[index] = true + + if testRedis.started != true { + testRedis.start(t) + } + + return func() { + if v := recover(); v != nil { + testRedis.stop() + panic(v) + } + + redisRefs[index] = false + + if !(redisRefCount() > 0) { + testRedis.stop() + } + } +} + +func redisRefCount() (_ int) { + count := 0 + for _, ref := range redisRefs { + if ref { + count++ + } + } + + return count +} + +func (r *RedisServer) start(t *testing.T) { + r.cmd = &exec.Cmd{} + cmd := r.cmd + + logPath, err := filepath.Abs("test_redis-server.log") + assert.NoError(t, err) + + binPath, err := exec.LookPath("redis-server") + assert.NoError(t, err) + + log, err := os.Create(logPath) + assert.NoError(t, err) + + cmd.Path = binPath + cmd.Stdout = log + + go func() { + r.started = true + + if err := cmd.Run(); err != nil { + // TODO(bryanchriswhite) error checking + } + }() + + time.Sleep(2 * time.Second) +} + +func (r *RedisServer) stop() { + r.started = false + r.cmd.Process.Kill() +} + +func randomHex(n int) (string, error) { + bytes := make([]byte, n) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +} diff --git a/pkg/netstate/client_test.go b/pkg/netstate/client_test.go index c295ad818..d6d5c18ff 100644 --- a/pkg/netstate/client_test.go +++ b/pkg/netstate/client_test.go @@ -16,6 +16,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "storj.io/storj/internal/test" pb "storj.io/storj/protos/netstate" ) @@ -25,9 +27,7 @@ func TestNetStateClient(t *testing.T) { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 9000)) assert.NoError(t, err) - mdb := &MockDB{ - timesCalled: 0, - } + mdb := test.NewMockKeyValueStore(test.KvStore{}) grpcServer := grpc.NewServer() pb.RegisterNetStateServer(grpcServer, NewServer(mdb, logger)) @@ -57,31 +57,23 @@ func TestNetStateClient(t *testing.T) { APIKey: []byte("abc123"), } - if mdb.timesCalled != 0 { - t.Error("Expected mockdb to be called 0 times") - } - // Tests Server.Put _, err = c.Put(ctx, &pr1) if err != nil || status.Code(err) == codes.Internal { t.Error("Failed to Put") } - if mdb.timesCalled != 1 { + if mdb.PutCalled != 1 { t.Error("Failed to call mockdb correctly") } - if !bytes.Equal(mdb.puts[0].Path, pr1.Path) { - t.Error("Expected saved path to equal given path") - } - pointerBytes, err := proto.Marshal(pr1.Pointer) if err != nil { t.Error("failed to marshal test pointer") } - if !bytes.Equal(mdb.puts[0].Pointer, pointerBytes) { - t.Error("Expected saved value to equal given value") + if !bytes.Equal(mdb.Data[string(pr1.Path)], pointerBytes) { + t.Error("Expected saved pointer to equal given pointer") } // Tests Server.Get @@ -97,7 +89,7 @@ func TestNetStateClient(t *testing.T) { t.Error("Expected to get same content that was put") } - if mdb.timesCalled != 2 { + if mdb.GetCalled != 1 { t.Error("Failed to call mockdb correct number of times") } @@ -120,7 +112,7 @@ func TestNetStateClient(t *testing.T) { t.Error("Failed to Put") } - if mdb.timesCalled != 3 { + if mdb.PutCalled != 2 { t.Error("Failed to call mockdb correct number of times") } @@ -135,7 +127,7 @@ func TestNetStateClient(t *testing.T) { t.Error("Failed to delete") } - if mdb.timesCalled != 4 { + if mdb.DeleteCalled != 1 { t.Error("Failed to call mockdb correct number of times") } @@ -157,7 +149,7 @@ func TestNetStateClient(t *testing.T) { t.Error("Failed to list correct file path") } - if mdb.timesCalled != 5 { + if mdb.ListCalled != 1 { t.Error("Failed to call mockdb correct number of times") } } diff --git a/pkg/netstate/netstate.go b/pkg/netstate/netstate.go index ebdd8562b..53e808510 100644 --- a/pkg/netstate/netstate.go +++ b/pkg/netstate/netstate.go @@ -14,33 +14,29 @@ import ( "storj.io/storj/netstate/auth" pb "storj.io/storj/protos/netstate" - "storj.io/storj/storage/boltdb" + "storj.io/storj/storage" ) +// PointerEntry - Path and Pointer are saved as a key/value pair to a `storage.KeyValueStore`. +type PointerEntry struct { + Path []byte + Pointer []byte +} + // Server implements the network state RPC service type Server struct { - DB DB + DB storage.KeyValueStore logger *zap.Logger } // NewServer creates instance of Server -func NewServer(db DB, logger *zap.Logger) *Server { +func NewServer(db storage.KeyValueStore, logger *zap.Logger) *Server { return &Server{ DB: db, logger: logger, } } -// DB interface allows more modular unit testing -// and makes it easier in the future to substitute -// db clients other than bolt -type DB interface { - Put(boltdb.PointerEntry) error - Get([]byte) ([]byte, error) - List() ([][]byte, error) - Delete([]byte) error -} - func (s *Server) validateAuth(APIKeyBytes []byte) error { if !auth.ValidateAPIKey(string(APIKeyBytes)) { s.logger.Error("unauthorized request: ", zap.Error(grpc.Errorf(codes.Unauthenticated, "Invalid API credential"))) @@ -64,12 +60,12 @@ func (s *Server) Put(ctx context.Context, putReq *pb.PutRequest) (*pb.PutRespons return nil, status.Errorf(codes.Internal, err.Error()) } - pe := boltdb.PointerEntry{ + pe := PointerEntry{ Path: putReq.Path, Pointer: pointerBytes, } - if err := s.DB.Put(pe); err != nil { + if err := s.DB.Put(pe.Path, pe.Pointer); err != nil { s.logger.Error("err putting pointer", zap.Error(err)) return nil, status.Errorf(codes.Internal, err.Error()) } @@ -118,7 +114,7 @@ func (s *Server) List(ctx context.Context, req *pb.ListRequest) (*pb.ListRespons s.logger.Debug("path keys retrieved") return &pb.ListResponse{ // pathKeys is an array of byte arrays - Paths: pathKeys, + Paths: pathKeys.ByteSlices(), }, nil } diff --git a/pkg/netstate/server_test.go b/pkg/netstate/server_test.go index 6a07dd252..f1f667f47 100644 --- a/pkg/netstate/server_test.go +++ b/pkg/netstate/server_test.go @@ -4,13 +4,10 @@ package netstate import ( - "bytes" "os" "testing" "github.com/spf13/viper" - - "storj.io/storj/storage/boltdb" ) const ( @@ -23,48 +20,3 @@ func TestMain(m *testing.M) { viper.AutomaticEnv() os.Exit(m.Run()) } - -type MockDB struct { - timesCalled int - puts []boltdb.PointerEntry - pathKeys [][]byte -} - -func (m *MockDB) Put(f boltdb.PointerEntry) error { - m.timesCalled++ - m.puts = append(m.puts, f) - return nil -} - -func (m *MockDB) Get(path []byte) ([]byte, error) { - m.timesCalled++ - - for _, pointerEntry := range m.puts { - if bytes.Equal(path, pointerEntry.Path) { - return pointerEntry.Pointer, nil - } - } - panic("failed to get the given file") -} - -func (m *MockDB) List() ([][]byte, error) { - m.timesCalled++ - - for _, putReq := range m.puts { - m.pathKeys = append(m.pathKeys, putReq.Path) - } - - return m.pathKeys, nil -} - -func (m *MockDB) Delete(path []byte) error { - m.timesCalled++ - - for i, pointerEntry := range m.puts { - if bytes.Equal(path, pointerEntry.Path) { - m.puts = append(m.puts[:i], m.puts[i+1:]...) - } - } - - return nil -} diff --git a/storage/redis/overlay.go b/pkg/overlay/cache.go similarity index 62% rename from storage/redis/overlay.go rename to pkg/overlay/cache.go index 5d4a5b142..7c99a5709 100644 --- a/storage/redis/overlay.go +++ b/pkg/overlay/cache.go @@ -1,47 +1,60 @@ // Copyright (C) 2018 Storj Labs, Inc. // See LICENSE for copying information. -package redis +package overlay import ( "context" - "errors" "fmt" - "time" "github.com/gogo/protobuf/proto" + "github.com/zeebo/errs" "storj.io/storj/pkg/kademlia" "storj.io/storj/protos/overlay" + "storj.io/storj/storage" + "storj.io/storj/storage/boltdb" + "storj.io/storj/storage/redis" ) -const defaultNodeExpiration = 61 * time.Minute - // ErrNodeNotFound standardizes errors here -var ErrNodeNotFound = errors.New("Node not found") +var ErrNodeNotFound = errs.New("Node not found") -// OverlayClient is used to store overlay data in Redis -type OverlayClient struct { - DB Client +// Cache is used to store overlay data in Redis +type Cache struct { + DB storage.KeyValueStore DHT kademlia.DHT } -// NewOverlayClient returns a pointer to a new OverlayClient instance with an initalized connection to Redis. -func NewOverlayClient(address, password string, db int, DHT kademlia.DHT) (*OverlayClient, error) { - rc, err := NewRedisClient(address, password, db) +// NewRedisOverlayCache returns a pointer to a new Cache instance with an initalized connection to Redis. +func NewRedisOverlayCache(address, password string, db int, DHT kademlia.DHT) (*Cache, error) { + rc, err := redis.NewClient(address, password, db) if err != nil { return nil, err } - return &OverlayClient{ + return &Cache{ DB: rc, DHT: DHT, }, nil } +// NewBoltOverlayCache returns a pointer to a new Cache instance with an initalized connection to a Bolt db. +func NewBoltOverlayCache(dbPath string, DHT kademlia.DHT) (*Cache, error) { + bc, err := boltdb.NewClient(nil, dbPath, boltdb.OverlayBucket) + if err != nil { + return nil, err + } + + return &Cache{ + DB: bc, + DHT: DHT, + }, nil +} + // Get looks up the provided nodeID from the redis cache -func (o *OverlayClient) Get(ctx context.Context, key string) (*overlay.NodeAddress, error) { - b, err := o.DB.Get(key) +func (o *Cache) Get(ctx context.Context, key string) (*overlay.NodeAddress, error) { + b, err := o.DB.Get([]byte(key)) if err != nil { return nil, err } @@ -54,18 +67,18 @@ func (o *OverlayClient) Get(ctx context.Context, key string) (*overlay.NodeAddre return na, nil } -// Set adds a nodeID to the redis cache with a binary representation of proto defined NodeAddress -func (o *OverlayClient) Set(nodeID string, value overlay.NodeAddress) error { +// Put adds a nodeID to the redis cache with a binary representation of proto defined NodeAddress +func (o *Cache) Put(nodeID string, value overlay.NodeAddress) error { data, err := proto.Marshal(&value) if err != nil { return err } - return o.DB.Set(nodeID, data, defaultNodeExpiration) + return o.DB.Put([]byte(nodeID), []byte(data)) } // Bootstrap walks the initialized network and populates the cache -func (o *OverlayClient) Bootstrap(ctx context.Context) error { +func (o *Cache) Bootstrap(ctx context.Context) error { fmt.Println("bootstrapping cache") nodes, err := o.DHT.GetNodes(ctx, "0", 1280) @@ -75,7 +88,7 @@ func (o *OverlayClient) Bootstrap(ctx context.Context) error { fmt.Println("could not find node in network", err, v.Id) } addr, err := proto.Marshal(found.Address) - o.DB.Set(found.Id, addr, defaultNodeExpiration) + o.DB.Put([]byte(found.Id), addr) } // called after kademlia is bootstrapped // needs to take RoutingTable and start to persist it into the cache @@ -96,7 +109,7 @@ func (o *OverlayClient) Bootstrap(ctx context.Context) error { } // Refresh walks the network looking for new nodes and pings existing nodes to eliminate stale addresses -func (o *OverlayClient) Refresh(ctx context.Context) error { +func (o *Cache) Refresh(ctx context.Context) error { // iterate over all nodes // compare responses to find new nodes // listen for responses from existing nodes @@ -118,7 +131,7 @@ func (o *OverlayClient) Refresh(ctx context.Context) error { } // Walk iterates over buckets to traverse the network -func (o *OverlayClient) Walk(ctx context.Context) error { +func (o *Cache) Walk(ctx context.Context) error { nodes, err := o.DHT.GetNodes(ctx, "0", 128) if err != nil { return err diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go new file mode 100644 index 000000000..5156bb914 --- /dev/null +++ b/pkg/overlay/cache_test.go @@ -0,0 +1,302 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package overlay + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/zeebo/errs" + + "storj.io/storj/internal/test" + "storj.io/storj/pkg/utils" + "storj.io/storj/protos/overlay" + "storj.io/storj/storage" + "storj.io/storj/storage/boltdb" + "storj.io/storj/storage/redis" +) + +type dbClient int +type responses map[dbClient]*overlay.NodeAddress +type errors map[dbClient]error + +const ( + mock dbClient = iota + bolt + _redis +) + +var ( + getCases = []struct { + testID string + expectedTimesCalled int + key string + expectedResponses responses + expectedErrors errors + data test.KvStore + }{ + { + testID: "valid Get", + expectedTimesCalled: 1, + key: "foo", + expectedResponses: func() responses { + na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} + return responses{ + mock: na, + bolt: na, + _redis: na, + } + }(), + expectedErrors: errors{ + mock: nil, + bolt: nil, + _redis: nil, + }, + data: test.KvStore{"foo": func() storage.Value { + na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} + d, err := proto.Marshal(na) + if err != nil { + panic(err) + } + return d + }()}, + }, + { + testID: "forced get error", + expectedTimesCalled: 1, + key: "error", + expectedResponses: func() responses { + na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} + return responses{ + mock: nil, + bolt: na, + _redis: na, + } + }(), + expectedErrors: errors{ + mock: test.ErrForced, + bolt: nil, + _redis: nil, + }, + data: test.KvStore{"error": func() storage.Value { + na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} + d, err := proto.Marshal(na) + if err != nil { + panic(err) + } + return d + }()}, + }, + { + testID: "get missing key", + expectedTimesCalled: 1, + key: "bar", + expectedResponses: responses{ + mock: nil, + bolt: nil, + _redis: nil, + }, + // TODO(bryanchriswhite): compare actual errors + expectedErrors: errors{ + mock: test.ErrMissingKey, + bolt: errs.New("boltdb error"), + _redis: errs.New("redis error"), + }, + data: test.KvStore{"foo": func() storage.Value { + na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} + d, err := proto.Marshal(na) + if err != nil { + panic(err) + } + return d + }()}, + }, + } + + putCases = []struct { + testID string + expectedTimesCalled int + key string + value overlay.NodeAddress + expectedErrors errors + data test.KvStore + }{ + { + testID: "valid Put", + expectedTimesCalled: 1, + key: "foo", + value: overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}, + expectedErrors: errors{ + mock: nil, + bolt: nil, + _redis: nil, + }, + data: test.KvStore{}, + }, + } +) + +func redisTestClient(t *testing.T, data test.KvStore) storage.KeyValueStore { + client, err := redis.NewClient("127.0.0.1:6379", "", 1) + assert.NoError(t, err) + + populateStorage(t, client, data) + + return client +} + +func boltTestClient(t *testing.T, data test.KvStore) (_ storage.KeyValueStore, _ func()) { + boltPath, err := filepath.Abs("test_bolt.db") + assert.NoError(t, err) + + logger, err := utils.NewLogger("dev") + assert.NoError(t, err) + + client, err := boltdb.NewClient(logger, boltPath, "testBoltdb") + assert.NoError(t, err) + + cleanup := func() { + err := os.Remove(boltPath) + assert.NoError(t, err) + } + + populateStorage(t, client, data) + + return client, cleanup +} + +func populateStorage(t *testing.T, client storage.KeyValueStore, data test.KvStore) { + for k, v := range data { + err := client.Put(storage.Key(k), v) + assert.NoError(t, err) + } +} + +func TestRedisGet(t *testing.T) { + done := test.EnsureRedis(t) + defer done() + + for _, c := range getCases { + t.Run(c.testID, func(t *testing.T) { + db := redisTestClient(t, c.data) + oc := Cache{DB: db} + + resp, err := oc.Get(context.Background(), c.key) + if expectedErr := c.expectedErrors[_redis]; expectedErr != nil { + assert.Error(t, err) + } else { + assert.Equal(t, expectedErr, err) + } + assert.Equal(t, c.expectedResponses[_redis], resp) + }) + } +} + +func TestRedisPut(t *testing.T) { + done := test.EnsureRedis(t) + defer done() + + for _, c := range putCases { + t.Run(c.testID, func(t *testing.T) { + db, cleanup := boltTestClient(t, c.data) + defer cleanup() + + oc := Cache{DB: db} + + err := oc.Put(c.key, c.value) + assert.Equal(t, c.expectedErrors[_redis], err) + + v, err := db.Get([]byte(c.key)) + assert.NoError(t, err) + na := &overlay.NodeAddress{} + + assert.NoError(t, proto.Unmarshal(v, na)) + assert.Equal(t, na, &c.value) + }) + } +} + +func TestBoltGet(t *testing.T) { + for _, c := range getCases { + t.Run(c.testID, func(t *testing.T) { + db, cleanup := boltTestClient(t, c.data) + defer cleanup() + + oc := Cache{DB: db} + + resp, err := oc.Get(context.Background(), c.key) + if expectedErr := c.expectedErrors[bolt]; expectedErr != nil { + assert.Error(t, err) + } else { + assert.Equal(t, expectedErr, err) + } + assert.Equal(t, c.expectedResponses[bolt], resp) + + }) + } +} + +func TestBoltPut(t *testing.T) { + for _, c := range putCases { + t.Run(c.testID, func(t *testing.T) { + db, cleanup := boltTestClient(t, c.data) + defer cleanup() + + oc := Cache{DB: db} + + err := oc.Put(c.key, c.value) + assert.Equal(t, c.expectedErrors[_redis], err) + + v, err := db.Get([]byte(c.key)) + assert.NoError(t, err) + na := &overlay.NodeAddress{} + + assert.NoError(t, proto.Unmarshal(v, na)) + assert.Equal(t, na, &c.value) + }) + } +} + +func TestMockGet(t *testing.T) { + for _, c := range getCases { + t.Run(c.testID, func(t *testing.T) { + + db := test.NewMockKeyValueStore(c.data) + oc := Cache{DB: db} + + assert.Equal(t, 0, db.GetCalled) + + resp, err := oc.Get(context.Background(), c.key) + assert.Equal(t, c.expectedErrors[mock], err) + assert.Equal(t, c.expectedResponses[mock], resp) + assert.Equal(t, c.expectedTimesCalled, db.GetCalled) + }) + } +} + +func TestMockPut(t *testing.T) { + for _, c := range putCases { + t.Run(c.testID, func(t *testing.T) { + + db := test.NewMockKeyValueStore(c.data) + oc := Cache{DB: db} + + assert.Equal(t, 0, db.PutCalled) + + err := oc.Put(c.key, c.value) + assert.Equal(t, c.expectedErrors[mock], err) + assert.Equal(t, c.expectedTimesCalled, db.PutCalled) + + v := db.Data[c.key] + na := &overlay.NodeAddress{} + + assert.NoError(t, proto.Unmarshal(v, na)) + assert.Equal(t, na, &c.value) + }) + } +} diff --git a/pkg/overlay/overlay.go b/pkg/overlay/overlay.go index e0aa1d0c1..2a2c7f3a2 100644 --- a/pkg/overlay/overlay.go +++ b/pkg/overlay/overlay.go @@ -7,23 +7,22 @@ import ( "context" "go.uber.org/zap" - monkit "gopkg.in/spacemonkeygo/monkit.v2" + "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/kademlia" proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package - "storj.io/storj/storage/redis" ) // Overlay implements our overlay RPC service type Overlay struct { kad *kademlia.Kademlia - DB *redis.OverlayClient + cache *Cache logger *zap.Logger metrics *monkit.Registry } // Lookup finds the address of a node in our overlay network func (o *Overlay) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.LookupResponse, error) { - na, err := o.DB.Get(ctx, req.NodeID) + na, err := o.cache.Get(ctx, req.NodeID) if err != nil { o.logger.Error("Error looking up node", zap.Error(err), zap.String("nodeID", req.NodeID)) return nil, err diff --git a/pkg/overlay/service.go b/pkg/overlay/service.go index 4b35111fc..3716c0c58 100644 --- a/pkg/overlay/service.go +++ b/pkg/overlay/service.go @@ -12,11 +12,10 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" - monkit "gopkg.in/spacemonkeygo/monkit.v2" + "gopkg.in/spacemonkeygo/monkit.v2" "storj.io/storj/pkg/kademlia" proto "storj.io/storj/protos/overlay" - "storj.io/storj/storage/redis" ) var ( @@ -34,15 +33,14 @@ func init() { flag.StringVar(&bootstrapIP, "bootstrapIP", "", "Optional IP to bootstrap node against") flag.StringVar(&bootstrapPort, "bootstrapPort", "", "Optional port of node to bootstrap against") flag.StringVar(&localPort, "localPort", "8080", "Specify a different port to listen on locally") - flag.Parse() } // NewServer creates a new Overlay Service Server -func NewServer(k *kademlia.Kademlia, db *redis.OverlayClient, l *zap.Logger, m *monkit.Registry) *grpc.Server { +func NewServer(k *kademlia.Kademlia, cache *Cache, l *zap.Logger, m *monkit.Registry) *grpc.Server { grpcServer := grpc.NewServer() proto.RegisterOverlayServer(grpcServer, &Overlay{ kad: k, - DB: db, + cache: cache, logger: l, metrics: m, }) @@ -78,7 +76,7 @@ func (s *Service) Process(ctx context.Context) error { // TODO(coyle): Should add the ability to pass a configuration to change the bootstrap node in := kademlia.GetIntroNode(bootstrapIP, bootstrapPort) - kad, err := kademlia.NewKademlia([]proto.Node{in}, "bootstrap.storj.io", "8080") + kad, err := kademlia.NewKademlia([]proto.Node{in}, "0.0.0.0", localPort) if err != nil { s.logger.Error("Failed to instantiate new Kademlia", zap.Error(err)) return err @@ -95,10 +93,13 @@ func (s *Service) Process(ctx context.Context) error { } // bootstrap cache - cache, err := redis.NewOverlayClient(redisAddress, redisPassword, db, kad) - if err != nil { - s.logger.Error("Failed to create a new redis overlay client", zap.Error(err)) - return err + var cache *Cache + if redisAddress != "" { + cache, err = NewRedisOverlayCache(redisAddress, redisPassword, db, kad) + if err != nil { + s.logger.Error("Failed to create a new redis overlay client", zap.Error(err)) + return err + } } if err := cache.Bootstrap(ctx); err != nil { @@ -115,18 +116,20 @@ func (s *Service) Process(ctx context.Context) error { return err } - grpcServer := grpc.NewServer() - proto.RegisterOverlayServer(grpcServer, &Overlay{ - kad: kad, - DB: cache, - logger: s.logger, - metrics: s.metrics, - }) + grpcServer := NewServer(kad, cache, s.logger, s.metrics) http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") }) go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), nil) }() go cache.Walk(ctx) + // If the passed context times out or is cancelled, shutdown the gRPC server + go func() { + if _, ok := <-ctx.Done(); !ok { + grpcServer.GracefulStop() + } + }() + + // If `grpcServer.Serve(...)` returns an error, shutdown/cleanup the gRPC server defer grpcServer.GracefulStop() return grpcServer.Serve(lis) } diff --git a/pkg/overlay/service_test.go b/pkg/overlay/service_test.go index 5d6f01d80..f5d73c195 100644 --- a/pkg/overlay/service_test.go +++ b/pkg/overlay/service_test.go @@ -8,10 +8,12 @@ import ( "fmt" "net" "testing" + "time" "github.com/stretchr/testify/assert" "google.golang.org/grpc" + "storj.io/storj/internal/test" proto "storj.io/storj/protos/overlay" // naming proto to avoid confusion with this package ) @@ -45,3 +47,13 @@ func TestNewClient(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, r) } + +func TestProcess(t *testing.T) { + done := test.EnsureRedis(t) + defer done() + + o := Service{} + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) + err := o.Process(ctx) + assert.NoError(t, err) +} diff --git a/pkg/piecestore/pstore_test.go b/pkg/piecestore/pstore_test.go index 4e3d3f2fe..1d273e5e5 100644 --- a/pkg/piecestore/pstore_test.go +++ b/pkg/piecestore/pstore_test.go @@ -13,30 +13,30 @@ import ( ) func TestStore(t *testing.T) { - tests := []struct{ - it string - id string - content []byte + tests := []struct { + it string + id string + content []byte expectedContent []byte - err string - } { - { - it: "should successfully store data", - id: "0123456789ABCDEFGHIJ", - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - { - it: "should return an error when given an invalid id", - id: "012", - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "argError: Invalid id length", - }, - } + err string + }{ + { + it: "should successfully store data", + id: "0123456789ABCDEFGHIJ", + content: []byte("butts"), + expectedContent: []byte("butts"), + err: "", + }, + { + it: "should return an error when given an invalid id", + id: "012", + content: []byte("butts"), + expectedContent: []byte("butts"), + err: "argError: Invalid id length", + }, + } - for _, tt := range tests { + for _, tt := range tests { t.Run(tt.it, func(t *testing.T) { assert := assert.New(t) storeFile, err := StoreWriter(tt.id, os.TempDir()) @@ -79,67 +79,67 @@ func TestStore(t *testing.T) { return } }) - } + } } func TestRetrieve(t *testing.T) { - tests := []struct{ - it string - id string - size int64 - offset int64 - content []byte + tests := []struct { + it string + id string + size int64 + offset int64 + content []byte expectedContent []byte - err string - } { - { - it: "should successfully retrieve data", - id: "0123456789ABCDEFGHIJ", - size: 5, - offset: 0, - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - { - it: "should successfully retrieve data by offset", - id: "0123456789ABCDEFGHIJ", - size: 5, - offset: 5, - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - { - it: "should successfully retrieve data by chunk", - id: "0123456789ABCDEFGHIJ", - size: 2, - offset: 5, - content: []byte("bu"), - expectedContent: []byte("bu"), - err: "", - }, - { - it: "should return an error when given negative offset", - id: "0123456789ABCDEFGHIJ", - size: 0, - offset: -1337, - content: []byte("butts"), - expectedContent: []byte(""), - err: "argError: Invalid offset: -1337", - }, - { - it: "should successfully retrieve data with negative length", - id: "0123456789ABCDEFGHIJ", - size: -1, - offset: 0, - content: []byte("butts"), - expectedContent: []byte("butts"), - err: "", - }, - } + err string + }{ + { + it: "should successfully retrieve data", + id: "0123456789ABCDEFGHIJ", + size: 5, + offset: 0, + content: []byte("butts"), + expectedContent: []byte("butts"), + err: "", + }, + { + it: "should successfully retrieve data by offset", + id: "0123456789ABCDEFGHIJ", + size: 5, + offset: 5, + content: []byte("butts"), + expectedContent: []byte("butts"), + err: "", + }, + { + it: "should successfully retrieve data by chunk", + id: "0123456789ABCDEFGHIJ", + size: 2, + offset: 5, + content: []byte("bu"), + expectedContent: []byte("bu"), + err: "", + }, + { + it: "should return an error when given negative offset", + id: "0123456789ABCDEFGHIJ", + size: 0, + offset: -1337, + content: []byte("butts"), + expectedContent: []byte(""), + err: "argError: Invalid offset: -1337", + }, + { + it: "should successfully retrieve data with negative length", + id: "0123456789ABCDEFGHIJ", + size: -1, + offset: 0, + content: []byte("butts"), + expectedContent: []byte("butts"), + err: "", + }, + } - for _, tt := range tests { + for _, tt := range tests { t.Run(tt.it, func(t *testing.T) { assert := assert.New(t) @@ -194,31 +194,31 @@ func TestRetrieve(t *testing.T) { return } }) - } + } } func TestDelete(t *testing.T) { - tests := []struct{ - it string - id string - err string - } { - { - it: "should successfully delete data", - id: "11111111111111111111", - err: "", - }, - { - it: "should return nil-err with non-existent id", - id: "11111111111111111111", - err: "", - }, - { - it: "should err with invalid id length", - id: "111111", - err: "argError: Invalid id length", - }, - } + tests := []struct { + it string + id string + err string + }{ + { + it: "should successfully delete data", + id: "11111111111111111111", + err: "", + }, + { + it: "should return nil-err with non-existent id", + id: "11111111111111111111", + err: "", + }, + { + it: "should err with invalid id length", + id: "111111", + err: "argError: Invalid id length", + }, + } for _, tt := range tests { t.Run(tt.it, func(t *testing.T) { diff --git a/pkg/piecestore/rpc/server/server_test.go b/pkg/piecestore/rpc/server/server_test.go index d3e56324c..07e3a6c44 100644 --- a/pkg/piecestore/rpc/server/server_test.go +++ b/pkg/piecestore/rpc/server/server_test.go @@ -55,25 +55,25 @@ func TestPiece(t *testing.T) { // set up test cases tests := []struct { - id string + id string size int64 expiration int64 err string }{ { // should successfully retrieve piece meta-data - id: testId, + id: testId, size: 5, expiration: testExpiration, err: "", }, { // server should err with invalid id - id: "123", + id: "123", size: 5, expiration: testExpiration, err: "rpc error: code = Unknown desc = argError: Invalid id length", }, { // server should err with nonexistent file - id: "22222222222222222222", + id: "22222222222222222222", size: 5, expiration: testExpiration, err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(os.TempDir(), "/test-data/3000/22/22/2222222222222222")), @@ -83,14 +83,14 @@ func TestPiece(t *testing.T) { for _, tt := range tests { t.Run("should return expected PieceSummary values", func(t *testing.T) { - // simulate piece TTL entry - _, err = db.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, testCreatedDate, testExpiration)) - if err != nil { - t.Errorf("Error: %v\nCould not make TTL entry", err) - return - } + // simulate piece TTL entry + _, err = db.Exec(fmt.Sprintf(`INSERT INTO ttl (id, created, expires) VALUES ("%s", "%d", "%d")`, tt.id, testCreatedDate, testExpiration)) + if err != nil { + t.Errorf("Error: %v\nCould not make TTL entry", err) + return + } - defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) + defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) req := &pb.PieceId{Id: tt.id} resp, err := c.Piece(context.Background(), req) @@ -146,7 +146,7 @@ func TestRetrieve(t *testing.T) { // set up test cases tests := []struct { - id string + id string reqSize int64 respSize int64 offset int64 @@ -154,7 +154,7 @@ func TestRetrieve(t *testing.T) { err string }{ { // should successfully retrieve data - id: testId, + id: testId, reqSize: 5, respSize: 5, offset: 0, @@ -162,7 +162,7 @@ func TestRetrieve(t *testing.T) { err: "", }, { // server should err with invalid id - id: "123", + id: "123", reqSize: 5, respSize: 5, offset: 0, @@ -170,7 +170,7 @@ func TestRetrieve(t *testing.T) { err: "rpc error: code = Unknown desc = argError: Invalid id length", }, { // server should err with nonexistent file - id: "22222222222222222222", + id: "22222222222222222222", reqSize: 5, respSize: 5, offset: 0, @@ -178,7 +178,7 @@ func TestRetrieve(t *testing.T) { err: fmt.Sprintf("rpc error: code = Unknown desc = stat %s: no such file or directory", path.Join(os.TempDir(), "/test-data/3000/22/22/2222222222222222")), }, { // server should return expected content and respSize with offset and excess reqSize - id: testId, + id: testId, reqSize: 5, respSize: 4, offset: 1, @@ -186,7 +186,7 @@ func TestRetrieve(t *testing.T) { err: "", }, { // server should return expected content with reduced reqSize - id: testId, + id: testId, reqSize: 4, respSize: 4, offset: 0, @@ -229,7 +229,7 @@ func TestRetrieve(t *testing.T) { func TestStore(t *testing.T) { tests := []struct { - id string + id string size int64 ttl int64 offset int64 @@ -239,7 +239,7 @@ func TestStore(t *testing.T) { err string }{ { // should successfully store data - id: testId, + id: testId, ttl: testExpiration, content: []byte("butts"), message: "OK", @@ -247,7 +247,7 @@ func TestStore(t *testing.T) { err: "", }, { // should err with invalid id length - id: "butts", + id: "butts", ttl: testExpiration, content: []byte("butts"), message: "", @@ -279,7 +279,7 @@ func TestStore(t *testing.T) { resp, err := stream.CloseAndRecv() - defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) + defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) if len(tt.err) > 0 { if err != nil { @@ -305,22 +305,22 @@ func TestStore(t *testing.T) { func TestDelete(t *testing.T) { // set up test cases tests := []struct { - id string + id string message string err string }{ { // should successfully delete data - id: testId, + id: testId, message: "OK", err: "", }, { // should err with invalid id length - id: "123", + id: "123", message: "rpc error: code = Unknown desc = argError: Invalid id length", err: "rpc error: code = Unknown desc = argError: Invalid id length", }, { // should return OK with nonexistent file - id: "22222222222222222223", + id: "22222222222222222223", message: "OK", err: "", }, @@ -351,7 +351,7 @@ func TestDelete(t *testing.T) { return } - defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) + defer db.Exec(fmt.Sprintf(`DELETE FROM ttl WHERE id="%s"`, tt.id)) defer pstore.Delete(testId, s.PieceStoreDir) diff --git a/protos/piecestore/piece_store.pb.mock.go b/protos/piecestore/piece_store.pb.mock.go index cf4884b8a..8adae7e3d 100644 --- a/protos/piecestore/piece_store.pb.mock.go +++ b/protos/piecestore/piece_store.pb.mock.go @@ -1,3 +1,6 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + // Code generated by MockGen. DO NOT EDIT. // Source: storj.io/storj/protos/piecestore (interfaces: PieceStoreRoutesClient,PieceStoreRoutes_RetrieveClient) diff --git a/storage/boltdb/client.go b/storage/boltdb/client.go index 8fd3fc963..0789edd00 100644 --- a/storage/boltdb/client.go +++ b/storage/boltdb/client.go @@ -8,39 +8,101 @@ import ( "github.com/boltdb/bolt" "go.uber.org/zap" + + "storj.io/storj/storage" +) + +type boltClient struct { + logger *zap.Logger + db *bolt.DB + Path string + Bucket []byte +} + +const ( + // fileMode sets permissions so owner can read and write + fileMode = 0600 + // PointerBucket is the string representing the bucket used for `PointerEntries` + PointerBucket = "pointers" + // OverlayBucket is the string representing the bucket used for a bolt-backed overlay dht cache + OverlayBucket = "overlay" ) var ( defaultTimeout = 1 * time.Second ) -const ( - // fileMode sets permissions so owner can read and write - fileMode = 0600 -) - -// Client is the storage interface for the Bolt database -type Client struct { - logger *zap.Logger - db *bolt.DB - Path string -} - -// New instantiates a new BoltDB client -func New(logger *zap.Logger, path string) (*Client, error) { +// NewClient instantiates a new BoltDB client given a zap logger, db file path, and a bucket name +func NewClient(logger *zap.Logger, path, bucket string) (storage.KeyValueStore, error) { db, err := bolt.Open(path, fileMode, &bolt.Options{Timeout: defaultTimeout}) if err != nil { return nil, err } - return &Client{ + return &boltClient{ logger: logger, db: db, Path: path, + Bucket: []byte(bucket), }, nil } +// Put adds a value to the provided key in boltdb, returning an error on failure. +func (c *boltClient) Put(key storage.Key, value storage.Value) error { + c.logger.Debug("entering bolt put") + return c.db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists(c.Bucket) + if err != nil { + return err + } + + return b.Put(key, value) + }) +} + +// Get looks up the provided key from boltdb returning either an error or the result. +func (c *boltClient) Get(pathKey storage.Key) (storage.Value, error) { + c.logger.Debug("entering bolt get: " + string(pathKey)) + var pointerBytes []byte + err := c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(c.Bucket) + v := b.Get(pathKey) + if v == nil { + return Error.New("pointer at %#v not found", string(pathKey)) + } + pointerBytes = v + return nil + }) + + return pointerBytes, err +} + +// List returns either a list of keys for which boltdb has values or an error. +func (c *boltClient) List() (storage.Keys, error) { + c.logger.Debug("entering bolt list") + var paths storage.Keys + err := c.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(c.Bucket) + + err := b.ForEach(func(key, value []byte) error { + paths = append(paths, key) + return nil + }) + return err + }) + + return paths, err +} + +// Delete deletes a key/value pair from boltdb, for a given the key +func (c *boltClient) Delete(pathKey storage.Key) error { + c.logger.Debug("entering bolt delete: " + string(pathKey)) + return c.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(c.Bucket).Delete(pathKey) + }) +} + // Close closes a BoltDB client -func (c *Client) Close() error { +func (c *boltClient) Close() error { return c.db.Close() } diff --git a/storage/boltdb/netstate_test.go b/storage/boltdb/client_test.go similarity index 79% rename from storage/boltdb/netstate_test.go rename to storage/boltdb/client_test.go index 39197c5d8..b01695aa6 100644 --- a/storage/boltdb/netstate_test.go +++ b/storage/boltdb/client_test.go @@ -10,6 +10,7 @@ import ( "testing" "go.uber.org/zap" + "storj.io/storj/pkg/netstate" ) func tempfile() string { @@ -27,27 +28,30 @@ func tempfile() string { func TestNetState(t *testing.T) { logger, _ := zap.NewDevelopment() - c, err := New(logger, tempfile()) + c, err := NewClient(logger, tempfile(), "test_bucket") if err != nil { t.Error("Failed to create test db") } defer func() { c.Close() - os.Remove(c.Path) + switch client := c.(type) { + case *boltClient: + os.Remove(client.Path) + } }() - testEntry1 := PointerEntry{ + testEntry1 := netstate.PointerEntry{ Path: []byte(`test/path`), Pointer: []byte(`pointer1`), } - testEntry2 := PointerEntry{ + testEntry2 := netstate.PointerEntry{ Path: []byte(`test/path2`), Pointer: []byte(`pointer2`), } // tests Put function - if err := c.Put(testEntry1); err != nil { + if err := c.Put(testEntry1.Path, testEntry1.Pointer); err != nil { t.Error("Failed to save testFile to pointers bucket") } @@ -66,7 +70,7 @@ func TestNetState(t *testing.T) { } // tests List function - if err := c.Put(testEntry2); err != nil { + if err := c.Put(testEntry2.Path, testEntry2.Pointer); err != nil { t.Error("Failed to put testEntry2 to pointers bucket") } testPaths, err := c.List() diff --git a/storage/boltdb/netstate.go b/storage/boltdb/netstate.go deleted file mode 100644 index ecd0ca548..000000000 --- a/storage/boltdb/netstate.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package boltdb - -import ( - "github.com/boltdb/bolt" -) - -// PointerEntry - Path and Pointer are saved as a kv pair to boltdb. -// The following boltdb methods handle the pointer type (defined in -// the protobuf file) after it has been marshalled into bytes. -type PointerEntry struct { - Path []byte - Pointer []byte -} - -const ( - pointerBucket = "pointers" -) - -// Put saves the Path and Pointer as a kv entry in the "pointers" bucket -func (client *Client) Put(pe PointerEntry) error { - client.logger.Debug("entering bolt put") - return client.db.Update(func(tx *bolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(pointerBucket)) - if err != nil { - return err - } - - return b.Put(pe.Path, pe.Pointer) - }) -} - -// Get retrieves the Pointer value stored at the Path key -func (client *Client) Get(pathKey []byte) ([]byte, error) { - client.logger.Debug("entering bolt get: " + string(pathKey)) - var pointerBytes []byte - err := client.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(pointerBucket)) - v := b.Get(pathKey) - if v == nil { - return Error.New("pointer at %#v not found", string(pathKey)) - } - pointerBytes = v - return nil - }) - - return pointerBytes, err -} - -// List creates a byte array of all path keys in in the "pointers" bucket -func (client *Client) List() ([][]byte, error) { - client.logger.Debug("entering bolt list") - var paths [][]byte - err := client.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(pointerBucket)) - - err := b.ForEach(func(key, value []byte) error { - paths = append(paths, key) - return nil - }) - return err - }) - - return paths, err -} - -// Delete deletes a kv pair from the "pointers" bucket, given the Path key -func (client *Client) Delete(pathKey []byte) error { - client.logger.Debug("entering bolt delete: " + string(pathKey)) - return client.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket([]byte(pointerBucket)).Delete(pathKey) - }) -} diff --git a/storage/common.go b/storage/common.go new file mode 100644 index 000000000..1f2451bc1 --- /dev/null +++ b/storage/common.go @@ -0,0 +1,49 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package storage + +// Key is the type for the keys in a `KeyValueStore` +type Key []byte + +// Value is the type for the values in a `ValueValueStore` +type Value []byte + +// Keys is the type for a slice of keys in a `KeyValueStore` +type Keys []Key + +// KeyValueStore is an interface describing key/value stores like redis and boltdb +type KeyValueStore interface { + // Put adds a value to the provided key in the KeyValueStore, returning an error on failure. + Put(Key, Value) error + Get(Key) (Value, error) + List() (Keys, error) + Delete(Key) error + Close() error +} + +// MarshalBinary implements the encoding.BinaryMarshaler interface for the Value type +func (v *Value) MarshalBinary() (_ []byte, _ error) { + return *v, nil +} + +// MarshalBinary implements the encoding.BinaryMarshaler interface for the Key type +func (k *Key) MarshalBinary() (_ []byte, _ error) { + return *k, nil +} + +// ByteSlices converts a `Keys` struct to a slice of byte-slices (i.e. `[][]byte`) +func (k *Keys) ByteSlices() [][]byte { + result := make([][]byte, len(*k)) + + for _k, v := range *k { + result[_k] = []byte(v) + } + + return result +} + +// String implements the Stringer interface +func (k *Key) String() string { + return string(*k) +} diff --git a/storage/redis/client.go b/storage/redis/client.go index db6da5a61..d3da281d6 100644 --- a/storage/redis/client.go +++ b/storage/redis/client.go @@ -7,50 +7,74 @@ import ( "time" "github.com/go-redis/redis" + "github.com/zeebo/errs" + "storj.io/storj/storage" ) -// Client defines the interface for communicating with a Storj redis instance -type Client interface { - Get(key string) ([]byte, error) - Set(key string, value []byte, ttl time.Duration) error - Ping() error -} +const defaultNodeExpiration = 61 * time.Minute -// Client is the entrypoint into Redis +// redisClient is the entrypoint into Redis type redisClient struct { - DB *redis.Client + db *redis.Client + TTL time.Duration } -// NewRedisClient returns a configured Client instance, verifying a sucessful connection to redis -func NewRedisClient(address, password string, db int) (Client, error) { +// NewClient returns a configured Client instance, verifying a sucessful connection to redis +func NewClient(address, password string, db int) (storage.KeyValueStore, error) { c := &redisClient{ - DB: redis.NewClient(&redis.Options{ + db: redis.NewClient(&redis.Options{ Addr: address, Password: password, DB: db, }), + TTL: defaultNodeExpiration, } - // ping here to verify we are able to connect to the redis instacne with the initialized client. - if err := c.DB.Ping().Err(); err != nil { + // ping here to verify we are able to connect to redis with the initialized client. + if err := c.db.Ping().Err(); err != nil { return nil, err } return c, nil } -// Get looks up the provided key from the redis cache returning either an error or the result. -func (c *redisClient) Get(key string) ([]byte, error) { - return c.DB.Get(key).Bytes() +// Get looks up the provided key from redis returning either an error or the result. +func (c *redisClient) Get(key storage.Key) (storage.Value, error) { + return c.db.Get(string(key)).Bytes() } -// Set adds a value to the provided key in the Redis cache, returning an error on failure. +// Put adds a value to the provided key in redis, returning an error on failure. +func (c *redisClient) Put(key storage.Key, value storage.Value) error { + v, err := value.MarshalBinary() -func (c *redisClient) Set(key string, value []byte, ttl time.Duration) error { - return c.DB.Set(key, value, ttl).Err() + if err != nil { + return err + } + + return c.db.Set(key.String(), v, c.TTL).Err() } -// Ping returns an error if pinging the underlying redis server failed -func (c *redisClient) Ping() error { - return c.DB.Ping().Err() +// List returns either a list of keys for which boltdb has values or an error. +func (c *redisClient) List() (_ storage.Keys, _ error) { + results, err := c.db.Keys("*").Result() + if err != nil { + return nil, errs.Wrap(err) + } + + keys := make(storage.Keys, len(results)) + for i, k := range results { + keys[i] = storage.Key(k) + } + + return keys, nil +} + +// Delete deletes a key/value pair from redis, for a given the key +func (c *redisClient) Delete(key storage.Key) error { + return c.db.Del(key.String()).Err() +} + +// Close closes a redis client +func (c *redisClient) Close() error { + return c.db.Close() } diff --git a/storage/redis/mock_client_test.go b/storage/redis/mock_client_test.go deleted file mode 100644 index 3c9476078..000000000 --- a/storage/redis/mock_client_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package redis - -import ( - "errors" - "time" -) - -type mockRedisClient struct { - data map[string][]byte - getCalled int - setCalled int - pingCalled int -} - -// ErrMissingKey is the error returned if a key is not in the mock store -var ErrMissingKey = errors.New("missing") - -// ErrForced is the error returned when the forced error flag is passed to mock an error -var ErrForced = errors.New("error forced by using 'error' key in mock") - -func (m *mockRedisClient) Get(key string) ([]byte, error) { - m.getCalled++ - if key == "error" { - return []byte{}, ErrForced - } - v, ok := m.data[key] - if !ok { - return []byte{}, ErrMissingKey - } - - return v, nil -} - -func (m *mockRedisClient) Set(key string, value []byte, ttl time.Duration) error { - m.setCalled++ - m.data[key] = value - return nil -} - -func (m *mockRedisClient) Ping() error { - m.pingCalled++ - return nil -} - -func newMockRedisClient(d map[string][]byte) *mockRedisClient { - return &mockRedisClient{ - data: d, - getCalled: 0, - setCalled: 0, - pingCalled: 0, - } -} diff --git a/storage/redis/overlay_test.go b/storage/redis/overlay_test.go deleted file mode 100644 index 1f147142b..000000000 --- a/storage/redis/overlay_test.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package redis - -import ( - "context" - "testing" - - "github.com/gogo/protobuf/proto" - "github.com/stretchr/testify/assert" - - "storj.io/storj/protos/overlay" -) - -func TestGet(t *testing.T) { - cases := []struct { - testID string - expectedTimesCalled int - key string - expectedResponse *overlay.NodeAddress - expectedError error - client *mockRedisClient - }{ - { - testID: "valid Get", - expectedTimesCalled: 1, - key: "foo", - expectedResponse: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}, - expectedError: nil, - client: newMockRedisClient(map[string][]byte{"foo": func() []byte { - na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} - d, err := proto.Marshal(na) - assert.NoError(t, err) - return d - }()}), - }, - { - testID: "error Get from redis", - expectedTimesCalled: 1, - key: "error", - expectedResponse: nil, - expectedError: ErrForced, - client: newMockRedisClient(map[string][]byte{"error": func() []byte { - na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} - d, err := proto.Marshal(na) - assert.NoError(t, err) - return d - }()}), - }, - { - testID: "get missing key", - expectedTimesCalled: 1, - key: "bar", - expectedResponse: nil, - expectedError: ErrMissingKey, - client: newMockRedisClient(map[string][]byte{"foo": func() []byte { - na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} - d, err := proto.Marshal(na) - assert.NoError(t, err) - return d - }()}), - }, - } - - for _, c := range cases { - t.Run(c.testID, func(t *testing.T) { - - oc := OverlayClient{DB: c.client} - - assert.Equal(t, 0, c.client.getCalled) - - resp, err := oc.Get(context.Background(), c.key) - assert.Equal(t, c.expectedError, err) - assert.Equal(t, c.expectedResponse, resp) - assert.Equal(t, c.expectedTimesCalled, c.client.getCalled) - }) - } -} - -func TestSet(t *testing.T) { - cases := []struct { - testID string - expectedTimesCalled int - key string - value overlay.NodeAddress - expectedError error - client *mockRedisClient - }{ - { - testID: "valid Set", - expectedTimesCalled: 1, - key: "foo", - value: overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}, - expectedError: nil, - client: newMockRedisClient(map[string][]byte{}), - }, - } - - for _, c := range cases { - t.Run(c.testID, func(t *testing.T) { - - oc := OverlayClient{DB: c.client} - - assert.Equal(t, 0, c.client.setCalled) - - err := oc.Set(c.key, c.value) - assert.Equal(t, c.expectedError, err) - assert.Equal(t, c.expectedTimesCalled, c.client.setCalled) - - v := c.client.data[c.key] - na := &overlay.NodeAddress{} - - assert.NoError(t, proto.Unmarshal(v, na)) - assert.Equal(t, na, &c.value) - }) - } -}