diff --git a/examples/netstate-client/main.go b/examples/netstate-client/main.go index 33bc30e2d..48f33b4be 100644 --- a/examples/netstate-client/main.go +++ b/examples/netstate-client/main.go @@ -18,7 +18,8 @@ import ( ) var ( - port string + port string + apiKey = []byte("abc123") ) func initializeFlags() { @@ -44,36 +45,57 @@ func main() { ctx := context.Background() // Example pointer paths to put - //pr1 passes with api creds pr1 := proto.PutRequest{ - Path: []byte("welcome/to/my/pointer/journey"), + Path: []byte("test/path/1"), Pointer: &proto.Pointer{ Type: proto.Pointer_INLINE, - InlineSegment: []byte("granola"), + InlineSegment: []byte("inline1"), }, - APIKey: []byte("abc123"), + APIKey: apiKey, } - // pr2 passes with api creds pr2 := proto.PutRequest{ - Path: []byte("so/many/pointers"), + Path: []byte("test/path/2"), Pointer: &proto.Pointer{ Type: proto.Pointer_INLINE, - InlineSegment: []byte("m&ms"), + InlineSegment: []byte("inline2"), }, - APIKey: []byte("abc123"), + APIKey: apiKey, } - // pr3 fails api creds pr3 := proto.PutRequest{ - Path: []byte("another/pointer/for/the/pile"), + Path: []byte("test/path/3"), Pointer: &proto.Pointer{ Type: proto.Pointer_INLINE, - InlineSegment: []byte("popcorn"), + InlineSegment: []byte("inline3"), }, - APIKey: []byte("abc13"), + APIKey: apiKey, + } + // rps is an example slice of RemotePieces, which is passed into + // this example Pointer of type REMOTE. + var rps []*proto.RemotePiece + rps = append(rps, &proto.RemotePiece{ + PieceNum: int64(1), + NodeId: "testId", + }) + pr4 := proto.PutRequest{ + Path: []byte("test/path/4"), + Pointer: &proto.Pointer{ + Type: proto.Pointer_REMOTE, + Remote: &proto.RemoteSegment{ + Redundancy: &proto.RedundancyScheme{ + Type: proto.RedundancyScheme_RS, + MinReq: int64(1), + Total: int64(3), + RepairThreshold: int64(2), + SuccessThreshold: int64(3), + }, + PieceId: "testId", + RemotePieces: rps, + }, + }, + APIKey: apiKey, } // Example Puts - // puts passes api creds _, err = client.Put(ctx, &pr1) if err != nil || status.Code(err) == codes.Internal { logger.Error("failed to put", zap.Error(err)) @@ -86,12 +108,15 @@ func main() { if err != nil || status.Code(err) == codes.Internal { logger.Error("failed to put", zap.Error(err)) } + _, err = client.Put(ctx, &pr4) + if err != nil || status.Code(err) == codes.Internal { + logger.Error("failed to put", zap.Error(err)) + } // Example Get - // get passes api creds getReq := proto.GetRequest{ - Path: []byte("so/many/pointers"), - APIKey: []byte("abc123"), + Path: []byte("test/path/1"), + APIKey: apiKey, } getRes, err := client.Get(ctx, &getReq) if err != nil || status.Code(err) == codes.Internal { @@ -102,15 +127,11 @@ func main() { } // Example List - // list passes api creds listReq := proto.ListRequest{ - // This pagination functionality doesn't work yet. - // The given arguments are placeholders. - StartingPathKey: []byte("test/pointer/path"), + StartingPathKey: []byte("test/path/2"), Limit: 5, - APIKey: []byte("abc123"), + APIKey: apiKey, } - listRes, err := client.List(ctx, &listReq) if err != nil || status.Code(err) == codes.Internal { logger.Error("failed to list file paths") @@ -119,14 +140,13 @@ func main() { for _, pathByte := range listRes.Paths { stringList = append(stringList, string(pathByte)) } - logger.Debug("listed paths: " + strings.Join(stringList, ", ")) + logger.Debug("listed paths: " + strings.Join(stringList, ", ") + "; truncated: " + fmt.Sprintf("%t", listRes.Truncated)) } // Example Delete - // delete passes api creds delReq := proto.DeleteRequest{ - Path: []byte("welcome/to/my/pointer/journey"), - APIKey: []byte("abc123"), + Path: []byte("test/path/1"), + APIKey: apiKey, } _, err = client.Delete(ctx, &delReq) if err != nil || status.Code(err) == codes.Internal { diff --git a/internal/test/storage.go b/internal/test/storage.go index ed2386e33..fad164f21 100644 --- a/internal/test/storage.go +++ b/internal/test/storage.go @@ -10,6 +10,7 @@ import ( "os" "os/exec" "path/filepath" + "sort" "testing" "time" @@ -65,11 +66,11 @@ var ( func (m *MockKeyValueStore) Get(key storage.Key) (storage.Value, error) { m.GetCalled++ if key.String() == "error" { - return storage.Value{}, ErrForced.New("forced error") + return nil, nil } v, ok := m.Data[key.String()] if !ok { - return storage.Value{}, ErrMissingKey.New("key %v missing", key) + return storage.Value{}, nil } return v, nil @@ -90,16 +91,42 @@ func (m *MockKeyValueStore) Delete(key storage.Key) error { } // List returns either a list of keys for which the MockKeyValueStore has values or an error. -func (m *MockKeyValueStore) List() (_ storage.Keys, _ error) { +func (m *MockKeyValueStore) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) { m.ListCalled++ keys := storage.Keys{} - for k := range m.Data { - keys = append(keys, storage.Key(k)) - } + keySlice := mapIntoSlice(m.Data) + started := false + if startingKey == nil { + started = true + } + for _, key := range keySlice { + if !started && key == string(startingKey) { + keys = append(keys, storage.Key(key)) + started = true + continue + } + if started { + if len(keys) == int(limit) { + break + } + keys = append(keys, storage.Key(key)) + } + } return keys, nil } +func mapIntoSlice(data KvStore) []string { + keySlice := make([]string, len(data)) + i := 0 + for k := range data { + keySlice[i] = k + i++ + } + sort.Strings(keySlice) + return keySlice +} + // Close closes the client func (m *MockKeyValueStore) Close() error { m.CloseCalled++ diff --git a/pkg/netstate/client_test.go b/pkg/netstate/client_test.go deleted file mode 100644 index 3ff41ca48..000000000 --- a/pkg/netstate/client_test.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package netstate - -import ( - "bytes" - "context" - "fmt" - "net" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "storj.io/storj/internal/test" - pb "storj.io/storj/protos/netstate" -) - -func TestNetStateClient(t *testing.T) { - logger, _ := zap.NewDevelopment() - - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 9000)) - assert.NoError(t, err) - - mdb := test.NewMockKeyValueStore(test.KvStore{}) - - grpcServer := grpc.NewServer() - pb.RegisterNetStateServer(grpcServer, NewServer(mdb, logger)) - - defer grpcServer.GracefulStop() - go grpcServer.Serve(lis) - - address := lis.Addr().String() - conn, err := grpc.Dial(address, grpc.WithInsecure()) - assert.NoError(t, err) - - c := pb.NewNetStateClient(conn) - - ctx := context.Background() - - // example file path to put/get - pr1 := pb.PutRequest{ - Path: []byte("here/is/a/path"), - Pointer: &pb.Pointer{ - Type: pb.Pointer_INLINE, - InlineSegment: []byte("oatmeal"), - }, - APIKey: []byte("abc123"), - } - - // Tests Server.Put - _, err = c.Put(ctx, &pr1) - if err != nil || status.Code(err) == codes.Internal { - t.Error("Failed to Put") - } - - if mdb.PutCalled != 1 { - t.Error("Failed to call mockdb correctly") - } - - pointerBytes, err := proto.Marshal(pr1.Pointer) - if err != nil { - t.Error("failed to marshal test pointer") - } - - if !bytes.Equal(mdb.Data[string(pr1.Path)], pointerBytes) { - t.Error("Expected saved pointer to equal given pointer") - } - - // Tests Server.Get - getReq := pb.GetRequest{ - Path: []byte("here/is/a/path"), - APIKey: []byte("abc123"), - } - - getRes, err := c.Get(ctx, &getReq) - assert.NoError(t, err) - - if !bytes.Equal(getRes.Pointer, pointerBytes) { - t.Error("Expected to get same content that was put") - } - - if mdb.GetCalled != 1 { - t.Error("Failed to call mockdb correct number of times") - } - - // Puts another pointer entry to test delete and list - pr2 := pb.PutRequest{ - Path: []byte("here/is/another/path"), - Pointer: &pb.Pointer{ - Type: pb.Pointer_INLINE, - InlineSegment: []byte("raisins"), - }, - APIKey: []byte("abc123"), - } - - _, err = c.Put(ctx, &pr2) - if err != nil || status.Code(err) == codes.Internal { - t.Error("Failed to Put") - } - - if mdb.PutCalled != 2 { - t.Error("Failed to call mockdb correct number of times") - } - - // Test Server.Delete - delReq := pb.DeleteRequest{ - Path: []byte("here/is/a/path"), - APIKey: []byte("abc123"), - } - - _, err = c.Delete(ctx, &delReq) - if err != nil || status.Code(err) == codes.Internal { - t.Error("Failed to delete") - } - - if mdb.DeleteCalled != 1 { - t.Error("Failed to call mockdb correct number of times") - } - - // Tests Server.List - listReq := pb.ListRequest{ - // This pagination functionality doesn't work yet. - // The given arguments are placeholders. - StartingPathKey: []byte("test/pointer/path"), - Limit: 5, - APIKey: []byte("abc123"), - } - - listRes, err := c.List(ctx, &listReq) - if err != nil { - t.Error("Failed to list file paths") - } - - if !bytes.Equal(listRes.Paths[0], []byte("here/is/another/path")) { - t.Error("Failed to list correct file path") - } - - if mdb.ListCalled != 1 { - t.Error("Failed to call mockdb correct number of times") - } -} diff --git a/pkg/netstate/common.go b/pkg/netstate/common.go new file mode 100644 index 000000000..f0c9ade17 --- /dev/null +++ b/pkg/netstate/common.go @@ -0,0 +1,11 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package netstate + +import ( + "github.com/zeebo/errs" +) + +// Error is the default boltdb errs class +var Error = errs.Class("netstate error") diff --git a/pkg/netstate/netstate.go b/pkg/netstate/netstate.go index 53e808510..835a77f64 100644 --- a/pkg/netstate/netstate.go +++ b/pkg/netstate/netstate.go @@ -17,12 +17,6 @@ import ( "storj.io/storj/storage" ) -// PointerEntry - Path and Pointer are saved as a key/value pair to a `storage.KeyValueStore`. -type PointerEntry struct { - Path []byte - Pointer []byte -} - // Server implements the network state RPC service type Server struct { DB storage.KeyValueStore @@ -45,7 +39,7 @@ func (s *Server) validateAuth(APIKeyBytes []byte) error { return nil } -// Put formats and hands off a file path to be saved to boltdb +// Put formats and hands off a key/value (path/pointer) to be saved to boltdb func (s *Server) Put(ctx context.Context, putReq *pb.PutRequest) (*pb.PutResponse, error) { s.logger.Debug("entering netstate put") @@ -60,16 +54,11 @@ func (s *Server) Put(ctx context.Context, putReq *pb.PutRequest) (*pb.PutRespons return nil, status.Errorf(codes.Internal, err.Error()) } - pe := PointerEntry{ - Path: putReq.Path, - Pointer: pointerBytes, - } - - if err := s.DB.Put(pe.Path, pe.Pointer); err != nil { + if err := s.DB.Put(putReq.Path, pointerBytes); err != nil { s.logger.Error("err putting pointer", zap.Error(err)) return nil, status.Errorf(codes.Internal, err.Error()) } - s.logger.Debug("put to the db: " + string(pe.Path)) + s.logger.Debug("put to the db: " + string(putReq.Path)) return &pb.PutResponse{}, nil } @@ -84,7 +73,6 @@ func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, } pointerBytes, err := s.DB.Get(req.Path) - if err != nil { s.logger.Error("err getting file", zap.Error(err)) return nil, status.Errorf(codes.Internal, err.Error()) @@ -99,25 +87,48 @@ func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, func (s *Server) List(ctx context.Context, req *pb.ListRequest) (*pb.ListResponse, error) { s.logger.Debug("entering netstate list") + if req.Limit <= 0 { + return nil, Error.New("err Limit is less than or equal to 0") + } + APIKeyBytes := []byte(req.APIKey) if err := s.validateAuth(APIKeyBytes); err != nil { return nil, err } - pathKeys, err := s.DB.List() - - if err != nil { - s.logger.Error("err listing path keys", zap.Error(err)) - return nil, status.Errorf(codes.Internal, err.Error()) + var keyList storage.Keys + if req.StartingPathKey == nil { + pathKeys, err := s.DB.List(nil, storage.Limit(req.Limit)) + if err != nil { + s.logger.Error("err listing path keys with no starting key", zap.Error(err)) + return nil, status.Errorf(codes.Internal, err.Error()) + } + keyList = pathKeys + } else if req.StartingPathKey != nil { + pathKeys, err := s.DB.List(storage.Key(req.StartingPathKey), storage.Limit(req.Limit)) + if err != nil { + s.logger.Error("err listing path keys", zap.Error(err)) + return nil, status.Errorf(codes.Internal, err.Error()) + } + keyList = pathKeys } + truncated := isItTruncated(keyList, int(req.Limit)) + s.logger.Debug("path keys retrieved") return &pb.ListResponse{ - // pathKeys is an array of byte arrays - Paths: pathKeys.ByteSlices(), + Paths: keyList.ByteSlices(), + Truncated: truncated, }, nil } +func isItTruncated(keyList storage.Keys, limit int) bool { + if len(keyList) == limit { + return true + } + return false +} + // Delete formats and hands off a file path to delete from boltdb func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) { s.logger.Debug("entering netstate delete") @@ -129,7 +140,7 @@ func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteR err := s.DB.Delete(req.Path) if err != nil { - s.logger.Error("err deleting pointer entry", zap.Error(err)) + s.logger.Error("err deleting path and pointer", zap.Error(err)) return nil, status.Errorf(codes.Internal, err.Error()) } s.logger.Debug("deleted pointer at path: " + string(req.Path)) diff --git a/pkg/netstate/netstate_test.go b/pkg/netstate/netstate_test.go new file mode 100644 index 000000000..a641f6a06 --- /dev/null +++ b/pkg/netstate/netstate_test.go @@ -0,0 +1,423 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package netstate + +import ( + "bytes" + "context" + "fmt" + "net" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/spf13/viper" + "go.uber.org/zap" + "google.golang.org/grpc" + + "storj.io/storj/internal/test" + pb "storj.io/storj/protos/netstate" +) + +var ( + ctx = context.Background() +) + +type NetStateClientTest struct { + *testing.T + + server *grpc.Server + lis net.Listener + mdb *test.MockKeyValueStore + c pb.NetStateClient +} + +func NewNetStateClientTest(t *testing.T) *NetStateClientTest { + mdb := test.NewMockKeyValueStore(test.KvStore{}) + + viper.Reset() + viper.Set("key", "abc123") + + // tests should always listen on "localhost:0" + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + panic(err) + } + + grpcServer := grpc.NewServer() + pb.RegisterNetStateServer(grpcServer, NewServer(mdb, zap.L())) + go grpcServer.Serve(lis) + + conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + grpcServer.GracefulStop() + lis.Close() + t.Fatal(err) + } + + return &NetStateClientTest{ + T: t, + server: grpcServer, + lis: lis, + mdb: mdb, + c: pb.NewNetStateClient(conn), + } +} + +func (nt *NetStateClientTest) Close() { + nt.server.GracefulStop() + nt.lis.Close() +} + +func MakePointer(path []byte, auth bool) pb.PutRequest { + var APIKey = "abc123" + if !auth { + APIKey = "wrong key" + } + // rps is an example slice of RemotePieces to add to this + // REMOTE pointer type. + var rps []*pb.RemotePiece + rps = append(rps, &pb.RemotePiece{ + PieceNum: int64(1), + NodeId: "testId", + }) + pr := pb.PutRequest{ + Path: path, + Pointer: &pb.Pointer{ + Type: pb.Pointer_REMOTE, + Remote: &pb.RemoteSegment{ + Redundancy: &pb.RedundancyScheme{ + Type: pb.RedundancyScheme_RS, + MinReq: int64(1), + Total: int64(3), + RepairThreshold: int64(2), + SuccessThreshold: int64(3), + }, + PieceId: "testId", + RemotePieces: rps, + }, + Size: int64(1), + }, + APIKey: []byte(APIKey), + } + return pr +} + +func MakePointers(howMany int) []pb.PutRequest { + var pointers []pb.PutRequest + for i := 1; i <= howMany; i++ { + newPointer := MakePointer([]byte("file/path/"+fmt.Sprintf("%d", i)), true) + pointers = append(pointers, newPointer) + } + return pointers +} + +func (nt *NetStateClientTest) Put(pr pb.PutRequest) *pb.PutResponse { + pre := nt.mdb.PutCalled + putRes, err := nt.c.Put(ctx, &pr) + if err != nil { + nt.HandleErr(err, "Failed to put") + } + if pre+1 != nt.mdb.PutCalled { + nt.HandleErr(nil, "Failed to call Put correct number of times") + } + return putRes +} + +func (nt *NetStateClientTest) Get(gr pb.GetRequest) *pb.GetResponse { + pre := nt.mdb.GetCalled + getRes, err := nt.c.Get(ctx, &gr) + if err != nil { + nt.HandleErr(err, "Failed to get") + } + if pre+1 != nt.mdb.GetCalled { + nt.HandleErr(nil, "Failed to call Get correct number of times") + } + return getRes +} + +func (nt *NetStateClientTest) List(lr pb.ListRequest) (listRes *pb.ListResponse) { + pre := nt.mdb.ListCalled + listRes, err := nt.c.List(ctx, &lr) + if err != nil { + nt.HandleErr(err, "Failed to list") + } + if pre+1 != nt.mdb.ListCalled { + nt.HandleErr(nil, "Failed to call List correct number of times") + } + return listRes +} + +func (nt *NetStateClientTest) Delete(dr pb.DeleteRequest) (delRes *pb.DeleteResponse) { + pre := nt.mdb.DeleteCalled + delRes, err := nt.c.Delete(ctx, &dr) + if err != nil { + nt.HandleErr(err, "Failed to delete") + } + if pre+1 != nt.mdb.DeleteCalled { + nt.HandleErr(nil, "Failed to call Delete correct number of times") + } + + return delRes +} + +func (nt *NetStateClientTest) HandleErr(err error, msg string) { + nt.Error(msg) + if err != nil { + panic(err) + } + panic(msg) +} + +func TestMockList(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + err := nt.mdb.Put([]byte("k1"), []byte("v1")) + if err != nil { + panic(err) + } + err = nt.mdb.Put([]byte("k2"), []byte("v2")) + if err != nil { + panic(err) + } + err = nt.mdb.Put([]byte("k3"), []byte("v3")) + if err != nil { + panic(err) + } + err = nt.mdb.Put([]byte("k4"), []byte("v4")) + if err != nil { + panic(err) + } + + keys, err := nt.mdb.List([]byte("k2"), 2) + if err != nil { + nt.HandleErr(err, "Failed to list") + } + if fmt.Sprintf("%s", keys) != "[k2 k3]" { + nt.HandleErr(nil, "Failed to receive accepted list. Received "+fmt.Sprintf("%s", keys)) + } + + keys, err = nt.mdb.List(nil, 3) + if err != nil { + nt.HandleErr(err, "Failed to list") + } + if fmt.Sprintf("%s", keys) != "[k1 k2 k3]" { + nt.HandleErr(nil, "Failed to receive accepted list. Received "+fmt.Sprintf("%s", keys)) + } +} + +func TestNetStatePutGet(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + preGet := nt.mdb.GetCalled + prePut := nt.mdb.PutCalled + + gr := nt.Get(pb.GetRequest{ + Path: []byte("file/path/1"), + APIKey: []byte("abc123"), + }) + if gr.Pointer != nil { + nt.HandleErr(nil, "Expected no pointer") + } + + pr := MakePointer([]byte("file/path/1"), true) + nt.Put(pr) + + gr = nt.Get(pb.GetRequest{ + Path: []byte("file/path/1"), + APIKey: []byte("abc123"), + }) + if gr == nil { + nt.HandleErr(nil, "Failed to get the put pointer") + } + + pointerBytes, err := proto.Marshal(pr.Pointer) + if err != nil { + nt.HandleErr(err, "Failed to marshal test pointer") + } + if !bytes.Equal(gr.Pointer, pointerBytes) { + nt.HandleErr(nil, "Expected to get same content that was put") + } + if nt.mdb.GetCalled != preGet+2 { + nt.HandleErr(nil, "Failed to call get correct number of times") + } + if nt.mdb.PutCalled != prePut+1 { + nt.HandleErr(nil, "Failed to call put correct number of times") + } +} + +func TestGetAuth(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + getReq := pb.GetRequest{ + Path: []byte("file/path/1"), + APIKey: []byte("wrong key"), + } + _, err := nt.c.Get(ctx, &getReq) + if err == nil { + nt.HandleErr(nil, "Failed to error for wrong auth key") + } +} + +func TestPutAuth(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + pr := MakePointer([]byte("file/path"), false) + _, err := nt.c.Put(ctx, &pr) + if err == nil { + nt.HandleErr(nil, "Failed to error for wrong auth key") + } +} + +func TestDelete(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + pre := nt.mdb.DeleteCalled + + reqs := MakePointers(1) + _, err := nt.c.Put(ctx, &reqs[0]) + if err != nil { + nt.HandleErr(err, "Failed to put") + } + + delReq := pb.DeleteRequest{ + Path: []byte("file/path/1"), + APIKey: []byte("abc123"), + } + _, err = nt.c.Delete(ctx, &delReq) + if err != nil { + nt.HandleErr(err, "Failed to delete") + } + if pre+1 != nt.mdb.DeleteCalled { + nt.HandleErr(nil, "Failed to call Delete correct number of times") + } +} + +func TestDeleteAuth(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + reqs := MakePointers(1) + _, err := nt.c.Put(ctx, &reqs[0]) + if err != nil { + nt.HandleErr(err, "Failed to put") + } + + delReq := pb.DeleteRequest{ + Path: []byte("file/path/1"), + APIKey: []byte("wrong key"), + } + _, err = nt.c.Delete(ctx, &delReq) + if err == nil { + nt.HandleErr(nil, "Failed to error with wrong auth key") + } +} + +func TestList(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + reqs := MakePointers(4) + for _, req := range reqs { + nt.Put(req) + } + + listReq := pb.ListRequest{ + StartingPathKey: []byte("file/path/2"), + Limit: 5, + APIKey: []byte("abc123"), + } + listRes := nt.List(listReq) + if listRes.Truncated { + nt.HandleErr(nil, "Expected list slice to not be truncated") + } + if !bytes.Equal(listRes.Paths[0], []byte("file/path/2")) { + nt.HandleErr(nil, "Failed to list correct file paths") + } +} + +func TestListTruncated(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + reqs := MakePointers(3) + for _, req := range reqs { + _, err := nt.c.Put(ctx, &req) + if err != nil { + nt.HandleErr(err, "Failed to put") + } + } + + listReq := pb.ListRequest{ + StartingPathKey: []byte("file/path/1"), + Limit: 1, + APIKey: []byte("abc123"), + } + listRes, err := nt.c.List(ctx, &listReq) + if err != nil { + nt.HandleErr(err, "Failed to list file paths") + } + if !listRes.Truncated { + nt.HandleErr(nil, "Expected list slice to be truncated") + } +} + +func TestListWithoutStartingKey(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + reqs := MakePointers(3) + for _, req := range reqs { + _, err := nt.c.Put(ctx, &req) + if err != nil { + nt.HandleErr(err, "Failed to put") + } + } + + listReq := pb.ListRequest{ + Limit: 3, + APIKey: []byte("abc123"), + } + listRes, err := nt.c.List(ctx, &listReq) + if err != nil { + nt.HandleErr(err, "Failed to list without starting key") + } + + if !bytes.Equal(listRes.Paths[2], []byte("file/path/3")) { + nt.HandleErr(nil, "Failed to list correct paths") + } +} + +func TestListWithoutLimit(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + listReq := pb.ListRequest{ + StartingPathKey: []byte("file/path/3"), + APIKey: []byte("abc123"), + } + _, err := nt.c.List(ctx, &listReq) + if err == nil { + t.Error("Failed to error when not given limit") + } +} + +func TestListAuth(t *testing.T) { + nt := NewNetStateClientTest(t) + defer nt.Close() + + listReq := pb.ListRequest{ + StartingPathKey: []byte("file/path/3"), + Limit: 1, + APIKey: []byte("wrong key"), + } + _, err := nt.c.List(ctx, &listReq) + if err == nil { + t.Error("Failed to error when given wrong auth key") + } +} diff --git a/pkg/netstate/server_test.go b/pkg/netstate/server_test.go deleted file mode 100644 index f1f667f47..000000000 --- a/pkg/netstate/server_test.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (C) 2018 Storj Labs, Inc. -// See LICENSE for copying information. - -package netstate - -import ( - "os" - "testing" - - "github.com/spf13/viper" -) - -const ( - API_KEY = "abc123" -) - -func TestMain(m *testing.M) { - viper.SetEnvPrefix("API") - os.Setenv("API_KEY", API_KEY) - viper.AutomaticEnv() - os.Exit(m.Run()) -} diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go index 0ca8ebd01..f33cc35b3 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -59,6 +59,10 @@ func (o *Cache) Get(ctx context.Context, key string) (*overlay.NodeAddress, erro if err != nil { return nil, err } + if b.IsZero() { + // TODO: log? return an error? + return nil, nil + } na := &overlay.NodeAddress{} if err := proto.Unmarshal(b, na); err != nil { diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go index 5740b533a..ac9e1e5b0 100644 --- a/pkg/overlay/cache_test.go +++ b/pkg/overlay/cache_test.go @@ -83,7 +83,7 @@ var ( } }(), expectedErrors: errors{ - mock: &test.ErrForced, + mock: nil, bolt: nil, _redis: nil, }, @@ -107,9 +107,9 @@ var ( }, // TODO(bryanchriswhite): compare actual errors expectedErrors: errors{ - mock: &test.ErrMissingKey, - bolt: &boltdb.Error, - _redis: &redis.Error, + mock: nil, + bolt: nil, + _redis: nil, }, data: test.KvStore{"foo": func() storage.Value { na := &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"} diff --git a/pkg/overlay/server.go b/pkg/overlay/server.go index a03526bfa..d2bee316e 100644 --- a/pkg/overlay/server.go +++ b/pkg/overlay/server.go @@ -47,7 +47,7 @@ func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.L // FindStorageNodes searches the overlay network for nodes that meet the provided requirements func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (*proto.FindStorageNodesResponse, error) { // NB: call FilterNodeReputation from node_reputation package to find nodes for storage - keys, err := o.cache.DB.List() + keys, err := o.cache.DB.List(nil, storage.Limit(10)) if err != nil { o.logger.Error("Error listing nodes", zap.Error(err)) return nil, err diff --git a/protos/netstate/netstate.pb.go b/protos/netstate/netstate.pb.go index c81367880..97dec1f01 100644 --- a/protos/netstate/netstate.pb.go +++ b/protos/netstate/netstate.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-go. +// Code generated by protoc-gen-go. DO NOT EDIT. // source: netstate.proto -// DO NOT EDIT! /* Package netstate is a generated protocol buffer package. @@ -323,7 +322,7 @@ func (m *Pointer) GetMetadata() []byte { type PutRequest struct { Path []byte `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` Pointer *Pointer `protobuf:"bytes,2,opt,name=pointer" json:"pointer,omitempty"` - APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"` + APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,proto3" json:"APIKey,omitempty"` } func (m *PutRequest) Reset() { *m = PutRequest{} } @@ -355,7 +354,7 @@ func (m *PutRequest) GetAPIKey() []byte { // GetRequest is a request message for the Get rpc call type GetRequest struct { Path []byte `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"` + APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,proto3" json:"APIKey,omitempty"` } func (m *GetRequest) Reset() { *m = GetRequest{} } @@ -381,7 +380,7 @@ func (m *GetRequest) GetAPIKey() []byte { type ListRequest struct { StartingPathKey []byte `protobuf:"bytes,1,opt,name=starting_path_key,json=startingPathKey,proto3" json:"starting_path_key,omitempty"` Limit int64 `protobuf:"varint,2,opt,name=limit" json:"limit,omitempty"` - APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"` + APIKey []byte `protobuf:"bytes,3,opt,name=APIKey,proto3" json:"APIKey,omitempty"` } func (m *ListRequest) Reset() { *m = ListRequest{} } @@ -438,7 +437,8 @@ func (m *GetResponse) GetPointer() []byte { // ListResponse is a response message for the List rpc call type ListResponse struct { - Paths [][]byte `protobuf:"bytes,1,rep,name=paths,proto3" json:"paths,omitempty"` + Paths [][]byte `protobuf:"bytes,1,rep,name=paths,proto3" json:"paths,omitempty"` + Truncated bool `protobuf:"varint,2,opt,name=truncated" json:"truncated,omitempty"` } func (m *ListResponse) Reset() { *m = ListResponse{} } @@ -453,9 +453,16 @@ func (m *ListResponse) GetPaths() [][]byte { return nil } +func (m *ListResponse) GetTruncated() bool { + if m != nil { + return m.Truncated + } + return false +} + type DeleteRequest struct { Path []byte `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,json=aPIKey,proto3" json:"APIKey,omitempty"` + APIKey []byte `protobuf:"bytes,2,opt,name=APIKey,proto3" json:"APIKey,omitempty"` } func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } @@ -687,58 +694,58 @@ var _NetState_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("netstate.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 843 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0xdb, 0x46, - 0x10, 0x36, 0x2d, 0x9b, 0x92, 0x87, 0x92, 0x4c, 0x2f, 0x14, 0x87, 0x55, 0x0f, 0x31, 0x88, 0x06, - 0x75, 0x6a, 0x40, 0x06, 0x54, 0x14, 0x48, 0x13, 0x14, 0x45, 0x2a, 0x0b, 0x86, 0x90, 0x44, 0x11, - 0x56, 0x3a, 0xf4, 0x46, 0x6c, 0xc4, 0xa9, 0x44, 0x44, 0xdc, 0xa5, 0xb9, 0x2b, 0xa0, 0xea, 0xeb, - 0xf5, 0x5d, 0x7a, 0x68, 0x6f, 0x7d, 0x82, 0x82, 0xbb, 0xfc, 0x93, 0x02, 0xb7, 0x40, 0x4f, 0xe2, - 0xcc, 0x7e, 0xdf, 0xfc, 0x7c, 0x33, 0x23, 0xe8, 0x72, 0x54, 0x52, 0x31, 0x85, 0x83, 0x24, 0x15, - 0x4a, 0x90, 0x56, 0x61, 0xf7, 0x9f, 0xad, 0x84, 0x58, 0x6d, 0xf0, 0x56, 0xfb, 0x3f, 0x6e, 0x7f, - 0xb9, 0x55, 0x51, 0x8c, 0x52, 0xb1, 0x38, 0x31, 0x50, 0xff, 0x4f, 0x0b, 0x5c, 0x8a, 0xe1, 0x96, - 0x87, 0x8c, 0x2f, 0x77, 0xf3, 0xe5, 0x1a, 0x63, 0x24, 0xdf, 0xc3, 0x89, 0xda, 0x25, 0xe8, 0x59, - 0x57, 0xd6, 0x75, 0x77, 0xf8, 0x7c, 0x50, 0x86, 0x3f, 0x44, 0x0e, 0xcc, 0xcf, 0x62, 0x97, 0x20, - 0xd5, 0x14, 0xf2, 0x14, 0x9a, 0x71, 0xc4, 0x83, 0x14, 0x1f, 0xbc, 0xe3, 0x2b, 0xeb, 0xba, 0x41, - 0xed, 0x38, 0xe2, 0x14, 0x1f, 0x48, 0x0f, 0x4e, 0x95, 0x50, 0x6c, 0xe3, 0x35, 0xb4, 0xdb, 0x18, - 0xe4, 0x05, 0xb8, 0x29, 0x26, 0x2c, 0x4a, 0x03, 0xb5, 0x4e, 0x51, 0xae, 0xc5, 0x26, 0xf4, 0x4e, - 0x34, 0xe0, 0xdc, 0xf8, 0x17, 0x85, 0x9b, 0xdc, 0xc0, 0x85, 0xdc, 0x2e, 0x97, 0x28, 0x65, 0x0d, - 0x7b, 0xaa, 0xb1, 0x6e, 0xfe, 0x50, 0x82, 0xfd, 0x1e, 0x40, 0x55, 0x1a, 0xb1, 0xe1, 0x98, 0xce, - 0xdd, 0x23, 0xff, 0x6f, 0x0b, 0xdc, 0x31, 0x5f, 0xa6, 0xbb, 0x44, 0x45, 0x82, 0xe7, 0xcd, 0xfe, - 0xb0, 0xd7, 0xec, 0x8b, 0xaa, 0xd9, 0x43, 0x64, 0xcd, 0x51, 0x6b, 0xf8, 0x25, 0x78, 0x68, 0xfc, - 0x18, 0x06, 0x58, 0x22, 0x82, 0x4f, 0xb8, 0xd3, 0x0a, 0xb4, 0xe9, 0x65, 0xf9, 0x5e, 0x05, 0x78, - 0x8b, 0xbb, 0x7d, 0xa6, 0x54, 0x2c, 0x55, 0x11, 0x5f, 0x05, 0x5c, 0xf0, 0x25, 0x6a, 0x91, 0xea, - 0xcc, 0x79, 0xfe, 0x3c, 0xcd, 0x5e, 0xfd, 0x1b, 0xe8, 0xee, 0xd7, 0x42, 0x00, 0xec, 0x37, 0xe3, - 0xf9, 0xfd, 0xe8, 0xbd, 0x7b, 0x44, 0x3a, 0x70, 0x36, 0x1f, 0x8f, 0xe8, 0x78, 0xf1, 0xd3, 0x87, - 0x9f, 0x5d, 0xcb, 0x1f, 0x81, 0x43, 0x31, 0x16, 0x0a, 0x67, 0x11, 0x2e, 0x91, 0x7c, 0x09, 0x67, - 0x49, 0xf6, 0x11, 0xf0, 0x6d, 0xac, 0x7b, 0x6e, 0xd0, 0x96, 0x76, 0x4c, 0xb7, 0x71, 0x36, 0x3d, - 0x2e, 0x42, 0x0c, 0xa2, 0x50, 0xd7, 0x7e, 0x46, 0xed, 0xcc, 0x9c, 0x84, 0xfe, 0xef, 0x16, 0x74, - 0x4c, 0x94, 0x39, 0xae, 0x62, 0xe4, 0x8a, 0xbc, 0x02, 0x48, 0xcb, 0x6d, 0xd0, 0x81, 0x9c, 0x61, - 0xff, 0xf1, 0x4d, 0xa1, 0x35, 0x34, 0xf9, 0x02, 0x4c, 0xca, 0x2a, 0x4f, 0x53, 0xdb, 0x93, 0x90, - 0xbc, 0x82, 0x4e, 0xaa, 0xf3, 0x04, 0xda, 0x23, 0xbd, 0xc6, 0x55, 0xe3, 0xda, 0x19, 0x3e, 0xa9, - 0x47, 0x2e, 0x9b, 0xa1, 0xed, 0xb4, 0x32, 0x24, 0x79, 0x06, 0x4e, 0x8c, 0xe9, 0xa7, 0x0d, 0x06, - 0xa9, 0x10, 0x4a, 0xef, 0x51, 0x9b, 0x82, 0x71, 0x51, 0x21, 0x94, 0xff, 0xd7, 0x31, 0x34, 0x67, - 0x22, 0xe2, 0x0a, 0x53, 0x32, 0xd8, 0x1b, 0x7b, 0xad, 0xf2, 0x1c, 0x30, 0xb8, 0x63, 0x8a, 0xd5, - 0xe6, 0xfc, 0x1c, 0xba, 0x11, 0xdf, 0x44, 0x1c, 0x03, 0x69, 0x14, 0xc8, 0x67, 0xd4, 0x31, 0xde, - 0x42, 0x96, 0x5b, 0xb0, 0x4d, 0x4d, 0x3a, 0xbd, 0x33, 0x7c, 0x7a, 0x58, 0x78, 0x0e, 0xa4, 0x39, - 0x8c, 0x10, 0x38, 0x91, 0xd1, 0x6f, 0x98, 0x6f, 0xb2, 0xfe, 0x26, 0x3f, 0x42, 0x67, 0x99, 0x22, - 0xd3, 0x7b, 0x14, 0x32, 0x85, 0x9e, 0x9d, 0xcb, 0x6b, 0xae, 0x79, 0x50, 0x5c, 0xf3, 0x60, 0x51, - 0x5c, 0x33, 0x6d, 0x17, 0x84, 0x3b, 0xa6, 0x90, 0x8c, 0xe0, 0x1c, 0x7f, 0x4d, 0xa2, 0xb4, 0x16, - 0xa2, 0xf9, 0x9f, 0x21, 0xba, 0x15, 0x45, 0x07, 0xe9, 0x43, 0x2b, 0x46, 0xc5, 0x42, 0xa6, 0x98, - 0xd7, 0xd2, 0xbd, 0x96, 0xb6, 0xef, 0x43, 0xab, 0xd0, 0x27, 0xdb, 0xbd, 0xc9, 0xf4, 0xdd, 0x64, - 0x3a, 0x76, 0x8f, 0xb2, 0x6f, 0x3a, 0x7e, 0xff, 0x61, 0x31, 0x76, 0x2d, 0x1f, 0x01, 0x66, 0x5b, - 0x45, 0xf1, 0x61, 0x8b, 0x52, 0x65, 0x7d, 0x26, 0x4c, 0xad, 0xb5, 0xde, 0x6d, 0xaa, 0xbf, 0xc9, - 0x0d, 0x34, 0x13, 0xa3, 0xb6, 0x5e, 0x03, 0x67, 0x78, 0xf1, 0xd9, 0x18, 0x68, 0x81, 0x20, 0x97, - 0x60, 0xbf, 0x99, 0x4d, 0xde, 0xe2, 0x2e, 0x17, 0xde, 0x66, 0xda, 0xf2, 0x5f, 0x02, 0xdc, 0xe3, - 0xbf, 0xa6, 0xa9, 0x98, 0xc7, 0x7b, 0xcc, 0x15, 0x38, 0xef, 0x22, 0x59, 0x52, 0xbf, 0x81, 0x8b, - 0xf2, 0x0a, 0x33, 0x9e, 0x3e, 0x61, 0x13, 0xe7, 0xbc, 0x78, 0x98, 0x31, 0xb5, 0xce, 0x6e, 0xb7, - 0x07, 0xa7, 0x9b, 0x28, 0x8e, 0x54, 0xfe, 0x27, 0x67, 0x8c, 0x47, 0x4b, 0xec, 0x80, 0xa3, 0x95, - 0x90, 0x89, 0xe0, 0x12, 0xfd, 0xaf, 0xc1, 0xd1, 0x15, 0x1b, 0x93, 0x78, 0x95, 0x0a, 0x26, 0x5b, - 0x61, 0xfa, 0x5f, 0x41, 0xdb, 0x14, 0x98, 0x23, 0x7b, 0x70, 0x9a, 0x15, 0x26, 0x3d, 0xeb, 0xaa, - 0x71, 0xdd, 0xa6, 0xc6, 0xf0, 0x5f, 0x43, 0xe7, 0x0e, 0x37, 0xa8, 0xf0, 0xff, 0x68, 0xe0, 0x42, - 0xb7, 0x20, 0x9b, 0x24, 0xc3, 0x3f, 0x2c, 0x68, 0x4d, 0x51, 0xcd, 0xb3, 0x29, 0x90, 0x21, 0x34, - 0x66, 0x5b, 0x45, 0x7a, 0xb5, 0xb9, 0x94, 0x23, 0xed, 0x3f, 0x39, 0xf0, 0xe6, 0x55, 0x0e, 0xa1, - 0x71, 0x8f, 0x7b, 0x9c, 0x6a, 0x3e, 0x75, 0x4e, 0x5d, 0x83, 0xef, 0xe0, 0x24, 0xeb, 0x94, 0xd4, - 0x9e, 0x6b, 0xa3, 0xe9, 0x5f, 0x1e, 0xba, 0x73, 0xda, 0x6b, 0xb0, 0x4d, 0xf5, 0xa4, 0x76, 0x67, - 0x7b, 0x62, 0xf4, 0xbd, 0xcf, 0x1f, 0x0c, 0xf9, 0xa3, 0xad, 0x6f, 0xe0, 0xdb, 0x7f, 0x02, 0x00, - 0x00, 0xff, 0xff, 0x7b, 0x95, 0x0c, 0x06, 0x3e, 0x07, 0x00, 0x00, + // 848 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x5f, 0x8b, 0xdb, 0x46, + 0x10, 0x3f, 0x9d, 0xef, 0x6c, 0xdf, 0xc8, 0xf6, 0xe9, 0x16, 0xe7, 0xa2, 0xba, 0x85, 0x1e, 0x82, + 0xd0, 0x4b, 0x0f, 0x5c, 0x70, 0x29, 0xa4, 0x09, 0xa5, 0x24, 0x77, 0xe6, 0x30, 0x49, 0x1c, 0xb3, + 0xf6, 0x43, 0xdf, 0xc4, 0x46, 0x9a, 0xda, 0x22, 0xd6, 0x4a, 0x27, 0x8d, 0xa0, 0xee, 0xd7, 0xeb, + 0x77, 0xe9, 0x43, 0xfb, 0xd6, 0x4f, 0x50, 0xb4, 0xab, 0x7f, 0x76, 0x48, 0x0b, 0x79, 0xb2, 0x66, + 0xe6, 0xf7, 0x9b, 0x9d, 0xf9, 0xcd, 0x8c, 0x61, 0x20, 0x91, 0x52, 0x12, 0x84, 0xe3, 0x38, 0x89, + 0x28, 0x62, 0xdd, 0xd2, 0x1e, 0x9d, 0x53, 0x10, 0x62, 0x4a, 0x22, 0x8c, 0x75, 0xc8, 0xf9, 0xcb, + 0x00, 0x8b, 0xa3, 0x9f, 0x49, 0x5f, 0x48, 0x6f, 0xb7, 0xf4, 0x36, 0x18, 0x22, 0xfb, 0x11, 0x4e, + 0x68, 0x17, 0xa3, 0x6d, 0x5c, 0x19, 0xd7, 0x83, 0xc9, 0x93, 0x71, 0x95, 0xee, 0x10, 0x39, 0xd6, + 0x3f, 0xab, 0x5d, 0x8c, 0x5c, 0x51, 0xd8, 0x63, 0xe8, 0x84, 0x81, 0x74, 0x13, 0x7c, 0xb0, 0x8f, + 0xaf, 0x8c, 0xeb, 0x16, 0x6f, 0x87, 0x81, 0xe4, 0xf8, 0xc0, 0x86, 0x70, 0x4a, 0x11, 0x89, 0xad, + 0xdd, 0x52, 0x6e, 0x6d, 0xb0, 0xa7, 0x60, 0x25, 0x18, 0x8b, 0x20, 0x71, 0x69, 0x93, 0x60, 0xba, + 0x89, 0xb6, 0xbe, 0x7d, 0xa2, 0x00, 0xe7, 0xda, 0xbf, 0x2a, 0xdd, 0xec, 0x06, 0x2e, 0xd2, 0xcc, + 0xf3, 0x30, 0x4d, 0x1b, 0xd8, 0x53, 0x85, 0xb5, 0x8a, 0x40, 0x05, 0x76, 0x86, 0x00, 0x75, 0x69, + 0xac, 0x0d, 0xc7, 0x7c, 0x69, 0x1d, 0x39, 0xff, 0x18, 0x60, 0x4d, 0xa5, 0x97, 0xec, 0x62, 0x0a, + 0x22, 0x59, 0x34, 0xfb, 0xd3, 0x5e, 0xb3, 0x4f, 0xeb, 0x66, 0x0f, 0x91, 0x0d, 0x47, 0xa3, 0xe1, + 0x67, 0x60, 0xa3, 0xf6, 0xa3, 0xef, 0x62, 0x85, 0x70, 0x3f, 0xe0, 0x4e, 0x29, 0xd0, 0xe3, 0x97, + 0x55, 0xbc, 0x4e, 0xf0, 0x1a, 0x77, 0xfb, 0xcc, 0x94, 0x44, 0x42, 0x81, 0x5c, 0xbb, 0x32, 0x92, + 0x1e, 0x2a, 0x91, 0x9a, 0xcc, 0x65, 0x11, 0x9e, 0xe7, 0x51, 0xe7, 0x06, 0x06, 0xfb, 0xb5, 0x30, + 0x80, 0xf6, 0xcb, 0xe9, 0xf2, 0xfe, 0xf6, 0xad, 0x75, 0xc4, 0xfa, 0x70, 0xb6, 0x9c, 0xde, 0xf2, + 0xe9, 0xea, 0xd5, 0xbb, 0x5f, 0x2c, 0xc3, 0xb9, 0x05, 0x93, 0x63, 0x18, 0x11, 0x2e, 0x02, 0xf4, + 0x90, 0x7d, 0x09, 0x67, 0x71, 0xfe, 0xe1, 0xca, 0x2c, 0x54, 0x3d, 0xb7, 0x78, 0x57, 0x39, 0xe6, + 0x59, 0x98, 0x4f, 0x4f, 0x46, 0x3e, 0xba, 0x81, 0xaf, 0x6a, 0x3f, 0xe3, 0xed, 0xdc, 0x9c, 0xf9, + 0xce, 0x1f, 0x06, 0xf4, 0x75, 0x96, 0x25, 0xae, 0x43, 0x94, 0xc4, 0x9e, 0x03, 0x24, 0xd5, 0x36, + 0xa8, 0x44, 0xe6, 0x64, 0xf4, 0xe9, 0x4d, 0xe1, 0x0d, 0x34, 0xfb, 0x02, 0xf4, 0x93, 0xf5, 0x3b, + 0x1d, 0x65, 0xcf, 0x7c, 0xf6, 0x1c, 0xfa, 0x89, 0x7a, 0xc7, 0x55, 0x9e, 0xd4, 0x6e, 0x5d, 0xb5, + 0xae, 0xcd, 0xc9, 0xa3, 0x66, 0xe6, 0xaa, 0x19, 0xde, 0x4b, 0x6a, 0x23, 0x65, 0x5f, 0x83, 0x19, + 0x62, 0xf2, 0x61, 0x8b, 0x6e, 0x12, 0x45, 0xa4, 0xf6, 0xa8, 0xc7, 0x41, 0xbb, 0x78, 0x14, 0x91, + 0xf3, 0xf7, 0x31, 0x74, 0x16, 0x51, 0x20, 0x09, 0x13, 0x36, 0xde, 0x1b, 0x7b, 0xa3, 0xf2, 0x02, + 0x30, 0xbe, 0x13, 0x24, 0x1a, 0x73, 0x7e, 0x02, 0x83, 0x40, 0x6e, 0x03, 0x89, 0x6e, 0xaa, 0x15, + 0x28, 0x66, 0xd4, 0xd7, 0xde, 0x52, 0x96, 0xef, 0xa0, 0xad, 0x6b, 0x52, 0xcf, 0x9b, 0x93, 0xc7, + 0x87, 0x85, 0x17, 0x40, 0x5e, 0xc0, 0x18, 0x83, 0x93, 0x34, 0xf8, 0x1d, 0x8b, 0x4d, 0x56, 0xdf, + 0xec, 0x67, 0xe8, 0x7b, 0x09, 0x0a, 0xb5, 0x47, 0xbe, 0x20, 0xb4, 0xdb, 0x85, 0xbc, 0xeb, 0x28, + 0x5a, 0x6f, 0x8b, 0xab, 0x7e, 0x9f, 0xfd, 0x3a, 0x5e, 0x95, 0xd7, 0xcc, 0x7b, 0x25, 0xe1, 0x4e, + 0x10, 0xb2, 0x5b, 0x38, 0xc7, 0xdf, 0xe2, 0x20, 0x69, 0xa4, 0xe8, 0xfc, 0x6f, 0x8a, 0x41, 0x4d, + 0x51, 0x49, 0x46, 0xd0, 0x0d, 0x91, 0x84, 0x2f, 0x48, 0xd8, 0x5d, 0xd5, 0x6b, 0x65, 0x3b, 0x0e, + 0x74, 0x4b, 0x7d, 0xf2, 0xdd, 0x9b, 0xcd, 0xdf, 0xcc, 0xe6, 0x53, 0xeb, 0x28, 0xff, 0xe6, 0xd3, + 0xb7, 0xef, 0x56, 0x53, 0xcb, 0x70, 0x10, 0x60, 0x91, 0x11, 0xc7, 0x87, 0x0c, 0x53, 0xca, 0xfb, + 0x8c, 0x05, 0x6d, 0x94, 0xde, 0x3d, 0xae, 0xbe, 0xd9, 0x0d, 0x74, 0x62, 0xad, 0xb6, 0x5a, 0x03, + 0x73, 0x72, 0xf1, 0xd1, 0x18, 0x78, 0x89, 0x60, 0x97, 0xd0, 0x7e, 0xb9, 0x98, 0xbd, 0xc6, 0x5d, + 0x21, 0x7c, 0x61, 0x39, 0xcf, 0x00, 0xee, 0xf1, 0x3f, 0x9f, 0xa9, 0x99, 0xc7, 0x7b, 0xcc, 0x35, + 0x98, 0x6f, 0x82, 0xb4, 0xa2, 0x7e, 0x0b, 0x17, 0xd5, 0x15, 0xe6, 0x3c, 0x75, 0xc2, 0x3a, 0xcf, + 0x79, 0x19, 0x58, 0x08, 0xda, 0xe4, 0xb7, 0x3b, 0x84, 0xd3, 0x6d, 0x10, 0x06, 0x54, 0xfc, 0xc9, + 0x69, 0xe3, 0x93, 0x25, 0xf6, 0xc1, 0x54, 0x4a, 0xa4, 0x71, 0x24, 0x53, 0x74, 0xbe, 0x01, 0x53, + 0x55, 0xac, 0x4d, 0x66, 0xd7, 0x2a, 0xe8, 0xd7, 0x4a, 0xd3, 0x79, 0x05, 0x3d, 0x5d, 0x60, 0x81, + 0x1c, 0xc2, 0x69, 0x5e, 0x58, 0x6a, 0x1b, 0x57, 0xad, 0xeb, 0x1e, 0xd7, 0x06, 0xfb, 0x0a, 0xce, + 0x28, 0xc9, 0xa4, 0x27, 0x08, 0xf5, 0x39, 0x75, 0x79, 0xed, 0x70, 0x5e, 0x40, 0xff, 0x0e, 0xb7, + 0x48, 0xf8, 0x39, 0x0a, 0x59, 0x30, 0x28, 0xc9, 0xba, 0x84, 0xc9, 0x9f, 0x06, 0x74, 0xe7, 0x48, + 0xcb, 0x7c, 0x46, 0x6c, 0x02, 0xad, 0x45, 0x46, 0x6c, 0xd8, 0x98, 0x5a, 0x35, 0xf0, 0xd1, 0xa3, + 0x03, 0x6f, 0xd1, 0xc3, 0x04, 0x5a, 0xf7, 0xb8, 0xc7, 0xa9, 0xa7, 0xd7, 0xe4, 0x34, 0x15, 0xfa, + 0x01, 0x4e, 0x72, 0x1d, 0x58, 0x23, 0xdc, 0x18, 0xdc, 0xe8, 0xf2, 0xd0, 0x5d, 0xd0, 0x5e, 0x40, + 0x5b, 0x57, 0xcf, 0x1a, 0x57, 0xb8, 0x27, 0xc6, 0xc8, 0xfe, 0x38, 0xa0, 0xc9, 0xef, 0xdb, 0xea, + 0x42, 0xbe, 0xff, 0x37, 0x00, 0x00, 0xff, 0xff, 0x3a, 0xbb, 0x3a, 0x6c, 0x4c, 0x07, 0x00, 0x00, } diff --git a/protos/netstate/netstate.proto b/protos/netstate/netstate.proto index a66b2aab9..3e960fbad 100644 --- a/protos/netstate/netstate.proto +++ b/protos/netstate/netstate.proto @@ -105,6 +105,7 @@ message GetResponse { // ListResponse is a response message for the List rpc call message ListResponse { repeated bytes paths = 1; + bool truncated = 2; } message DeleteRequest { diff --git a/storage/boltdb/client.go b/storage/boltdb/client.go index 0789edd00..0189aee60 100644 --- a/storage/boltdb/client.go +++ b/storage/boltdb/client.go @@ -67,30 +67,38 @@ func (c *boltClient) Get(pathKey storage.Key) (storage.Value, error) { err := c.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(c.Bucket) v := b.Get(pathKey) - if v == nil { - return Error.New("pointer at %#v not found", string(pathKey)) - } pointerBytes = v return nil }) - return pointerBytes, err + if err != nil { + // TODO: log + return nil, err + } + + return pointerBytes, nil } // List returns either a list of keys for which boltdb has values or an error. -func (c *boltClient) List() (storage.Keys, error) { +func (c *boltClient) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) { c.logger.Debug("entering bolt list") var paths storage.Keys err := c.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(c.Bucket) - - err := b.ForEach(func(key, value []byte) error { - paths = append(paths, key) - return nil - }) - return err + cur := tx.Bucket(c.Bucket).Cursor() + var k []byte + if startingKey == nil { + k, _ = cur.First() + } else { + k, _ = cur.Seek(startingKey) + } + for ; k != nil; k, _ = cur.Next() { + paths = append(paths, k) + if limit > 0 && int(limit) == int(len(paths)) { + break + } + } + return nil }) - return paths, err } diff --git a/storage/boltdb/client_test.go b/storage/boltdb/client_test.go index b01695aa6..39727b24e 100644 --- a/storage/boltdb/client_test.go +++ b/storage/boltdb/client_test.go @@ -10,9 +10,46 @@ import ( "testing" "go.uber.org/zap" - "storj.io/storj/pkg/netstate" + "storj.io/storj/storage" ) +type BoltClientTest struct { + *testing.T + c storage.KeyValueStore +} + +func NewBoltClientTest(t *testing.T) *BoltClientTest { + logger, _ := zap.NewDevelopment() + dbName := tempfile() + + c, err := NewClient(logger, dbName, "test_bucket") + if err != nil { + t.Error("Failed to create test db") + panic(err) + } + + return &BoltClientTest{ + T: t, + c: c, + } +} + +func (bt *BoltClientTest) Close() { + bt.c.Close() + switch client := bt.c.(type) { + case *boltClient: + os.Remove(client.Path) + } +} + +func (bt *BoltClientTest) HandleErr(err error, msg string) { + bt.Error(msg) + if err != nil { + panic(err) + } + panic(msg) +} + func tempfile() string { f, err := ioutil.TempFile("", "TempBolt-") if err != nil { @@ -26,60 +63,94 @@ func tempfile() string { return f.Name() } -func TestNetState(t *testing.T) { - logger, _ := zap.NewDevelopment() - c, err := NewClient(logger, tempfile(), "test_bucket") - if err != nil { - t.Error("Failed to create test db") - } - defer func() { - c.Close() - switch client := c.(type) { - case *boltClient: - os.Remove(client.Path) - } - }() +func TestPut(t *testing.T) { + bt := NewBoltClientTest(t) + defer bt.Close() - testEntry1 := netstate.PointerEntry{ - Path: []byte(`test/path`), - Pointer: []byte(`pointer1`), - } - - testEntry2 := netstate.PointerEntry{ - Path: []byte(`test/path2`), - Pointer: []byte(`pointer2`), - } - - // tests Put function - if err := c.Put(testEntry1.Path, testEntry1.Pointer); err != nil { - t.Error("Failed to save testFile to pointers bucket") - } - - // tests Get function - retrvValue, err := c.Get([]byte("test/path")) - if err != nil { - t.Error("Failed to get saved test pointer") - } - if !bytes.Equal(retrvValue, testEntry1.Pointer) { - t.Error("Retrieved pointer was not same as put pointer") - } - - // tests Delete function - if err := c.Delete([]byte("test/path")); err != nil { - t.Error("Failed to delete test entry") - } - - // tests List function - if err := c.Put(testEntry2.Path, testEntry2.Pointer); err != nil { - t.Error("Failed to put testEntry2 to pointers bucket") - } - testPaths, err := c.List() - if err != nil { - t.Error("Failed to list Path keys in pointers bucket") - } - - // tests List + Delete function - if !bytes.Equal(testPaths[0], []byte("test/path2")) { - t.Error("Expected only testEntry2 path in list") + if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil { + bt.HandleErr(err, "Failed to save pointer1 to pointers bucket") + } +} + +func TestGet(t *testing.T) { + bt := NewBoltClientTest(t) + defer bt.Close() + + if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil { + bt.HandleErr(err, "Failed to save pointer1 to pointers bucket") + } + + retrvValue, err := bt.c.Get([]byte("test/path/1")) + if err != nil { + bt.HandleErr(err, "Failed to get") + } + if retrvValue.IsZero() { + bt.HandleErr(nil, "Failed to get saved test pointer") + } + if !bytes.Equal(retrvValue, []byte("pointer1")) { + bt.HandleErr(nil, "Retrieved pointer was not same as put pointer") + } + + // tests Get non-existent path + getRes, err := bt.c.Get([]byte("fake/path")) + if err != nil { + bt.HandleErr(err, "Failed to get") + } + if !getRes.IsZero() { + bt.HandleErr(nil, "Expected zero-value response for getting fake path") + } +} + +func TestDelete(t *testing.T) { + bt := NewBoltClientTest(t) + defer bt.Close() + + if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil { + bt.HandleErr(err, "Failed to save pointer1 to pointers bucket") + } + + if err := bt.c.Delete([]byte("test/path/1")); err != nil { + bt.HandleErr(err, "Failed to delete test entry") + } +} + +func TestList(t *testing.T) { + bt := NewBoltClientTest(t) + defer bt.Close() + + if err := bt.c.Put([]byte("test/path/2"), []byte("pointer2")); err != nil { + bt.HandleErr(err, "Failed to put pointer2 to pointers bucket") + } + testPaths, err := bt.c.List([]byte("test/path/2"), storage.Limit(1)) + if err != nil { + bt.HandleErr(err, "Failed to list Path keys in pointers bucket") + } + + if !bytes.Equal(testPaths[0], []byte("test/path/2")) { + bt.HandleErr(nil, "Expected only test/path/2 in list") + } +} + +func TestListNoStartingKey(t *testing.T) { + bt := NewBoltClientTest(t) + defer bt.Close() + + if err := bt.c.Put([]byte("test/path/1"), []byte("pointer1")); err != nil { + bt.HandleErr(err, "Failed to save pointer1 to pointers bucket") + } + if err := bt.c.Put([]byte("test/path/2"), []byte("pointer2")); err != nil { + bt.HandleErr(err, "Failed to save pointer2 to pointers bucket") + } + if err := bt.c.Put([]byte("test/path/3"), []byte("pointer3")); err != nil { + bt.HandleErr(err, "Failed to save pointer3 to pointers bucket") + } + + testPaths, err := bt.c.List(nil, storage.Limit(3)) + if err != nil { + bt.HandleErr(err, "Failed to list Paths") + } + + if !bytes.Equal(testPaths[2], []byte("test/path/3")) { + bt.HandleErr(nil, "Expected test/path/3 to be last in list") } } diff --git a/storage/common.go b/storage/common.go index 1f2451bc1..706dd972b 100644 --- a/storage/common.go +++ b/storage/common.go @@ -12,16 +12,29 @@ type Value []byte // Keys is the type for a slice of keys in a `KeyValueStore` type Keys []Key +// Limit indicates how many keys to return when calling List +type Limit int + // KeyValueStore is an interface describing key/value stores like redis and boltdb type KeyValueStore interface { // Put adds a value to the provided key in the KeyValueStore, returning an error on failure. Put(Key, Value) error Get(Key) (Value, error) - List() (Keys, error) + List(Key, Limit) (Keys, error) Delete(Key) error Close() error } +// IsZero returns true if the value struct is it's zero value +func (v *Value) IsZero() (_ bool) { + return len(*v) == 0 +} + +// IsZero returns true if the key struct is it's zero value +func (k *Key) IsZero() (_ bool) { + return len(*k) == 0 +} + // MarshalBinary implements the encoding.BinaryMarshaler interface for the Value type func (v *Value) MarshalBinary() (_ []byte, _ error) { return *v, nil diff --git a/storage/redis/client.go b/storage/redis/client.go index c0e603f71..7064f1e9f 100644 --- a/storage/redis/client.go +++ b/storage/redis/client.go @@ -4,6 +4,7 @@ package redis import ( + "fmt" "time" "github.com/go-redis/redis" @@ -49,6 +50,11 @@ func NewClient(address, password string, db int) (storage.KeyValueStore, error) func (c *redisClient) Get(key storage.Key) (storage.Value, error) { b, err := c.db.Get(string(key)).Bytes() if err != nil { + if err.Error() == "redis: nil" { + return nil, nil + } + + // TODO: log return nil, Error.New("get error", err) } @@ -72,18 +78,32 @@ func (c *redisClient) Put(key storage.Key, value storage.Value) error { } // List returns either a list of keys for which boltdb has values or an error. -func (c *redisClient) List() (_ storage.Keys, _ error) { - results, err := c.db.Keys("*").Result() - if err != nil { - return nil, Error.New("list error", err) +func (c *redisClient) List(startingKey storage.Key, limit storage.Limit) (storage.Keys, error) { + var noOrderKeys []string + if startingKey != nil { + _, cursor, err := c.db.Scan(0, fmt.Sprintf("%s", startingKey), int64(limit)).Result() + if err != nil { + return nil, Error.New("list error with starting key", err) + } + keys, _, err := c.db.Scan(cursor, "", int64(limit)).Result() + if err != nil { + return nil, Error.New("list error with starting key", err) + } + noOrderKeys = keys + } else if startingKey == nil { + keys, _, err := c.db.Scan(0, "", int64(limit)).Result() + if err != nil { + return nil, Error.New("list error without starting key", err) + } + noOrderKeys = keys } - keys := make(storage.Keys, len(results)) - for i, k := range results { - keys[i] = storage.Key(k) + listKeys := make(storage.Keys, len(noOrderKeys)) + for i, k := range noOrderKeys { + listKeys[i] = storage.Key(k) } - return keys, nil + return listKeys, nil } // Delete deletes a key/value pair from redis, for a given the key diff --git a/storage/redis/client_test.go b/storage/redis/client_test.go new file mode 100644 index 000000000..4255e7ec7 --- /dev/null +++ b/storage/redis/client_test.go @@ -0,0 +1,83 @@ +// Copyright (C) 2018 Storj Labs, Inc. +// See LICENSE for copying information. + +package redis + +import ( + "testing" + + "storj.io/storj/internal/test" + "storj.io/storj/storage" +) + +type RedisClientTest struct { + *testing.T + c storage.KeyValueStore +} + +func NewRedisClientTest(t *testing.T) *RedisClientTest { + kv := make(test.KvStore) + c := test.NewMockKeyValueStore(kv) + return &RedisClientTest{ + T: t, + c: c, + } +} + +func (rt *RedisClientTest) Close() { + rt.c.Close() +} + +func (rt *RedisClientTest) HandleErr(err error, msg string) { + rt.Error(msg) + if err != nil { + panic(err) + } + panic(msg) +} + +func TestListWithoutStartKey(t *testing.T) { + rt := NewRedisClientTest(t) + defer rt.Close() + + if err := rt.c.Put(storage.Key([]byte("path/1")), []byte("pointer1")); err != nil { + rt.HandleErr(err, "Failed to put") + } + if err := rt.c.Put(storage.Key([]byte("path/2")), []byte("pointer2")); err != nil { + rt.HandleErr(err, "Failed to put") + } + if err := rt.c.Put(storage.Key([]byte("path/3")), []byte("pointer3")); err != nil { + rt.HandleErr(err, "Failed to put") + } + + _, err := rt.c.List(nil, storage.Limit(3)) + if err != nil { + rt.HandleErr(err, "Failed to list") + } +} + +func TestListWithStartKey(t *testing.T) { + rt := NewRedisClientTest(t) + defer rt.Close() + + if err := rt.c.Put(storage.Key([]byte("path/1")), []byte("pointer1")); err != nil { + rt.HandleErr(err, "Failed to put") + } + if err := rt.c.Put(storage.Key([]byte("path/2")), []byte("pointer2")); err != nil { + rt.HandleErr(err, "Failed to put") + } + if err := rt.c.Put(storage.Key([]byte("path/3")), []byte("pointer3")); err != nil { + rt.HandleErr(err, "Failed to put") + } + if err := rt.c.Put(storage.Key([]byte("path/4")), []byte("pointer4")); err != nil { + rt.HandleErr(err, "Failed to put") + } + if err := rt.c.Put(storage.Key([]byte("path/5")), []byte("pointer5")); err != nil { + rt.HandleErr(err, "Failed to put") + } + + _, err := rt.c.List([]byte("path/2"), storage.Limit(2)) + if err != nil { + rt.HandleErr(err, "Failed to list") + } +}