adds netstate pagination (#95)

* adds netstate rpc server pagination, mocks pagination in test/util.go

* updates ns client example, combines ns client and server test to netstate_test, adds pagination to bolt client

* better organizes netstate test calls

* wip breaking netstate test into smaller tests

* wip modularizing netstate tests

* adds some test panics

* wip netstate test attempts

* testing bug in netstate TestDeleteAuth

* wip fixes global variable problem, still issues with list

* wip fixes get request params and args

* fixes bug in path when using MakePointers helper fn

* updates mockdb list func, adds test, changes Limit to int

* fixes merge conflicts

* fixes broken tests from merge

* remove unnecessary PointerEntry struct

* removes error when Get returns nil value from boltdb

* breaks boltdb client tests into smaller tests

* renames AssertNoErr test helper to HandleErr

* adds StartingKey and Limit parameters to redis list func, adds beginning of redis tests

* adds helper func for mockdb List function

* if no starting key provided for netstate List, the first value in storage will be used

* adds basic pagination for redis List function, adds tests

* adds list limit to call in overlay/server.go

* streamlines/fixes some nits from review

* removes use of obsolete EncryptedUnencryptedSize

* uses MockKeyValueStore instead of redis instance in redis client test

* changes test to expect nil returned for getting missing key

* remove error from `KeyValueStore#Get`

* fix bolt test

* Merge pull request #1 from bryanchriswhite/nat-pagination

remove error from `KeyValueStore#Get`

* adds Get returning error back to KeyValueStore interface and affected clients

* trying to appease travis: returns errors in Get calls in overlay/cache and cache_test

* handles redis get error when no key found
This commit is contained in:
Natalie Villasana 2018-06-29 16:06:25 -04:00 committed by Dennis Coyle
parent 26f68fa774
commit 80727ae90b
17 changed files with 898 additions and 368 deletions

View File

@ -18,7 +18,8 @@ import (
)
var (
port string
port string
apiKey = []byte("abc123")
)
func initializeFlags() {
@ -44,36 +45,57 @@ func main() {
ctx := context.Background()
// Example pointer paths to put
//pr1 passes with api creds
pr1 := proto.PutRequest{
Path: []byte("welcome/to/my/pointer/journey"),
Path: []byte("test/path/1"),
Pointer: &proto.Pointer{
Type: proto.Pointer_INLINE,
InlineSegment: []byte("granola"),
InlineSegment: []byte("inline1"),
},
APIKey: []byte("abc123"),
APIKey: apiKey,
}
// pr2 passes with api creds
pr2 := proto.PutRequest{
Path: []byte("so/many/pointers"),
Path: []byte("test/path/2"),
Pointer: &proto.Pointer{
Type: proto.Pointer_INLINE,
InlineSegment: []byte("m&ms"),
InlineSegment: []byte("inline2"),
},
APIKey: []byte("abc123"),
APIKey: apiKey,
}
// pr3 fails api creds
pr3 := proto.PutRequest{
Path: []byte("another/pointer/for/the/pile"),
Path: []byte("test/path/3"),
Pointer: &proto.Pointer{
Type: proto.Pointer_INLINE,
InlineSegment: []byte("popcorn"),
InlineSegment: []byte("inline3"),
},
APIKey: []byte("abc13"),
APIKey: apiKey,
}
// rps is an example slice of RemotePieces, which is passed into
// this example Pointer of type REMOTE.
var rps []*proto.RemotePiece
rps = append(rps, &proto.RemotePiece{
PieceNum: int64(1),
NodeId: "testId",
})
pr4 := proto.PutRequest{
Path: []byte("test/path/4"),
Pointer: &proto.Pointer{
Type: proto.Pointer_REMOTE,
Remote: &proto.RemoteSegment{
Redundancy: &proto.RedundancyScheme{
Type: proto.RedundancyScheme_RS,
MinReq: int64(1),
Total: int64(3),
RepairThreshold: int64(2),
SuccessThreshold: int64(3),
},
PieceId: "testId",
RemotePieces: rps,
},
},
APIKey: apiKey,
}
// Example Puts
// puts passes api creds
_, err = client.Put(ctx, &pr1)
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to put", zap.Error(err))
@ -86,12 +108,15 @@ func main() {
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to put", zap.Error(err))
}
_, err = client.Put(ctx, &pr4)
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to put", zap.Error(err))
}
// Example Get
// get passes api creds
getReq := proto.GetRequest{
Path: []byte("so/many/pointers"),
APIKey: []byte("abc123"),
Path: []byte("test/path/1"),
APIKey: apiKey,
}
getRes, err := client.Get(ctx, &getReq)
if err != nil || status.Code(err) == codes.Internal {
@ -102,15 +127,11 @@ func main() {
}
// Example List
// list passes api creds
listReq := proto.ListRequest{
// This pagination functionality doesn't work yet.
// The given arguments are placeholders.
StartingPathKey: []byte("test/pointer/path"),
StartingPathKey: []byte("test/path/2"),
Limit: 5,
APIKey: []byte("abc123"),
APIKey: apiKey,
}
listRes, err := client.List(ctx, &listReq)
if err != nil || status.Code(err) == codes.Internal {
logger.Error("failed to list file paths")
@ -119,14 +140,13 @@ func main() {
for _, pathByte := range listRes.Paths {
stringList = append(stringList, string(pathByte))
}
logger.Debug("listed paths: " + strings.Join(stringList, ", "))
logger.Debug("listed paths: " + strings.Join(stringList, ", ") + "; truncated: " + fmt.Sprintf("%t", listRes.Truncated))
}
// Example Delete
// delete passes api creds
delReq := proto.DeleteRequest{
Path: []byte("welcome/to/my/pointer/journey"),
APIKey: []byte("abc123"),
Path: []byte("test/path/1"),
APIKey: apiKey,
}
_, err = client.Delete(ctx, &delReq)
if err != nil || status.Code(err) == codes.Internal {

View File

@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"path/filepath"
"sort"
"testing"
"time"
@ -65,11 +66,11 @@ var (
func (m *MockKeyValueStore) Get(key storage.Key) (storage.Value, error) {
m.GetCalled++
if key.String() == "error" {
return storage.Value{}, ErrForced.New("forced error")
return nil, nil
}
v, ok := m.Data[key.String()]
if !ok {
return storage.Value{}, ErrMissingKey.New("key %v missing", key)
return storage.Value{}, nil
}
return v, nil
@ -90,16 +91,42 @@ func (m *MockKeyValueStore) Delete(key storage.Key) error {
}
// List returns either a list of keys for which the MockKeyValueStore has values or an error.
func (m *MockKeyValueStore) List() (_ storage.Keys, _ error) {
func (m *MockKeyValueStore) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
m.ListCalled++
keys := storage.Keys{}
for k := range m.Data {
keys = append(keys, storage.Key(k))
}
keySlice := mapIntoSlice(m.Data)
started := false
if startingKey == nil {
started = true
}
for _, key := range keySlice {
if !started && key == string(startingKey) {
keys = append(keys, storage.Key(key))
started = true
continue
}
if started {
if len(keys) == int(limit) {
break
}
keys = append(keys, storage.Key(key))
}
}
return keys, nil
}
func mapIntoSlice(data KvStore) []string {
keySlice := make([]string, len(data))
i := 0
for k := range data {
keySlice[i] = k
i++
}
sort.Strings(keySlice)
return keySlice
}
// Close closes the client
func (m *MockKeyValueStore) Close() error {
m.CloseCalled++

View File

@ -1,147 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package netstate
import (
"bytes"
"context"
"fmt"
"net"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"storj.io/storj/internal/test"
pb "storj.io/storj/protos/netstate"
)
func TestNetStateClient(t *testing.T) {
logger, _ := zap.NewDevelopment()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 9000))
assert.NoError(t, err)
mdb := test.NewMockKeyValueStore(test.KvStore{})
grpcServer := grpc.NewServer()
pb.RegisterNetStateServer(grpcServer, NewServer(mdb, logger))
defer grpcServer.GracefulStop()
go grpcServer.Serve(lis)
address := lis.Addr().String()
conn, err := grpc.Dial(address, grpc.WithInsecure())
assert.NoError(t, err)
c := pb.NewNetStateClient(conn)
ctx := context.Background()
// example file path to put/get
pr1 := pb.PutRequest{
Path: []byte("here/is/a/path"),
Pointer: &pb.Pointer{
Type: pb.Pointer_INLINE,
InlineSegment: []byte("oatmeal"),
},
APIKey: []byte("abc123"),
}
// Tests Server.Put
_, err = c.Put(ctx, &pr1)
if err != nil || status.Code(err) == codes.Internal {
t.Error("Failed to Put")
}
if mdb.PutCalled != 1 {
t.Error("Failed to call mockdb correctly")
}
pointerBytes, err := proto.Marshal(pr1.Pointer)
if err != nil {
t.Error("failed to marshal test pointer")
}
if !bytes.Equal(mdb.Data[string(pr1.Path)], pointerBytes) {
t.Error("Expected saved pointer to equal given pointer")
}
// Tests Server.Get
getReq := pb.GetRequest{
Path: []byte("here/is/a/path"),
APIKey: []byte("abc123"),
}
getRes, err := c.Get(ctx, &getReq)
assert.NoError(t, err)
if !bytes.Equal(getRes.Pointer, pointerBytes) {
t.Error("Expected to get same content that was put")
}
if mdb.GetCalled != 1 {
t.Error("Failed to call mockdb correct number of times")
}
// Puts another pointer entry to test delete and list
pr2 := pb.PutRequest{
Path: []byte("here/is/another/path"),
Pointer: &pb.Pointer{
Type: pb.Pointer_INLINE,
InlineSegment: []byte("raisins"),
},
APIKey: []byte("abc123"),
}
_, err = c.Put(ctx, &pr2)
if err != nil || status.Code(err) == codes.Internal {
t.Error("Failed to Put")
}
if mdb.PutCalled != 2 {
t.Error("Failed to call mockdb correct number of times")
}
// Test Server.Delete
delReq := pb.DeleteRequest{
Path: []byte("here/is/a/path"),
APIKey: []byte("abc123"),
}
_, err = c.Delete(ctx, &delReq)
if err != nil || status.Code(err) == codes.Internal {
t.Error("Failed to delete")
}
if mdb.DeleteCalled != 1 {
t.Error("Failed to call mockdb correct number of times")
}
// Tests Server.List
listReq := pb.ListRequest{
// This pagination functionality doesn't work yet.
// The given arguments are placeholders.
StartingPathKey: []byte("test/pointer/path"),
Limit: 5,
APIKey: []byte("abc123"),
}
listRes, err := c.List(ctx, &listReq)
if err != nil {
t.Error("Failed to list file paths")
}
if !bytes.Equal(listRes.Paths[0], []byte("here/is/another/path")) {
t.Error("Failed to list correct file path")
}
if mdb.ListCalled != 1 {
t.Error("Failed to call mockdb correct number of times")
}
}

11
pkg/netstate/common.go Normal file
View File

@ -0,0 +1,11 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package netstate
import (
"github.com/zeebo/errs"
)
// Error is the default boltdb errs class
var Error = errs.Class("netstate error")

View File

@ -17,12 +17,6 @@ import (
"storj.io/storj/storage"
)
// PointerEntry - Path and Pointer are saved as a key/value pair to a `storage.KeyValueStore`.
type PointerEntry struct {
Path []byte
Pointer []byte
}
// Server implements the network state RPC service
type Server struct {
DB storage.KeyValueStore
@ -45,7 +39,7 @@ func (s *Server) validateAuth(APIKeyBytes []byte) error {
return nil
}
// Put formats and hands off a file path to be saved to boltdb
// Put formats and hands off a key/value (path/pointer) to be saved to boltdb
func (s *Server) Put(ctx context.Context, putReq *pb.PutRequest) (*pb.PutResponse, error) {
s.logger.Debug("entering netstate put")
@ -60,16 +54,11 @@ func (s *Server) Put(ctx context.Context, putReq *pb.PutRequest) (*pb.PutRespons
return nil, status.Errorf(codes.Internal, err.Error())
}
pe := PointerEntry{
Path: putReq.Path,
Pointer: pointerBytes,
}
if err := s.DB.Put(pe.Path, pe.Pointer); err != nil {
if err := s.DB.Put(putReq.Path, pointerBytes); err != nil {
s.logger.Error("err putting pointer", zap.Error(err))
return nil, status.Errorf(codes.Internal, err.Error())
}
s.logger.Debug("put to the db: " + string(pe.Path))
s.logger.Debug("put to the db: " + string(putReq.Path))
return &pb.PutResponse{}, nil
}
@ -84,7 +73,6 @@ func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse,
}
pointerBytes, err := s.DB.Get(req.Path)
if err != nil {
s.logger.Error("err getting file", zap.Error(err))
return nil, status.Errorf(codes.Internal, err.Error())
@ -99,25 +87,48 @@ func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse,
func (s *Server) List(ctx context.Context, req *pb.ListRequest) (*pb.ListResponse, error) {
s.logger.Debug("entering netstate list")
if req.Limit <= 0 {
return nil, Error.New("err Limit is less than or equal to 0")
}
APIKeyBytes := []byte(req.APIKey)
if err := s.validateAuth(APIKeyBytes); err != nil {
return nil, err
}
pathKeys, err := s.DB.List()
if err != nil {
s.logger.Error("err listing path keys", zap.Error(err))
return nil, status.Errorf(codes.Internal, err.Error())
var keyList storage.Keys
if req.StartingPathKey == nil {
pathKeys, err := s.DB.List(nil, storage.Limit(req.Limit))
if err != nil {
s.logger.Error("err listing path keys with no starting key", zap.Error(err))
return nil, status.Errorf(codes.Internal, err.Error())
}
keyList = pathKeys
} else if req.StartingPathKey != nil {
pathKeys, err := s.DB.List(storage.Key(req.StartingPathKey), storage.Limit(req.Limit))
if err != nil {
s.logger.Error("err listing path keys", zap.Error(err))
return nil, status.Errorf(codes.Internal, err.Error())
}
keyList = pathKeys
}
truncated := isItTruncated(keyList, int(req.Limit))
s.logger.Debug("path keys retrieved")
return &pb.ListResponse{
// pathKeys is an array of byte arrays
Paths: pathKeys.ByteSlices(),
Paths: keyList.ByteSlices(),
Truncated: truncated,
}, nil
}
func isItTruncated(keyList storage.Keys, limit int) bool {
if len(keyList) == limit {
return true
}
return false
}
// Delete formats and hands off a file path to delete from boltdb
func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
s.logger.Debug("entering netstate delete")
@ -129,7 +140,7 @@ func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteR
err := s.DB.Delete(req.Path)
if err != nil {
s.logger.Error("err deleting pointer entry", zap.Error(err))
s.logger.Error("err deleting path and pointer", zap.Error(err))
return nil, status.Errorf(codes.Internal, err.Error())
}
s.logger.Debug("deleted pointer at path: " + string(req.Path))

View File

@ -0,0 +1,423 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package netstate
import (
"bytes"
"context"
"fmt"
"net"
"testing"
"github.com/golang/protobuf/proto"
"github.com/spf13/viper"
"go.uber.org/zap"
"google.golang.org/grpc"
"storj.io/storj/internal/test"
pb "storj.io/storj/protos/netstate"
)
var (
ctx = context.Background()
)
type NetStateClientTest struct {
*testing.T
server *grpc.Server
lis net.Listener
mdb *test.MockKeyValueStore
c pb.NetStateClient
}
func NewNetStateClientTest(t *testing.T) *NetStateClientTest {
mdb := test.NewMockKeyValueStore(test.KvStore{})
viper.Reset()
viper.Set("key", "abc123")
// tests should always listen on "localhost:0"
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
panic(err)
}
grpcServer := grpc.NewServer()
pb.RegisterNetStateServer(grpcServer, NewServer(mdb, zap.L()))
go grpcServer.Serve(lis)
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
grpcServer.GracefulStop()
lis.Close()
t.Fatal(err)
}
return &NetStateClientTest{
T: t,
server: grpcServer,
lis: lis,
mdb: mdb,
c: pb.NewNetStateClient(conn),
}
}
func (nt *NetStateClientTest) Close() {
nt.server.GracefulStop()
nt.lis.Close()
}
func MakePointer(path []byte, auth bool) pb.PutRequest {
var APIKey = "abc123"
if !auth {
APIKey = "wrong key"
}
// rps is an example slice of RemotePieces to add to this
// REMOTE pointer type.
var rps []*pb.RemotePiece
rps = append(rps, &pb.RemotePiece{
PieceNum: int64(1),
NodeId: "testId",
})
pr := pb.PutRequest{
Path: path,
Pointer: &pb.Pointer{
Type: pb.Pointer_REMOTE,
Remote: &pb.RemoteSegment{
Redundancy: &pb.RedundancyScheme{
Type: pb.RedundancyScheme_RS,
MinReq: int64(1),
Total: int64(3),
RepairThreshold: int64(2),
SuccessThreshold: int64(3),
},
PieceId: "testId",
RemotePieces: rps,
},
Size: int64(1),
},
APIKey: []byte(APIKey),
}
return pr
}
func MakePointers(howMany int) []pb.PutRequest {
var pointers []pb.PutRequest
for i := 1; i <= howMany; i++ {
newPointer := MakePointer([]byte("file/path/"+fmt.Sprintf("%d", i)), true)
pointers = append(pointers, newPointer)
}
return pointers
}
func (nt *NetStateClientTest) Put(pr pb.PutRequest) *pb.PutResponse {
pre := nt.mdb.PutCalled
putRes, err := nt.c.Put(ctx, &pr)
if err != nil {
nt.HandleErr(err, "Failed to put")
}
if pre+1 != nt.mdb.PutCalled {
nt.HandleErr(nil, "Failed to call Put correct number of times")
}
return putRes
}
func (nt *NetStateClientTest) Get(gr pb.GetRequest) *pb.GetResponse {
pre := nt.mdb.GetCalled
getRes, err := nt.c.Get(ctx, &gr)
if err != nil {
nt.HandleErr(err, "Failed to get")
}
if pre+1 != nt.mdb.GetCalled {
nt.HandleErr(nil, "Failed to call Get correct number of times")
}
return getRes
}
func (nt *NetStateClientTest) List(lr pb.ListRequest) (listRes *pb.ListResponse) {
pre := nt.mdb.ListCalled
listRes, err := nt.c.List(ctx, &lr)
if err != nil {
nt.HandleErr(err, "Failed to list")
}
if pre+1 != nt.mdb.ListCalled {
nt.HandleErr(nil, "Failed to call List correct number of times")
}
return listRes
}
func (nt *NetStateClientTest) Delete(dr pb.DeleteRequest) (delRes *pb.DeleteResponse) {
pre := nt.mdb.DeleteCalled
delRes, err := nt.c.Delete(ctx, &dr)
if err != nil {
nt.HandleErr(err, "Failed to delete")
}
if pre+1 != nt.mdb.DeleteCalled {
nt.HandleErr(nil, "Failed to call Delete correct number of times")
}
return delRes
}
func (nt *NetStateClientTest) HandleErr(err error, msg string) {
nt.Error(msg)
if err != nil {
panic(err)
}
panic(msg)
}
func TestMockList(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
err := nt.mdb.Put([]byte("k1"), []byte("v1"))
if err != nil {
panic(err)
}
err = nt.mdb.Put([]byte("k2"), []byte("v2"))
if err != nil {
panic(err)
}
err = nt.mdb.Put([]byte("k3"), []byte("v3"))
if err != nil {
panic(err)
}
err = nt.mdb.Put([]byte("k4"), []byte("v4"))
if err != nil {
panic(err)
}
keys, err := nt.mdb.List([]byte("k2"), 2)
if err != nil {
nt.HandleErr(err, "Failed to list")
}
if fmt.Sprintf("%s", keys) != "[k2 k3]" {
nt.HandleErr(nil, "Failed to receive accepted list. Received "+fmt.Sprintf("%s", keys))
}
keys, err = nt.mdb.List(nil, 3)
if err != nil {
nt.HandleErr(err, "Failed to list")
}
if fmt.Sprintf("%s", keys) != "[k1 k2 k3]" {
nt.HandleErr(nil, "Failed to receive accepted list. Received "+fmt.Sprintf("%s", keys))
}
}
func TestNetStatePutGet(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
preGet := nt.mdb.GetCalled
prePut := nt.mdb.PutCalled
gr := nt.Get(pb.GetRequest{
Path: []byte("file/path/1"),
APIKey: []byte("abc123"),
})
if gr.Pointer != nil {
nt.HandleErr(nil, "Expected no pointer")
}
pr := MakePointer([]byte("file/path/1"), true)
nt.Put(pr)
gr = nt.Get(pb.GetRequest{
Path: []byte("file/path/1"),
APIKey: []byte("abc123"),
})
if gr == nil {
nt.HandleErr(nil, "Failed to get the put pointer")
}
pointerBytes, err := proto.Marshal(pr.Pointer)
if err != nil {
nt.HandleErr(err, "Failed to marshal test pointer")
}
if !bytes.Equal(gr.Pointer, pointerBytes) {
nt.HandleErr(nil, "Expected to get same content that was put")
}
if nt.mdb.GetCalled != preGet+2 {
nt.HandleErr(nil, "Failed to call get correct number of times")
}
if nt.mdb.PutCalled != prePut+1 {
nt.HandleErr(nil, "Failed to call put correct number of times")
}
}
func TestGetAuth(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
getReq := pb.GetRequest{
Path: []byte("file/path/1"),
APIKey: []byte("wrong key"),
}
_, err := nt.c.Get(ctx, &getReq)
if err == nil {
nt.HandleErr(nil, "Failed to error for wrong auth key")
}
}
func TestPutAuth(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
pr := MakePointer([]byte("file/path"), false)
_, err := nt.c.Put(ctx, &pr)
if err == nil {
nt.HandleErr(nil, "Failed to error for wrong auth key")
}
}
func TestDelete(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
pre := nt.mdb.DeleteCalled
reqs := MakePointers(1)
_, err := nt.c.Put(ctx, &reqs[0])
if err != nil {
nt.HandleErr(err, "Failed to put")
}
delReq := pb.DeleteRequest{
Path: []byte("file/path/1"),
APIKey: []byte("abc123"),
}
_, err = nt.c.Delete(ctx, &delReq)
if err != nil {
nt.HandleErr(err, "Failed to delete")
}
if pre+1 != nt.mdb.DeleteCalled {
nt.HandleErr(nil, "Failed to call Delete correct number of times")
}
}
func TestDeleteAuth(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
reqs := MakePointers(1)
_, err := nt.c.Put(ctx, &reqs[0])
if err != nil {
nt.HandleErr(err, "Failed to put")
}
delReq := pb.DeleteRequest{
Path: []byte("file/path/1"),
APIKey: []byte("wrong key"),
}
_, err = nt.c.Delete(ctx, &delReq)
if err == nil {
nt.HandleErr(nil, "Failed to error with wrong auth key")
}
}
func TestList(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
reqs := MakePointers(4)
for _, req := range reqs {
nt.Put(req)
}
listReq := pb.ListRequest{
StartingPathKey: []byte("file/path/2"),
Limit: 5,
APIKey: []byte("abc123"),
}
listRes := nt.List(listReq)
if listRes.Truncated {
nt.HandleErr(nil, "Expected list slice to not be truncated")
}
if !bytes.Equal(listRes.Paths[0], []byte("file/path/2")) {
nt.HandleErr(nil, "Failed to list correct file paths")
}
}
func TestListTruncated(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
reqs := MakePointers(3)
for _, req := range reqs {
_, err := nt.c.Put(ctx, &req)
if err != nil {
nt.HandleErr(err, "Failed to put")
}
}
listReq := pb.ListRequest{
StartingPathKey: []byte("file/path/1"),
Limit: 1,
APIKey: []byte("abc123"),
}
listRes, err := nt.c.List(ctx, &listReq)
if err != nil {
nt.HandleErr(err, "Failed to list file paths")
}
if !listRes.Truncated {
nt.HandleErr(nil, "Expected list slice to be truncated")
}
}
func TestListWithoutStartingKey(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
reqs := MakePointers(3)
for _, req := range reqs {
_, err := nt.c.Put(ctx, &req)
if err != nil {
nt.HandleErr(err, "Failed to put")
}
}
listReq := pb.ListRequest{
Limit: 3,
APIKey: []byte("abc123"),
}
listRes, err := nt.c.List(ctx, &listReq)
if err != nil {
nt.HandleErr(err, "Failed to list without starting key")
}
if !bytes.Equal(listRes.Paths[2], []byte("file/path/3")) {
nt.HandleErr(nil, "Failed to list correct paths")
}
}
func TestListWithoutLimit(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
listReq := pb.ListRequest{
StartingPathKey: []byte("file/path/3"),
APIKey: []byte("abc123"),
}
_, err := nt.c.List(ctx, &listReq)
if err == nil {
t.Error("Failed to error when not given limit")
}
}
func TestListAuth(t *testing.T) {
nt := NewNetStateClientTest(t)
defer nt.Close()
listReq := pb.ListRequest{
StartingPathKey: []byte("file/path/3"),
Limit: 1,
APIKey: []byte("wrong key"),
}
_, err := nt.c.List(ctx, &listReq)
if err == nil {
t.Error("Failed to error when given wrong auth key")
}
}

View File

@ -1,22 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package netstate
import (
"os"
"testing"
"github.com/spf13/viper"
)
const (
API_KEY = "abc123"
)
func TestMain(m *testing.M) {
viper.SetEnvPrefix("API")
os.Setenv("API_KEY", API_KEY)
viper.AutomaticEnv()
os.Exit(m.Run())
}

View File

@ -59,6 +59,10 @@ func (o *Cache) Get(ctx context.Context, key string) (*overlay.NodeAddress, erro
if err != nil {
return nil, err
}
if b.IsZero() {
// TODO: log? return an error?
return nil, nil
}
na := &overlay.NodeAddress{}
if err := proto.Unmarshal(b, na); err != nil {

View File

@ -83,7 +83,7 @@ var (
}
}(),
expectedErrors: errors{
mock: &test.ErrForced,
mock: nil,
bolt: nil,
_redis: nil,
},
@ -107,9 +107,9 @@ var (
},
// TODO(bryanchriswhite): compare actual errors
expectedErrors: errors{
mock: &test.ErrMissingKey,
bolt: &boltdb.Error,
_redis: &redis.Error,
mock: nil,
bolt: nil,
_redis: nil,
},
data: test.KvStore{"foo": func() storage.Value {
na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}

View File

@ -47,7 +47,7 @@ func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.L
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (*proto.FindStorageNodesResponse, error) {
// NB: call FilterNodeReputation from node_reputation package to find nodes for storage
keys, err := o.cache.DB.List()
keys, err := o.cache.DB.List(nil, storage.Limit(10))
if err != nil {
o.logger.Error("Error listing nodes", zap.Error(err))
return nil, err

View File

@ -1,6 +1,5 @@
// Code generated by protoc-gen-go.
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: netstate.proto
// DO NOT EDIT!
/*
Package netstate is a generated protocol buffer package.
@ -323,7 +322,7 @@ func (m *Pointer) GetMetadata() []byte {
type PutRequest struct {
Path []byte `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
Pointer *Pointer `protobuf:"bytes,2,opt,name=pointer" json:"pointer,omitempty"`
APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"`
APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,proto3" json:"APIKey,omitempty"`
}
func (m *PutRequest) Reset() { *m = PutRequest{} }
@ -355,7 +354,7 @@ func (m *PutRequest) GetAPIKey() []byte {
// GetRequest is a request message for the Get rpc call
type GetRequest struct {
Path []byte `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"`
APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,proto3" json:"APIKey,omitempty"`
}
func (m *GetRequest) Reset() { *m = GetRequest{} }
@ -381,7 +380,7 @@ func (m *GetRequest) GetAPIKey() []byte {
type ListRequest struct {
StartingPathKey []byte `protobuf:"bytes,1,opt,name=starting_path_key,json=startingPathKey,proto3" json:"starting_path_key,omitempty"`
Limit int64 `protobuf:"varint,2,opt,name=limit" json:"limit,omitempty"`
APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"`
APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,proto3" json:"APIKey,omitempty"`
}
func (m *ListRequest) Reset() { *m = ListRequest{} }
@ -438,7 +437,8 @@ func (m *GetResponse) GetPointer() []byte {
// ListResponse is a response message for the List rpc call
type ListResponse struct {
Paths [][]byte `protobuf:"bytes,1,rep,name=paths,proto3" json:"paths,omitempty"`
Paths [][]byte `protobuf:"bytes,1,rep,name=paths,proto3" json:"paths,omitempty"`
Truncated bool `protobuf:"varint,2,opt,name=truncated" json:"truncated,omitempty"`
}
func (m *ListResponse) Reset() { *m = ListResponse{} }
@ -453,9 +453,16 @@ func (m *ListResponse) GetPaths() [][]byte {
return nil
}
func (m *ListResponse) GetTruncated() bool {
if m != nil {
return m.Truncated
}
return false
}
type DeleteRequest struct {
Path []byte `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"`
APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,proto3" json:"APIKey,omitempty"`
}
func (m *DeleteRequest) Reset() { *m = DeleteRequest{} }
@ -687,58 +694,58 @@ var _NetState_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("netstate.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 843 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0xdb, 0x46,
0x10, 0x36, 0x2d, 0x9b, 0x92, 0x87, 0x92, 0x4c, 0x2f, 0x14, 0x87, 0x55, 0x0f, 0x31, 0x88, 0x06,
0x75, 0x6a, 0x40, 0x06, 0x54, 0x14, 0x48, 0x13, 0x14, 0x45, 0x2a, 0x0b, 0x86, 0x90, 0x44, 0x11,
0x56, 0x3a, 0xf4, 0x46, 0x6c, 0xc4, 0xa9, 0x44, 0x44, 0xdc, 0xa5, 0xb9, 0x2b, 0xa0, 0xea, 0xeb,
0xf5, 0x5d, 0x7a, 0x68, 0x6f, 0x7d, 0x82, 0x82, 0xbb, 0xfc, 0x93, 0x02, 0xb7, 0x40, 0x4f, 0xe2,
0xcc, 0x7e, 0xdf, 0xfc, 0x7c, 0x33, 0x23, 0xe8, 0x72, 0x54, 0x52, 0x31, 0x85, 0x83, 0x24, 0x15,
0x4a, 0x90, 0x56, 0x61, 0xf7, 0x9f, 0xad, 0x84, 0x58, 0x6d, 0xf0, 0x56, 0xfb, 0x3f, 0x6e, 0x7f,
0xb9, 0x55, 0x51, 0x8c, 0x52, 0xb1, 0x38, 0x31, 0x50, 0xff, 0x4f, 0x0b, 0x5c, 0x8a, 0xe1, 0x96,
0x87, 0x8c, 0x2f, 0x77, 0xf3, 0xe5, 0x1a, 0x63, 0x24, 0xdf, 0xc3, 0x89, 0xda, 0x25, 0xe8, 0x59,
0x57, 0xd6, 0x75, 0x77, 0xf8, 0x7c, 0x50, 0x86, 0x3f, 0x44, 0x0e, 0xcc, 0xcf, 0x62, 0x97, 0x20,
0xd5, 0x14, 0xf2, 0x14, 0x9a, 0x71, 0xc4, 0x83, 0x14, 0x1f, 0xbc, 0xe3, 0x2b, 0xeb, 0xba, 0x41,
0xed, 0x38, 0xe2, 0x14, 0x1f, 0x48, 0x0f, 0x4e, 0x95, 0x50, 0x6c, 0xe3, 0x35, 0xb4, 0xdb, 0x18,
0xe4, 0x05, 0xb8, 0x29, 0x26, 0x2c, 0x4a, 0x03, 0xb5, 0x4e, 0x51, 0xae, 0xc5, 0x26, 0xf4, 0x4e,
0x34, 0xe0, 0xdc, 0xf8, 0x17, 0x85, 0x9b, 0xdc, 0xc0, 0x85, 0xdc, 0x2e, 0x97, 0x28, 0x65, 0x0d,
0x7b, 0xaa, 0xb1, 0x6e, 0xfe, 0x50, 0x82, 0xfd, 0x1e, 0x40, 0x55, 0x1a, 0xb1, 0xe1, 0x98, 0xce,
0xdd, 0x23, 0xff, 0x6f, 0x0b, 0xdc, 0x31, 0x5f, 0xa6, 0xbb, 0x44, 0x45, 0x82, 0xe7, 0xcd, 0xfe,
0xb0, 0xd7, 0xec, 0x8b, 0xaa, 0xd9, 0x43, 0x64, 0xcd, 0x51, 0x6b, 0xf8, 0x25, 0x78, 0x68, 0xfc,
0x18, 0x06, 0x58, 0x22, 0x82, 0x4f, 0xb8, 0xd3, 0x0a, 0xb4, 0xe9, 0x65, 0xf9, 0x5e, 0x05, 0x78,
0x8b, 0xbb, 0x7d, 0xa6, 0x54, 0x2c, 0x55, 0x11, 0x5f, 0x05, 0x5c, 0xf0, 0x25, 0x6a, 0x91, 0xea,
0xcc, 0x79, 0xfe, 0x3c, 0xcd, 0x5e, 0xfd, 0x1b, 0xe8, 0xee, 0xd7, 0x42, 0x00, 0xec, 0x37, 0xe3,
0xf9, 0xfd, 0xe8, 0xbd, 0x7b, 0x44, 0x3a, 0x70, 0x36, 0x1f, 0x8f, 0xe8, 0x78, 0xf1, 0xd3, 0x87,
0x9f, 0x5d, 0xcb, 0x1f, 0x81, 0x43, 0x31, 0x16, 0x0a, 0x67, 0x11, 0x2e, 0x91, 0x7c, 0x09, 0x67,
0x49, 0xf6, 0x11, 0xf0, 0x6d, 0xac, 0x7b, 0x6e, 0xd0, 0x96, 0x76, 0x4c, 0xb7, 0x71, 0x36, 0x3d,
0x2e, 0x42, 0x0c, 0xa2, 0x50, 0xd7, 0x7e, 0x46, 0xed, 0xcc, 0x9c, 0x84, 0xfe, 0xef, 0x16, 0x74,
0x4c, 0x94, 0x39, 0xae, 0x62, 0xe4, 0x8a, 0xbc, 0x02, 0x48, 0xcb, 0x6d, 0xd0, 0x81, 0x9c, 0x61,
0xff, 0xf1, 0x4d, 0xa1, 0x35, 0x34, 0xf9, 0x02, 0x4c, 0xca, 0x2a, 0x4f, 0x53, 0xdb, 0x93, 0x90,
0xbc, 0x82, 0x4e, 0xaa, 0xf3, 0x04, 0xda, 0x23, 0xbd, 0xc6, 0x55, 0xe3, 0xda, 0x19, 0x3e, 0xa9,
0x47, 0x2e, 0x9b, 0xa1, 0xed, 0xb4, 0x32, 0x24, 0x79, 0x06, 0x4e, 0x8c, 0xe9, 0xa7, 0x0d, 0x06,
0xa9, 0x10, 0x4a, 0xef, 0x51, 0x9b, 0x82, 0x71, 0x51, 0x21, 0x94, 0xff, 0xd7, 0x31, 0x34, 0x67,
0x22, 0xe2, 0x0a, 0x53, 0x32, 0xd8, 0x1b, 0x7b, 0xad, 0xf2, 0x1c, 0x30, 0xb8, 0x63, 0x8a, 0xd5,
0xe6, 0xfc, 0x1c, 0xba, 0x11, 0xdf, 0x44, 0x1c, 0x03, 0x69, 0x14, 0xc8, 0x67, 0xd4, 0x31, 0xde,
0x42, 0x96, 0x5b, 0xb0, 0x4d, 0x4d, 0x3a, 0xbd, 0x33, 0x7c, 0x7a, 0x58, 0x78, 0x0e, 0xa4, 0x39,
0x8c, 0x10, 0x38, 0x91, 0xd1, 0x6f, 0x98, 0x6f, 0xb2, 0xfe, 0x26, 0x3f, 0x42, 0x67, 0x99, 0x22,
0xd3, 0x7b, 0x14, 0x32, 0x85, 0x9e, 0x9d, 0xcb, 0x6b, 0xae, 0x79, 0x50, 0x5c, 0xf3, 0x60, 0x51,
0x5c, 0x33, 0x6d, 0x17, 0x84, 0x3b, 0xa6, 0x90, 0x8c, 0xe0, 0x1c, 0x7f, 0x4d, 0xa2, 0xb4, 0x16,
0xa2, 0xf9, 0x9f, 0x21, 0xba, 0x15, 0x45, 0x07, 0xe9, 0x43, 0x2b, 0x46, 0xc5, 0x42, 0xa6, 0x98,
0xd7, 0xd2, 0xbd, 0x96, 0xb6, 0xef, 0x43, 0xab, 0xd0, 0x27, 0xdb, 0xbd, 0xc9, 0xf4, 0xdd, 0x64,
0x3a, 0x76, 0x8f, 0xb2, 0x6f, 0x3a, 0x7e, 0xff, 0x61, 0x31, 0x76, 0x2d, 0x1f, 0x01, 0x66, 0x5b,
0x45, 0xf1, 0x61, 0x8b, 0x52, 0x65, 0x7d, 0x26, 0x4c, 0xad, 0xb5, 0xde, 0x6d, 0xaa, 0xbf, 0xc9,
0x0d, 0x34, 0x13, 0xa3, 0xb6, 0x5e, 0x03, 0x67, 0x78, 0xf1, 0xd9, 0x18, 0x68, 0x81, 0x20, 0x97,
0x60, 0xbf, 0x99, 0x4d, 0xde, 0xe2, 0x2e, 0x17, 0xde, 0x66, 0xda, 0xf2, 0x5f, 0x02, 0xdc, 0xe3,
0xbf, 0xa6, 0xa9, 0x98, 0xc7, 0x7b, 0xcc, 0x15, 0x38, 0xef, 0x22, 0x59, 0x52, 0xbf, 0x81, 0x8b,
0xf2, 0x0a, 0x33, 0x9e, 0x3e, 0x61, 0x13, 0xe7, 0xbc, 0x78, 0x98, 0x31, 0xb5, 0xce, 0x6e, 0xb7,
0x07, 0xa7, 0x9b, 0x28, 0x8e, 0x54, 0xfe, 0x27, 0x67, 0x8c, 0x47, 0x4b, 0xec, 0x80, 0xa3, 0x95,
0x90, 0x89, 0xe0, 0x12, 0xfd, 0xaf, 0xc1, 0xd1, 0x15, 0x1b, 0x93, 0x78, 0x95, 0x0a, 0x26, 0x5b,
0x61, 0xfa, 0x5f, 0x41, 0xdb, 0x14, 0x98, 0x23, 0x7b, 0x70, 0x9a, 0x15, 0x26, 0x3d, 0xeb, 0xaa,
0x71, 0xdd, 0xa6, 0xc6, 0xf0, 0x5f, 0x43, 0xe7, 0x0e, 0x37, 0xa8, 0xf0, 0xff, 0x68, 0xe0, 0x42,
0xb7, 0x20, 0x9b, 0x24, 0xc3, 0x3f, 0x2c, 0x68, 0x4d, 0x51, 0xcd, 0xb3, 0x29, 0x90, 0x21, 0x34,
0x66, 0x5b, 0x45, 0x7a, 0xb5, 0xb9, 0x94, 0x23, 0xed, 0x3f, 0x39, 0xf0, 0xe6, 0x55, 0x0e, 0xa1,
0x71, 0x8f, 0x7b, 0x9c, 0x6a, 0x3e, 0x75, 0x4e, 0x5d, 0x83, 0xef, 0xe0, 0x24, 0xeb, 0x94, 0xd4,
0x9e, 0x6b, 0xa3, 0xe9, 0x5f, 0x1e, 0xba, 0x73, 0xda, 0x6b, 0xb0, 0x4d, 0xf5, 0xa4, 0x76, 0x67,
0x7b, 0x62, 0xf4, 0xbd, 0xcf, 0x1f, 0x0c, 0xf9, 0xa3, 0xad, 0x6f, 0xe0, 0xdb, 0x7f, 0x02, 0x00,
0x00, 0xff, 0xff, 0x7b, 0x95, 0x0c, 0x06, 0x3e, 0x07, 0x00, 0x00,
// 848 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x5f, 0x8b, 0xdb, 0x46,
0x10, 0x3f, 0x9d, 0xef, 0x6c, 0xdf, 0xc8, 0xf6, 0xe9, 0x16, 0xe7, 0xa2, 0xba, 0x85, 0x1e, 0x82,
0xd0, 0x4b, 0x0f, 0x5c, 0x70, 0x29, 0xa4, 0x09, 0xa5, 0x24, 0x77, 0xe6, 0x30, 0x49, 0x1c, 0xb3,
0xf6, 0x43, 0xdf, 0xc4, 0x46, 0x9a, 0xda, 0x22, 0xd6, 0x4a, 0x27, 0x8d, 0xa0, 0xee, 0xd7, 0xeb,
0x77, 0xe9, 0x43, 0xfb, 0xd6, 0x4f, 0x50, 0xb4, 0xab, 0x7f, 0x76, 0x48, 0x0b, 0x79, 0xb2, 0x66,
0xe6, 0xf7, 0x9b, 0x9d, 0xf9, 0xcd, 0x8c, 0x61, 0x20, 0x91, 0x52, 0x12, 0x84, 0xe3, 0x38, 0x89,
0x28, 0x62, 0xdd, 0xd2, 0x1e, 0x9d, 0x53, 0x10, 0x62, 0x4a, 0x22, 0x8c, 0x75, 0xc8, 0xf9, 0xcb,
0x00, 0x8b, 0xa3, 0x9f, 0x49, 0x5f, 0x48, 0x6f, 0xb7, 0xf4, 0x36, 0x18, 0x22, 0xfb, 0x11, 0x4e,
0x68, 0x17, 0xa3, 0x6d, 0x5c, 0x19, 0xd7, 0x83, 0xc9, 0x93, 0x71, 0x95, 0xee, 0x10, 0x39, 0xd6,
0x3f, 0xab, 0x5d, 0x8c, 0x5c, 0x51, 0xd8, 0x63, 0xe8, 0x84, 0x81, 0x74, 0x13, 0x7c, 0xb0, 0x8f,
0xaf, 0x8c, 0xeb, 0x16, 0x6f, 0x87, 0x81, 0xe4, 0xf8, 0xc0, 0x86, 0x70, 0x4a, 0x11, 0x89, 0xad,
0xdd, 0x52, 0x6e, 0x6d, 0xb0, 0xa7, 0x60, 0x25, 0x18, 0x8b, 0x20, 0x71, 0x69, 0x93, 0x60, 0xba,
0x89, 0xb6, 0xbe, 0x7d, 0xa2, 0x00, 0xe7, 0xda, 0xbf, 0x2a, 0xdd, 0xec, 0x06, 0x2e, 0xd2, 0xcc,
0xf3, 0x30, 0x4d, 0x1b, 0xd8, 0x53, 0x85, 0xb5, 0x8a, 0x40, 0x05, 0x76, 0x86, 0x00, 0x75, 0x69,
0xac, 0x0d, 0xc7, 0x7c, 0x69, 0x1d, 0x39, 0xff, 0x18, 0x60, 0x4d, 0xa5, 0x97, 0xec, 0x62, 0x0a,
0x22, 0x59, 0x34, 0xfb, 0xd3, 0x5e, 0xb3, 0x4f, 0xeb, 0x66, 0x0f, 0x91, 0x0d, 0x47, 0xa3, 0xe1,
0x67, 0x60, 0xa3, 0xf6, 0xa3, 0xef, 0x62, 0x85, 0x70, 0x3f, 0xe0, 0x4e, 0x29, 0xd0, 0xe3, 0x97,
0x55, 0xbc, 0x4e, 0xf0, 0x1a, 0x77, 0xfb, 0xcc, 0x94, 0x44, 0x42, 0x81, 0x5c, 0xbb, 0x32, 0x92,
0x1e, 0x2a, 0x91, 0x9a, 0xcc, 0x65, 0x11, 0x9e, 0xe7, 0x51, 0xe7, 0x06, 0x06, 0xfb, 0xb5, 0x30,
0x80, 0xf6, 0xcb, 0xe9, 0xf2, 0xfe, 0xf6, 0xad, 0x75, 0xc4, 0xfa, 0x70, 0xb6, 0x9c, 0xde, 0xf2,
0xe9, 0xea, 0xd5, 0xbb, 0x5f, 0x2c, 0xc3, 0xb9, 0x05, 0x93, 0x63, 0x18, 0x11, 0x2e, 0x02, 0xf4,
0x90, 0x7d, 0x09, 0x67, 0x71, 0xfe, 0xe1, 0xca, 0x2c, 0x54, 0x3d, 0xb7, 0x78, 0x57, 0x39, 0xe6,
0x59, 0x98, 0x4f, 0x4f, 0x46, 0x3e, 0xba, 0x81, 0xaf, 0x6a, 0x3f, 0xe3, 0xed, 0xdc, 0x9c, 0xf9,
0xce, 0x1f, 0x06, 0xf4, 0x75, 0x96, 0x25, 0xae, 0x43, 0x94, 0xc4, 0x9e, 0x03, 0x24, 0xd5, 0x36,
0xa8, 0x44, 0xe6, 0x64, 0xf4, 0xe9, 0x4d, 0xe1, 0x0d, 0x34, 0xfb, 0x02, 0xf4, 0x93, 0xf5, 0x3b,
0x1d, 0x65, 0xcf, 0x7c, 0xf6, 0x1c, 0xfa, 0x89, 0x7a, 0xc7, 0x55, 0x9e, 0xd4, 0x6e, 0x5d, 0xb5,
0xae, 0xcd, 0xc9, 0xa3, 0x66, 0xe6, 0xaa, 0x19, 0xde, 0x4b, 0x6a, 0x23, 0x65, 0x5f, 0x83, 0x19,
0x62, 0xf2, 0x61, 0x8b, 0x6e, 0x12, 0x45, 0xa4, 0xf6, 0xa8, 0xc7, 0x41, 0xbb, 0x78, 0x14, 0x91,
0xf3, 0xf7, 0x31, 0x74, 0x16, 0x51, 0x20, 0x09, 0x13, 0x36, 0xde, 0x1b, 0x7b, 0xa3, 0xf2, 0x02,
0x30, 0xbe, 0x13, 0x24, 0x1a, 0x73, 0x7e, 0x02, 0x83, 0x40, 0x6e, 0x03, 0x89, 0x6e, 0xaa, 0x15,
0x28, 0x66, 0xd4, 0xd7, 0xde, 0x52, 0x96, 0xef, 0xa0, 0xad, 0x6b, 0x52, 0xcf, 0x9b, 0x93, 0xc7,
0x87, 0x85, 0x17, 0x40, 0x5e, 0xc0, 0x18, 0x83, 0x93, 0x34, 0xf8, 0x1d, 0x8b, 0x4d, 0x56, 0xdf,
0xec, 0x67, 0xe8, 0x7b, 0x09, 0x0a, 0xb5, 0x47, 0xbe, 0x20, 0xb4, 0xdb, 0x85, 0xbc, 0xeb, 0x28,
0x5a, 0x6f, 0x8b, 0xab, 0x7e, 0x9f, 0xfd, 0x3a, 0x5e, 0x95, 0xd7, 0xcc, 0x7b, 0x25, 0xe1, 0x4e,
0x10, 0xb2, 0x5b, 0x38, 0xc7, 0xdf, 0xe2, 0x20, 0x69, 0xa4, 0xe8, 0xfc, 0x6f, 0x8a, 0x41, 0x4d,
0x51, 0x49, 0x46, 0xd0, 0x0d, 0x91, 0x84, 0x2f, 0x48, 0xd8, 0x5d, 0xd5, 0x6b, 0x65, 0x3b, 0x0e,
0x74, 0x4b, 0x7d, 0xf2, 0xdd, 0x9b, 0xcd, 0xdf, 0xcc, 0xe6, 0x53, 0xeb, 0x28, 0xff, 0xe6, 0xd3,
0xb7, 0xef, 0x56, 0x53, 0xcb, 0x70, 0x10, 0x60, 0x91, 0x11, 0xc7, 0x87, 0x0c, 0x53, 0xca, 0xfb,
0x8c, 0x05, 0x6d, 0x94, 0xde, 0x3d, 0xae, 0xbe, 0xd9, 0x0d, 0x74, 0x62, 0xad, 0xb6, 0x5a, 0x03,
0x73, 0x72, 0xf1, 0xd1, 0x18, 0x78, 0x89, 0x60, 0x97, 0xd0, 0x7e, 0xb9, 0x98, 0xbd, 0xc6, 0x5d,
0x21, 0x7c, 0x61, 0x39, 0xcf, 0x00, 0xee, 0xf1, 0x3f, 0x9f, 0xa9, 0x99, 0xc7, 0x7b, 0xcc, 0x35,
0x98, 0x6f, 0x82, 0xb4, 0xa2, 0x7e, 0x0b, 0x17, 0xd5, 0x15, 0xe6, 0x3c, 0x75, 0xc2, 0x3a, 0xcf,
0x79, 0x19, 0x58, 0x08, 0xda, 0xe4, 0xb7, 0x3b, 0x84, 0xd3, 0x6d, 0x10, 0x06, 0x54, 0xfc, 0xc9,
0x69, 0xe3, 0x93, 0x25, 0xf6, 0xc1, 0x54, 0x4a, 0xa4, 0x71, 0x24, 0x53, 0x74, 0xbe, 0x01, 0x53,
0x55, 0xac, 0x4d, 0x66, 0xd7, 0x2a, 0xe8, 0xd7, 0x4a, 0xd3, 0x79, 0x05, 0x3d, 0x5d, 0x60, 0x81,
0x1c, 0xc2, 0x69, 0x5e, 0x58, 0x6a, 0x1b, 0x57, 0xad, 0xeb, 0x1e, 0xd7, 0x06, 0xfb, 0x0a, 0xce,
0x28, 0xc9, 0xa4, 0x27, 0x08, 0xf5, 0x39, 0x75, 0x79, 0xed, 0x70, 0x5e, 0x40, 0xff, 0x0e, 0xb7,
0x48, 0xf8, 0x39, 0x0a, 0x59, 0x30, 0x28, 0xc9, 0xba, 0x84, 0xc9, 0x9f, 0x06, 0x74, 0xe7, 0x48,
0xcb, 0x7c, 0x46, 0x6c, 0x02, 0xad, 0x45, 0x46, 0x6c, 0xd8, 0x98, 0x5a, 0x35, 0xf0, 0xd1, 0xa3,
0x03, 0x6f, 0xd1, 0xc3, 0x04, 0x5a, 0xf7, 0xb8, 0xc7, 0xa9, 0xa7, 0xd7, 0xe4, 0x34, 0x15, 0xfa,
0x01, 0x4e, 0x72, 0x1d, 0x58, 0x23, 0xdc, 0x18, 0xdc, 0xe8, 0xf2, 0xd0, 0x5d, 0xd0, 0x5e, 0x40,
0x5b, 0x57, 0xcf, 0x1a, 0x57, 0xb8, 0x27, 0xc6, 0xc8, 0xfe, 0x38, 0xa0, 0xc9, 0xef, 0xdb, 0xea,
0x42, 0xbe, 0xff, 0x37, 0x00, 0x00, 0xff, 0xff, 0x3a, 0xbb, 0x3a, 0x6c, 0x4c, 0x07, 0x00, 0x00,
}

View File

@ -105,6 +105,7 @@ message GetResponse {
// ListResponse is a response message for the List rpc call
message ListResponse {
repeated bytes paths = 1;
bool truncated = 2;
}
message DeleteRequest {

View File

@ -67,30 +67,38 @@ func (c *boltClient) Get(pathKey storage.Key) (storage.Value, error) {
err := c.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(c.Bucket)
v := b.Get(pathKey)
if v == nil {
return Error.New("pointer at %#v not found", string(pathKey))
}
pointerBytes = v
return nil
})
return pointerBytes, err
if err != nil {
// TODO: log
return nil, err
}
return pointerBytes, nil
}
// List returns either a list of keys for which boltdb has values or an error.
func (c *boltClient) List() (storage.Keys, error) {
func (c *boltClient) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
c.logger.Debug("entering bolt list")
var paths storage.Keys
err := c.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(c.Bucket)
err := b.ForEach(func(key, value []byte) error {
paths = append(paths, key)
return nil
})
return err
cur := tx.Bucket(c.Bucket).Cursor()
var k []byte
if startingKey == nil {
k, _ = cur.First()
} else {
k, _ = cur.Seek(startingKey)
}
for ; k != nil; k, _ = cur.Next() {
paths = append(paths, k)
if limit > 0 && int(limit) == int(len(paths)) {
break
}
}
return nil
})
return paths, err
}

View File

@ -10,9 +10,46 @@ import (
"testing"
"go.uber.org/zap"
"storj.io/storj/pkg/netstate"
"storj.io/storj/storage"
)
type BoltClientTest struct {
*testing.T
c storage.KeyValueStore
}
func NewBoltClientTest(t *testing.T) *BoltClientTest {
logger, _ := zap.NewDevelopment()
dbName := tempfile()
c, err := NewClient(logger, dbName, "test_bucket")
if err != nil {
t.Error("Failed to create test db")
panic(err)
}
return &BoltClientTest{
T: t,
c: c,
}
}
func (bt *BoltClientTest) Close() {
bt.c.Close()
switch client := bt.c.(type) {
case *boltClient:
os.Remove(client.Path)
}
}
func (bt *BoltClientTest) HandleErr(err error, msg string) {
bt.Error(msg)
if err != nil {
panic(err)
}
panic(msg)
}
func tempfile() string {
f, err := ioutil.TempFile("", "TempBolt-")
if err != nil {
@ -26,60 +63,94 @@ func tempfile() string {
return f.Name()
}
func TestNetState(t *testing.T) {
logger, _ := zap.NewDevelopment()
c, err := NewClient(logger, tempfile(), "test_bucket")
if err != nil {
t.Error("Failed to create test db")
}
defer func() {
c.Close()
switch client := c.(type) {
case *boltClient:
os.Remove(client.Path)
}
}()
func TestPut(t *testing.T) {
bt := NewBoltClientTest(t)
defer bt.Close()
testEntry1 := netstate.PointerEntry{
Path: []byte(`test/path`),
Pointer: []byte(`pointer1`),
}
testEntry2 := netstate.PointerEntry{
Path: []byte(`test/path2`),
Pointer: []byte(`pointer2`),
}
// tests Put function
if err := c.Put(testEntry1.Path, testEntry1.Pointer); err != nil {
t.Error("Failed to save testFile to pointers bucket")
}
// tests Get function
retrvValue, err := c.Get([]byte("test/path"))
if err != nil {
t.Error("Failed to get saved test pointer")
}
if !bytes.Equal(retrvValue, testEntry1.Pointer) {
t.Error("Retrieved pointer was not same as put pointer")
}
// tests Delete function
if err := c.Delete([]byte("test/path")); err != nil {
t.Error("Failed to delete test entry")
}
// tests List function
if err := c.Put(testEntry2.Path, testEntry2.Pointer); err != nil {
t.Error("Failed to put testEntry2 to pointers bucket")
}
testPaths, err := c.List()
if err != nil {
t.Error("Failed to list Path keys in pointers bucket")
}
// tests List + Delete function
if !bytes.Equal(testPaths[0], []byte("test/path2")) {
t.Error("Expected only testEntry2 path in list")
if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil {
bt.HandleErr(err, "Failed to save pointer1 to pointers bucket")
}
}
func TestGet(t *testing.T) {
bt := NewBoltClientTest(t)
defer bt.Close()
if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil {
bt.HandleErr(err, "Failed to save pointer1 to pointers bucket")
}
retrvValue, err := bt.c.Get([]byte("test/path/1"))
if err != nil {
bt.HandleErr(err, "Failed to get")
}
if retrvValue.IsZero() {
bt.HandleErr(nil, "Failed to get saved test pointer")
}
if !bytes.Equal(retrvValue, []byte("pointer1")) {
bt.HandleErr(nil, "Retrieved pointer was not same as put pointer")
}
// tests Get non-existent path
getRes, err := bt.c.Get([]byte("fake/path"))
if err != nil {
bt.HandleErr(err, "Failed to get")
}
if !getRes.IsZero() {
bt.HandleErr(nil, "Expected zero-value response for getting fake path")
}
}
func TestDelete(t *testing.T) {
bt := NewBoltClientTest(t)
defer bt.Close()
if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil {
bt.HandleErr(err, "Failed to save pointer1 to pointers bucket")
}
if err := bt.c.Delete([]byte("test/path/1")); err != nil {
bt.HandleErr(err, "Failed to delete test entry")
}
}
func TestList(t *testing.T) {
bt := NewBoltClientTest(t)
defer bt.Close()
if err := bt.c.Put([]byte("test/path/2"), []byte("pointer2")); err != nil {
bt.HandleErr(err, "Failed to put pointer2 to pointers bucket")
}
testPaths, err := bt.c.List([]byte("test/path/2"), storage.Limit(1))
if err != nil {
bt.HandleErr(err, "Failed to list Path keys in pointers bucket")
}
if !bytes.Equal(testPaths[0], []byte("test/path/2")) {
bt.HandleErr(nil, "Expected only test/path/2 in list")
}
}
func TestListNoStartingKey(t *testing.T) {
bt := NewBoltClientTest(t)
defer bt.Close()
if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil {
bt.HandleErr(err, "Failed to save pointer1 to pointers bucket")
}
if err := bt.c.Put([]byte("test/path/2"), []byte("pointer2")); err != nil {
bt.HandleErr(err, "Failed to save pointer2 to pointers bucket")
}
if err := bt.c.Put([]byte("test/path/3"), []byte("pointer3")); err != nil {
bt.HandleErr(err, "Failed to save pointer3 to pointers bucket")
}
testPaths, err := bt.c.List(nil, storage.Limit(3))
if err != nil {
bt.HandleErr(err, "Failed to list Paths")
}
if !bytes.Equal(testPaths[2], []byte("test/path/3")) {
bt.HandleErr(nil, "Expected test/path/3 to be last in list")
}
}

View File

@ -12,16 +12,29 @@ type Value []byte
// Keys is the type for a slice of keys in a `KeyValueStore`
type Keys []Key
// Limit indicates how many keys to return when calling List
type Limit int
// KeyValueStore is an interface describing key/value stores like redis and boltdb
type KeyValueStore interface {
// Put adds a value to the provided key in the KeyValueStore, returning an error on failure.
Put(Key, Value) error
Get(Key) (Value, error)
List() (Keys, error)
List(Key, Limit) (Keys, error)
Delete(Key) error
Close() error
}
// IsZero returns true if the value struct is it's zero value
func (v *Value) IsZero() (_ bool) {
return len(*v) == 0
}
// IsZero returns true if the key struct is it's zero value
func (k *Key) IsZero() (_ bool) {
return len(*k) == 0
}
// MarshalBinary implements the encoding.BinaryMarshaler interface for the Value type
func (v *Value) MarshalBinary() (_ []byte, _ error) {
return *v, nil

View File

@ -4,6 +4,7 @@
package redis
import (
"fmt"
"time"
"github.com/go-redis/redis"
@ -49,6 +50,11 @@ func NewClient(address, password string, db int) (storage.KeyValueStore, error)
func (c *redisClient) Get(key storage.Key) (storage.Value, error) {
b, err := c.db.Get(string(key)).Bytes()
if err != nil {
if err.Error() == "redis: nil" {
return nil, nil
}
// TODO: log
return nil, Error.New("get error", err)
}
@ -72,18 +78,32 @@ func (c *redisClient) Put(key storage.Key, value storage.Value) error {
}
// List returns either a list of keys for which boltdb has values or an error.
func (c *redisClient) List() (_ storage.Keys, _ error) {
results, err := c.db.Keys("*").Result()
if err != nil {
return nil, Error.New("list error", err)
func (c *redisClient) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) {
var noOrderKeys []string
if startingKey != nil {
_, cursor, err := c.db.Scan(0, fmt.Sprintf("%s", startingKey), int64(limit)).Result()
if err != nil {
return nil, Error.New("list error with starting key", err)
}
keys, _, err := c.db.Scan(cursor, "", int64(limit)).Result()
if err != nil {
return nil, Error.New("list error with starting key", err)
}
noOrderKeys = keys
} else if startingKey == nil {
keys, _, err := c.db.Scan(0, "", int64(limit)).Result()
if err != nil {
return nil, Error.New("list error without starting key", err)
}
noOrderKeys = keys
}
keys := make(storage.Keys, len(results))
for i, k := range results {
keys[i] = storage.Key(k)
listKeys := make(storage.Keys, len(noOrderKeys))
for i, k := range noOrderKeys {
listKeys[i] = storage.Key(k)
}
return keys, nil
return listKeys, nil
}
// Delete deletes a key/value pair from redis, for a given the key

View File

@ -0,0 +1,83 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package redis
import (
"testing"
"storj.io/storj/internal/test"
"storj.io/storj/storage"
)
type RedisClientTest struct {
*testing.T
c storage.KeyValueStore
}
func NewRedisClientTest(t *testing.T) *RedisClientTest {
kv := make(test.KvStore)
c := test.NewMockKeyValueStore(kv)
return &RedisClientTest{
T: t,
c: c,
}
}
func (rt *RedisClientTest) Close() {
rt.c.Close()
}
func (rt *RedisClientTest) HandleErr(err error, msg string) {
rt.Error(msg)
if err != nil {
panic(err)
}
panic(msg)
}
func TestListWithoutStartKey(t *testing.T) {
rt := NewRedisClientTest(t)
defer rt.Close()
if err := rt.c.Put(storage.Key([]byte("path/1")), []byte("pointer1")); err != nil {
rt.HandleErr(err, "Failed to put")
}
if err := rt.c.Put(storage.Key([]byte("path/2")), []byte("pointer2")); err != nil {
rt.HandleErr(err, "Failed to put")
}
if err := rt.c.Put(storage.Key([]byte("path/3")), []byte("pointer3")); err != nil {
rt.HandleErr(err, "Failed to put")
}
_, err := rt.c.List(nil, storage.Limit(3))
if err != nil {
rt.HandleErr(err, "Failed to list")
}
}
func TestListWithStartKey(t *testing.T) {
rt := NewRedisClientTest(t)
defer rt.Close()
if err := rt.c.Put(storage.Key([]byte("path/1")), []byte("pointer1")); err != nil {
rt.HandleErr(err, "Failed to put")
}
if err := rt.c.Put(storage.Key([]byte("path/2")), []byte("pointer2")); err != nil {
rt.HandleErr(err, "Failed to put")
}
if err := rt.c.Put(storage.Key([]byte("path/3")), []byte("pointer3")); err != nil {
rt.HandleErr(err, "Failed to put")
}
if err := rt.c.Put(storage.Key([]byte("path/4")), []byte("pointer4")); err != nil {
rt.HandleErr(err, "Failed to put")
}
if err := rt.c.Put(storage.Key([]byte("path/5")), []byte("pointer5")); err != nil {
rt.HandleErr(err, "Failed to put")
}
_, err := rt.c.List([]byte("path/2"), storage.Limit(2))
if err != nil {
rt.HandleErr(err, "Failed to list")
}
}