Overlay Bulk Lookup (#279)

* implements bulk node lookup from overlay cache
This commit is contained in:
Jennifer Li Johnson 2018-09-11 00:52:14 -04:00 committed by GitHub
parent 7a906285e3
commit d0f87f0de1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 716 additions and 96 deletions

View File

@ -24,6 +24,7 @@ func (k *KvStore) Empty() bool {
type MockKeyValueStore struct {
Data KvStore
GetCalled int
GetAllCalled int
PutCalled int
ListCalled int
ReverseListCalled int
@ -79,6 +80,7 @@ func (store *MockKeyValueStore) List(first storage.Key, limit int) (storage.Keys
// GetAll is a noop to adhere to the interface
func (store *MockKeyValueStore) GetAll(keys storage.Keys) (values storage.Values, err error) {
store.GetAllCalled++
result := storage.Values{}
for _, v := range keys {
result = append(result, store.Data[v.String()])

View File

@ -64,7 +64,7 @@ func NewBoltOverlayCache(dbPath string, DHT dht.DHT) (*Cache, error) {
}, nil
}
// Get looks up the provided nodeID from the redis cache
// Get looks up the provided nodeID from the overlay cache
func (o *Cache) Get(ctx context.Context, key string) (*overlay.Node, error) {
b, err := o.DB.Get([]byte(key))
if err != nil {
@ -83,6 +83,35 @@ func (o *Cache) Get(ctx context.Context, key string) (*overlay.Node, error) {
return na, nil
}
// GetAll looks up the provided nodeIDs from the overlay cache
func (o *Cache) GetAll(ctx context.Context, keys []string) ([]*overlay.Node, error) {
if len(keys) == 0 {
return nil, OverlayError.New("no keys provided")
}
var ks storage.Keys
for _, v := range keys {
ks = append(ks, storage.Key(v))
}
vs, err := o.DB.GetAll(ks)
if err != nil {
return nil, err
}
var ns []*overlay.Node
for _, v := range vs {
if v == nil {
ns = append(ns, nil)
continue
}
na := &overlay.Node{}
err := proto.Unmarshal(v, na)
if err != nil {
return nil, OverlayError.New("could not unmarshal non-nil node: %v", err)
}
ns = append(ns, na)
}
return ns, nil
}
// Put adds a nodeID to the redis cache with a binary representation of proto defined Node
func (o *Cache) Put(nodeID string, value overlay.Node) error {
data, err := proto.Marshal(&value)

View File

@ -33,6 +33,7 @@ var (
type dbClient int
type responses map[dbClient]*overlay.Node
type responsesB map[dbClient][]*overlay.Node
type errors map[dbClient]*errs.Class
const (
@ -186,7 +187,141 @@ var (
}()},
},
}
getAllCases = []struct {
testID string
expectedTimesCalled int
keys []string
expectedResponses responsesB
expectedErrors errors
data test.KvStore
}{
{testID: "valid GetAll",
expectedTimesCalled: 1,
keys: []string{"key1"},
expectedResponses: func() responsesB {
n1 := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}}
ns := []*overlay.Node{n1}
return responsesB{
mock: ns,
bolt: ns,
_redis: ns,
}
}(),
expectedErrors: errors{
mock: nil,
bolt: nil,
_redis: nil,
},
data: test.KvStore{
"key1": func() storage.Value {
na := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}}
d, err := proto.Marshal(na)
if err != nil {
panic(err)
}
return d
}(),
},
},
{testID: "valid GetAll",
expectedTimesCalled: 1,
keys: []string{"key1", "key2"},
expectedResponses: func() responsesB {
n1 := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}}
n2 := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9998"}}
ns := []*overlay.Node{n1, n2}
return responsesB{
mock: ns,
bolt: ns,
_redis: ns,
}
}(),
expectedErrors: errors{
mock: nil,
bolt: nil,
_redis: nil,
},
data: test.KvStore{
"key1": func() storage.Value {
na := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}}
d, err := proto.Marshal(na)
if err != nil {
panic(err)
}
return d
}(),
"key2": func() storage.Value {
na := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9998"}}
d, err := proto.Marshal(na)
if err != nil {
panic(err)
}
return d
}(),
},
},
{testID: "mix of valid and nil nodes returned",
expectedTimesCalled: 1,
keys: []string{"key1", "key3"},
expectedResponses: func() responsesB {
n1 := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}}
ns := []*overlay.Node{n1, nil}
return responsesB{
mock: ns,
bolt: ns,
_redis: ns,
}
}(),
expectedErrors: errors{
mock: nil,
bolt: nil,
_redis: nil,
},
data: test.KvStore{
"key1": func() storage.Value {
na := &overlay.Node{Address: &overlay.NodeAddress{Transport: overlay.NodeTransport_TCP, Address: "127.0.0.1:9999"}}
d, err := proto.Marshal(na)
if err != nil {
panic(err)
}
return d
}(),
},
},
{testID: "empty string keys",
expectedTimesCalled: 1,
keys: []string{"", ""},
expectedResponses: func() responsesB {
ns := []*overlay.Node{nil, nil}
return responsesB{
mock: ns,
bolt: ns,
_redis: ns,
}
}(),
expectedErrors: errors{
mock: nil,
bolt: nil,
_redis: nil,
},
},
{testID: "empty keys",
expectedTimesCalled: 0,
keys: []string{},
expectedResponses: func() responsesB {
return responsesB{
mock: nil,
bolt: nil,
_redis: nil,
}
}(),
expectedErrors: errors{
mock: &OverlayError,
bolt: &OverlayError,
_redis: &OverlayError,
},
},
}
putCases = []struct {
testID string
expectedTimesCalled int
@ -282,6 +417,24 @@ func TestRedisGet(t *testing.T) {
}
}
func TestRedisGetAll(t *testing.T) {
redisAddr, cleanup, err := redisserver.Start()
if err != nil {
t.Fatal(err)
}
defer cleanup()
for _, c := range getAllCases {
t.Run(c.testID, func(t *testing.T) {
db := redisTestClient(t, redisAddr, c.data)
oc := Cache{DB: db}
resp, err := oc.GetAll(ctx, c.keys)
assertErrClass(t, c.expectedErrors[_redis], err)
assert.Equal(t, c.expectedResponses[_redis], resp)
})
}
}
func assertErrClass(t *testing.T, class *errs.Class, err error) {
if class != nil {
assert.True(t, class.Has(err))
@ -330,6 +483,18 @@ func TestBoltGet(t *testing.T) {
}
}
func TestBoltGetAll(t *testing.T) {
for _, c := range getAllCases {
t.Run(c.testID, func(t *testing.T) {
db, cleanup := boltTestClient(t, c.data)
defer cleanup()
oc := Cache{DB: db}
resp, err := oc.GetAll(ctx, c.keys)
assertErrClass(t, c.expectedErrors[bolt], err)
assert.Equal(t, c.expectedResponses[bolt], resp)
})
}
}
func TestBoltPut(t *testing.T) {
for _, c := range putCases {
t.Run(c.testID, func(t *testing.T) {
@ -368,6 +533,23 @@ func TestMockGet(t *testing.T) {
}
}
func TestMockGetAll(t *testing.T) {
for _, c := range getAllCases {
t.Run(c.testID, func(t *testing.T) {
db := test.NewMockKeyValueStore(c.data)
oc := Cache{DB: db}
assert.Equal(t, 0, db.GetAllCalled)
resp, err := oc.GetAll(ctx, c.keys)
assertErrClass(t, c.expectedErrors[mock], err)
assert.Equal(t, c.expectedResponses[mock], resp)
assert.Equal(t, c.expectedTimesCalled, db.GetAllCalled)
})
}
}
func TestMockPut(t *testing.T) {
for _, c := range putCases {
t.Run(c.testID, func(t *testing.T) {

View File

@ -6,6 +6,8 @@ package overlay
import (
"context"
"github.com/zeebo/errs"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/provider"
proto "storj.io/storj/protos/overlay"
@ -18,9 +20,15 @@ import (
// space is the storage and bandwidth requested consumption in bytes.
//
// Lookup finds a Node with the provided identifier.
// ClientError creates class of errors for stack traces
var ClientError = errs.Class("Client Error")
//Client implements the Overlay Client interface
type Client interface {
Choose(ctx context.Context, limit int, space int64) ([]*proto.Node, error)
Lookup(ctx context.Context, nodeID dht.NodeID) (*proto.Node, error)
BulkLookup(ctx context.Context, nodeIDs []dht.NodeID) ([]*proto.Node, error)
}
// Overlay is the overlay concrete implementation of the client interface
@ -62,7 +70,7 @@ func (o *Overlay) Choose(ctx context.Context, amount int, space int64) ([]*proto
return resp.GetNodes(), nil
}
// Lookup provides a Node with the given address
// Lookup provides a Node with the given ID
func (o *Overlay) Lookup(ctx context.Context, nodeID dht.NodeID) (*proto.Node, error) {
resp, err := o.client.Lookup(ctx, &proto.LookupRequest{NodeID: nodeID.String()})
if err != nil {
@ -71,3 +79,22 @@ func (o *Overlay) Lookup(ctx context.Context, nodeID dht.NodeID) (*proto.Node, e
return resp.GetNode(), nil
}
//BulkLookup provides a list of Nodes with the given IDs
func (o *Overlay) BulkLookup(ctx context.Context, nodeIDs []dht.NodeID) ([]*proto.Node, error) {
var reqs proto.LookupRequests
for _, v := range nodeIDs {
reqs.Lookuprequest = append(reqs.Lookuprequest, &proto.LookupRequest{NodeID: v.String()})
}
resp, err := o.client.BulkLookup(ctx, &reqs)
if err != nil {
return nil, ClientError.Wrap(err)
}
var nodes []*proto.Node
for _, v := range resp.Lookupresponse {
nodes = append(nodes, v.Node)
}
return nodes, nil
}

View File

@ -5,13 +5,16 @@ package overlay
import (
"context"
"fmt"
"net"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebo/errs"
"google.golang.org/grpc"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/provider"
proto "storj.io/storj/protos/overlay"
)
@ -127,7 +130,147 @@ func TestLookup(t *testing.T) {
}
}
func TestBulkLookup(t *testing.T) {
cases := []struct {
nodeIDs []dht.NodeID
expectedCalls int
}{
{
nodeIDs: []dht.NodeID{mockNodeID{}, mockNodeID{}, mockNodeID{}},
expectedCalls: 1,
},
}
for _, v := range cases {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err)
srv, mock, err := newTestServer(ctx)
assert.NoError(t, err)
go srv.Serve(lis)
defer srv.Stop()
ca, err := provider.NewCA(ctx, 12, 4)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
oc, err := NewOverlayClient(identity, lis.Addr().String())
assert.NoError(t, err)
assert.NotNil(t, oc)
assert.NotEmpty(t, oc.client)
_, err = oc.BulkLookup(ctx, v.nodeIDs)
assert.NoError(t, err)
assert.Equal(t, mock.bulkLookupCalled, v.expectedCalls)
}
}
func TestBulkLookupV2(t *testing.T) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err)
srv, s, err := newServer(ctx)
assert.NoError(t, err)
go srv.Serve(lis)
defer srv.Stop()
ca, err := provider.NewCA(ctx, 12, 4)
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
oc, err := NewOverlayClient(identity, lis.Addr().String())
assert.NoError(t, err)
assert.NotNil(t, oc)
assert.NotEmpty(t, oc.client)
n1 := &proto.Node{Id: "n1"}
n2 := &proto.Node{Id: "n2"}
n3 := &proto.Node{Id: "n3"}
nodes := []*proto.Node{n1, n2, n3}
for _, n := range nodes {
s.cache.Put(n.Id, *n)
}
cases := []struct {
testID string
nodeIDs []dht.NodeID
responses []*proto.Node
errors *errs.Class
}{
{testID: "empty id",
nodeIDs: []dht.NodeID{},
responses: nil,
errors: &ClientError,
},
{testID: "valid ids",
nodeIDs: func() []dht.NodeID {
id1 := kademlia.StringToNodeID("n1")
id2 := kademlia.StringToNodeID("n2")
id3 := kademlia.StringToNodeID("n3")
return []dht.NodeID{id1, id2, id3}
}(),
responses: nodes,
errors: nil,
},
{testID: "missing ids",
nodeIDs: func() []dht.NodeID {
id1 := kademlia.StringToNodeID("n4")
id2 := kademlia.StringToNodeID("n5")
return []dht.NodeID{id1, id2}
}(),
responses: []*proto.Node{nil, nil},
errors: nil,
},
{testID: "random order and nil",
nodeIDs: func() []dht.NodeID {
id1 := kademlia.StringToNodeID("n1")
id2 := kademlia.StringToNodeID("n2")
id3 := kademlia.StringToNodeID("n3")
id4 := kademlia.StringToNodeID("n4")
return []dht.NodeID{id2, id1, id3, id4}
}(),
responses: func() []*proto.Node {
return []*proto.Node{nodes[1], nodes[0], nodes[2], nil}
}(),
errors: nil,
},
}
for _, c := range cases {
t.Run(c.testID, func(t *testing.T) {
ns, err := oc.BulkLookup(ctx, c.nodeIDs)
assertErrClass(t, c.errors, err)
assert.Equal(t, c.responses, ns)
})
}
}
func newServer(ctx context.Context) (*grpc.Server, *Server, error) {
ca, err := provider.NewCA(ctx, 12, 4)
if err != nil {
return nil, nil, err
}
identity, err := ca.NewIdentity()
if err != nil {
return nil, nil, err
}
identOpt, err := identity.ServerOption()
if err != nil {
return nil, nil, err
}
grpcServer := grpc.NewServer(identOpt)
cache, err := NewRedisOverlayCache("127.0.0.1:6379", "", 1, nil)
if err != nil {
return nil, nil, err
}
s := &Server{cache: cache}
proto.RegisterOverlayServer(grpcServer, s)
return grpcServer, s, nil
}
func newTestServer(ctx context.Context) (*grpc.Server, *mockOverlayServer, error) {
ca, err := provider.NewCA(ctx, 12, 4)
if err != nil {
@ -153,6 +296,7 @@ func newTestServer(ctx context.Context) (*grpc.Server, *mockOverlayServer, error
type mockOverlayServer struct {
lookupCalled int
bulkLookupCalled int
FindStorageNodesCalled int
}
@ -165,3 +309,8 @@ func (o *mockOverlayServer) FindStorageNodes(ctx context.Context, req *proto.Fin
o.FindStorageNodesCalled++
return &proto.FindStorageNodesResponse{}, nil
}
func (o *mockOverlayServer) BulkLookup(ctx context.Context, reqs *proto.LookupRequests) (*proto.LookupResponses, error) {
o.bulkLookupCalled++
return &proto.LookupResponses{}, nil
}

View File

@ -5,6 +5,7 @@ package overlay
import (
"context"
// "fmt"
"strings"
"github.com/zeebo/errs"
@ -48,6 +49,18 @@ func (mo *MockOverlay) Lookup(ctx context.Context, req *proto.LookupRequest) (
return &proto.LookupResponse{Node: mo.nodes[req.NodeID]}, nil
}
//BulkLookup finds multiple storage nodes based on the requests
func (mo *MockOverlay) BulkLookup(ctx context.Context, reqs *proto.LookupRequests) (
*proto.LookupResponses, error) {
var responses []*proto.LookupResponse
for _, r := range reqs.Lookuprequest {
n := *mo.nodes[r.NodeID]
resp := &proto.LookupResponse{Node: &n}
responses = append(responses, resp)
}
return &proto.LookupResponses{Lookupresponse: responses}, nil
}
// MockConfig specifies static nodes for mock overlay
type MockConfig struct {
Nodes string `help:"a comma-separated list of <node-id>:<ip>:<port>" default:""`

View File

@ -6,9 +6,8 @@ package mock_overlay
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
dht "storj.io/storj/pkg/dht"
overlay "storj.io/storj/protos/overlay"
)
@ -36,6 +35,19 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder {
return m.recorder
}
// BulkLookup mocks base method
func (m *MockClient) BulkLookup(arg0 context.Context, arg1 []dht.NodeID) ([]*overlay.Node, error) {
ret := m.ctrl.Call(m, "BulkLookup", arg0, arg1)
ret0, _ := ret[0].([]*overlay.Node)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// BulkLookup indicates an expected call of BulkLookup
func (mr *MockClientMockRecorder) BulkLookup(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkLookup", reflect.TypeOf((*MockClient)(nil).BulkLookup), arg0, arg1)
}
// Choose mocks base method
func (m *MockClient) Choose(arg0 context.Context, arg1 int, arg2 int64) ([]*overlay.Node, error) {
ret := m.ctrl.Call(m, "Choose", arg0, arg1, arg2)

View File

@ -5,6 +5,7 @@ package overlay
import (
"context"
"fmt"
"net"
"testing"
@ -62,3 +63,28 @@ func TestOverlayLookup(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, r)
}
func TestOverlayBulkLookup(t *testing.T) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 0))
assert.NoError(t, err)
id, err := kademlia.NewID()
assert.NoError(t, err)
id2, err := kademlia.NewID()
assert.NoError(t, err)
srv := NewMockServer(test.KvStore{id.String(): NewNodeAddressValue(t, "127.0.0.1:9090")})
go srv.Serve(lis)
defer srv.Stop()
address := lis.Addr().String()
c, err := NewClient(address, grpc.WithInsecure())
assert.NoError(t, err)
req1 := &proto.LookupRequest{NodeID: id.String()}
req2 := &proto.LookupRequest{NodeID: id2.String()}
rs := &proto.LookupRequests{Lookuprequest: []*proto.LookupRequest{req1, req2}}
r, err := c.BulkLookup(context.Background(), rs)
assert.NoError(t, err)
assert.NotNil(t, r)
}

View File

@ -6,9 +6,11 @@ package overlay
import (
"context"
"fmt"
protob "github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/spacemonkeygo/monkit.v2"
@ -18,6 +20,9 @@ import (
"storj.io/storj/storage"
)
// ServerError creates class of errors for stack traces
var ServerError = errs.Class("Server Error")
// Server implements our overlay RPC service
type Server struct {
dht dht.DHT
@ -40,6 +45,16 @@ func (o *Server) Lookup(ctx context.Context, req *proto.LookupRequest) (*proto.L
}, nil
}
//BulkLookup finds the addresses of nodes in our overlay network
func (o *Server) BulkLookup(ctx context.Context, reqs *proto.LookupRequests) (*proto.LookupResponses, error) {
ns, err := o.cache.GetAll(ctx, lookupRequestsToNodeIDs(reqs))
if err != nil {
return nil, ServerError.New("could not get nodes requested %s\n", err)
}
return nodesToLookupResponses(ns), nil
}
// FindStorageNodes searches the overlay network for nodes that meet the provided requirements
func (o *Server) FindStorageNodes(ctx context.Context, req *proto.FindStorageNodesRequest) (resp *proto.FindStorageNodesResponse, err error) {
opts := req.GetOpts()
@ -138,3 +153,22 @@ func (o *Server) populate(ctx context.Context, starting storage.Key, maxNodes, r
return result, nextStart, nil
}
//lookupRequestsToNodeIDs returns the nodeIDs from the LookupRequests
func lookupRequestsToNodeIDs(reqs *proto.LookupRequests) []string {
var ids []string
for _, v := range reqs.Lookuprequest {
ids = append(ids, v.NodeID)
}
return ids
}
//nodesToLookupResponses returns LookupResponses from the nodes
func nodesToLookupResponses(nodes []*proto.Node) *proto.LookupResponses {
var rs []*proto.LookupResponse
for _, v := range nodes {
r := &proto.LookupResponse{Node: v}
rs = append(rs, r)
}
return &proto.LookupResponses{Lookupresponse: rs}
}

View File

@ -42,6 +42,10 @@ func (o *TestMockOverlay) Lookup(ctx context.Context, req *proto.LookupRequest)
return &proto.LookupResponse{}, nil
}
func (o *TestMockOverlay) BulkLookup(ctx context.Context, reqs *proto.LookupRequests) (*proto.LookupResponses, error) {
return &proto.LookupResponses{}, nil
}
func TestNewServerNilArgs(t *testing.T) {
server := NewServer(nil, nil, nil, nil)

View File

@ -119,8 +119,9 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*proto.Node, rs eestream.Re
func (ec *ecClient) Get(ctx context.Context, nodes []*proto.Node, es eestream.ErasureScheme,
pieceID client.PieceID, size int64) (rr ranger.RangeCloser, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodes) != es.TotalCount() {
return nil, Error.New("number of nodes do not match total count of erasure scheme")
return nil, Error.New("number of nodes (%v) do not match total count (%v) of erasure scheme", len(nodes), es.TotalCount())
}
paddedSize := calcPadded(size, es.DecodedBlockSize())
pieceSize := paddedSize / int64(es.RequiredCount())

View File

@ -133,7 +133,7 @@ TestLoop:
errString string
}{
{[]*proto.Node{}, 0, 0, true, []error{},
fmt.Sprintf("ecclient error: number of nodes (0) do not match total count (%d) of erasure scheme", n)},
fmt.Sprintf("ecclient error: number of nodes (0) do not match total count (%v) of erasure scheme", n)},
{[]*proto.Node{node0, node1, node2, node3}, 0, -1, true,
[]error{nil, nil, nil, nil},
"eestream error: negative max buffer memory"},
@ -217,7 +217,7 @@ TestLoop:
errString string
}{
{[]*proto.Node{}, 0, []error{}, "ecclient error: " +
"number of nodes do not match total count of erasure scheme"},
fmt.Sprintf("number of nodes (0) do not match total count (%v) of erasure scheme", n)},
{[]*proto.Node{node0, node1, node2, node3}, -1,
[]error{nil, nil, nil, nil},
"eestream error: negative max buffer memory"},

View File

@ -21,6 +21,7 @@ import (
"storj.io/storj/pkg/piecestore/rpc/client"
"storj.io/storj/pkg/pointerdb/pdbclient"
"storj.io/storj/pkg/ranger"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/storage/ec"
opb "storj.io/storj/protos/overlay"
ppb "storj.io/storj/protos/pointerdb"
@ -248,17 +249,15 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) (err error)
}
// lookupNodes calls Lookup to get node addresses from the overlay
func (s *segmentStore) lookupNodes(ctx context.Context, seg *ppb.RemoteSegment) (
nodes []*opb.Node, err error) {
nodes = make([]*opb.Node, len(seg.GetRemotePieces()))
for i, p := range seg.GetRemotePieces() {
node, err := s.oc.Lookup(ctx, kademlia.StringToNodeID(p.GetNodeId()))
if err != nil {
// TODO(kaloyan): better error handling: failing to lookup a few
// nodes should not fail the request
return nil, Error.Wrap(err)
}
nodes[i] = node
func (s *segmentStore) lookupNodes(ctx context.Context, seg *ppb.RemoteSegment) (nodes []*opb.Node, err error) {
pieces := seg.GetRemotePieces()
var nodeIds []dht.NodeID
for _, p := range pieces {
nodeIds = append(nodeIds, kademlia.StringToNodeID(p.GetNodeId()))
}
nodes, err = s.oc.BulkLookup(ctx, nodeIds)
if err != nil {
return nil, Error.Wrap(err)
}
return nodes, nil
}

View File

@ -282,6 +282,7 @@ func TestSegmentStoreGetRemote(t *testing.T) {
Size: tt.size,
Metadata: tt.metadata,
}, nil),
mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()),
mockEC.EXPECT().Get(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
),
@ -397,6 +398,7 @@ func TestSegmentStoreDeleteRemote(t *testing.T) {
Size: tt.size,
Metadata: tt.metadata,
}, nil),
mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()),
mockEC.EXPECT().Delete(
gomock.Any(), gomock.Any(), gomock.Any(),
),

View File

@ -62,3 +62,4 @@ func (errs combinedError) Error() string {
}
return ""
}

View File

@ -42,7 +42,7 @@ func (x NodeTransport) String() string {
return proto.EnumName(NodeTransport_name, int32(x))
}
func (NodeTransport) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{0}
return fileDescriptor_overlay_b80a72cb582d0132, []int{0}
}
// NodeType is an enum of possible node types
@ -66,7 +66,7 @@ func (x NodeType) String() string {
return proto.EnumName(NodeType_name, int32(x))
}
func (NodeType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{1}
return fileDescriptor_overlay_b80a72cb582d0132, []int{1}
}
type Restriction_Operator int32
@ -98,7 +98,7 @@ func (x Restriction_Operator) String() string {
return proto.EnumName(Restriction_Operator_name, int32(x))
}
func (Restriction_Operator) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{11, 0}
return fileDescriptor_overlay_b80a72cb582d0132, []int{13, 0}
}
type Restriction_Operand int32
@ -121,7 +121,7 @@ func (x Restriction_Operand) String() string {
return proto.EnumName(Restriction_Operand_name, int32(x))
}
func (Restriction_Operand) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{11, 1}
return fileDescriptor_overlay_b80a72cb582d0132, []int{13, 1}
}
// LookupRequest is is request message for the lookup rpc call
@ -136,7 +136,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} }
func (m *LookupRequest) String() string { return proto.CompactTextString(m) }
func (*LookupRequest) ProtoMessage() {}
func (*LookupRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{0}
return fileDescriptor_overlay_b80a72cb582d0132, []int{0}
}
func (m *LookupRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LookupRequest.Unmarshal(m, b)
@ -175,7 +175,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} }
func (m *LookupResponse) String() string { return proto.CompactTextString(m) }
func (*LookupResponse) ProtoMessage() {}
func (*LookupResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{1}
return fileDescriptor_overlay_b80a72cb582d0132, []int{1}
}
func (m *LookupResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LookupResponse.Unmarshal(m, b)
@ -202,6 +202,84 @@ func (m *LookupResponse) GetNode() *Node {
return nil
}
// LookupRequests is a list of LookupRequest
type LookupRequests struct {
Lookuprequest []*LookupRequest `protobuf:"bytes,1,rep,name=lookuprequest,proto3" json:"lookuprequest,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LookupRequests) Reset() { *m = LookupRequests{} }
func (m *LookupRequests) String() string { return proto.CompactTextString(m) }
func (*LookupRequests) ProtoMessage() {}
func (*LookupRequests) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_b80a72cb582d0132, []int{2}
}
func (m *LookupRequests) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LookupRequests.Unmarshal(m, b)
}
func (m *LookupRequests) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LookupRequests.Marshal(b, m, deterministic)
}
func (dst *LookupRequests) XXX_Merge(src proto.Message) {
xxx_messageInfo_LookupRequests.Merge(dst, src)
}
func (m *LookupRequests) XXX_Size() int {
return xxx_messageInfo_LookupRequests.Size(m)
}
func (m *LookupRequests) XXX_DiscardUnknown() {
xxx_messageInfo_LookupRequests.DiscardUnknown(m)
}
var xxx_messageInfo_LookupRequests proto.InternalMessageInfo
func (m *LookupRequests) GetLookuprequest() []*LookupRequest {
if m != nil {
return m.Lookuprequest
}
return nil
}
// LookupResponse is a list of LookupResponse
type LookupResponses struct {
Lookupresponse []*LookupResponse `protobuf:"bytes,1,rep,name=lookupresponse,proto3" json:"lookupresponse,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LookupResponses) Reset() { *m = LookupResponses{} }
func (m *LookupResponses) String() string { return proto.CompactTextString(m) }
func (*LookupResponses) ProtoMessage() {}
func (*LookupResponses) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_b80a72cb582d0132, []int{3}
}
func (m *LookupResponses) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LookupResponses.Unmarshal(m, b)
}
func (m *LookupResponses) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LookupResponses.Marshal(b, m, deterministic)
}
func (dst *LookupResponses) XXX_Merge(src proto.Message) {
xxx_messageInfo_LookupResponses.Merge(dst, src)
}
func (m *LookupResponses) XXX_Size() int {
return xxx_messageInfo_LookupResponses.Size(m)
}
func (m *LookupResponses) XXX_DiscardUnknown() {
xxx_messageInfo_LookupResponses.DiscardUnknown(m)
}
var xxx_messageInfo_LookupResponses proto.InternalMessageInfo
func (m *LookupResponses) GetLookupresponse() []*LookupResponse {
if m != nil {
return m.Lookupresponse
}
return nil
}
// FindStorageNodesResponse is is response message for the FindStorageNodes rpc call
type FindStorageNodesResponse struct {
Nodes []*Node `protobuf:"bytes,1,rep,name=nodes,proto3" json:"nodes,omitempty"`
@ -214,7 +292,7 @@ func (m *FindStorageNodesResponse) Reset() { *m = FindStorageNodesRespon
func (m *FindStorageNodesResponse) String() string { return proto.CompactTextString(m) }
func (*FindStorageNodesResponse) ProtoMessage() {}
func (*FindStorageNodesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{2}
return fileDescriptor_overlay_b80a72cb582d0132, []int{4}
}
func (m *FindStorageNodesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FindStorageNodesResponse.Unmarshal(m, b)
@ -255,7 +333,7 @@ func (m *FindStorageNodesRequest) Reset() { *m = FindStorageNodesRequest
func (m *FindStorageNodesRequest) String() string { return proto.CompactTextString(m) }
func (*FindStorageNodesRequest) ProtoMessage() {}
func (*FindStorageNodesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{3}
return fileDescriptor_overlay_b80a72cb582d0132, []int{5}
}
func (m *FindStorageNodesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FindStorageNodesRequest.Unmarshal(m, b)
@ -309,7 +387,7 @@ func (m *NodeAddress) Reset() { *m = NodeAddress{} }
func (m *NodeAddress) String() string { return proto.CompactTextString(m) }
func (*NodeAddress) ProtoMessage() {}
func (*NodeAddress) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{4}
return fileDescriptor_overlay_b80a72cb582d0132, []int{6}
}
func (m *NodeAddress) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeAddress.Unmarshal(m, b)
@ -359,7 +437,7 @@ func (m *OverlayOptions) Reset() { *m = OverlayOptions{} }
func (m *OverlayOptions) String() string { return proto.CompactTextString(m) }
func (*OverlayOptions) ProtoMessage() {}
func (*OverlayOptions) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{5}
return fileDescriptor_overlay_b80a72cb582d0132, []int{7}
}
func (m *OverlayOptions) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_OverlayOptions.Unmarshal(m, b)
@ -425,7 +503,7 @@ func (m *NodeRep) Reset() { *m = NodeRep{} }
func (m *NodeRep) String() string { return proto.CompactTextString(m) }
func (*NodeRep) ProtoMessage() {}
func (*NodeRep) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{6}
return fileDescriptor_overlay_b80a72cb582d0132, []int{8}
}
func (m *NodeRep) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeRep.Unmarshal(m, b)
@ -458,7 +536,7 @@ func (m *NodeRestrictions) Reset() { *m = NodeRestrictions{} }
func (m *NodeRestrictions) String() string { return proto.CompactTextString(m) }
func (*NodeRestrictions) ProtoMessage() {}
func (*NodeRestrictions) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{7}
return fileDescriptor_overlay_b80a72cb582d0132, []int{9}
}
func (m *NodeRestrictions) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NodeRestrictions.Unmarshal(m, b)
@ -507,7 +585,7 @@ func (m *Node) Reset() { *m = Node{} }
func (m *Node) String() string { return proto.CompactTextString(m) }
func (*Node) ProtoMessage() {}
func (*Node) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{8}
return fileDescriptor_overlay_b80a72cb582d0132, []int{10}
}
func (m *Node) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Node.Unmarshal(m, b)
@ -568,7 +646,7 @@ func (m *QueryRequest) Reset() { *m = QueryRequest{} }
func (m *QueryRequest) String() string { return proto.CompactTextString(m) }
func (*QueryRequest) ProtoMessage() {}
func (*QueryRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{9}
return fileDescriptor_overlay_b80a72cb582d0132, []int{11}
}
func (m *QueryRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryRequest.Unmarshal(m, b)
@ -621,7 +699,7 @@ func (m *QueryResponse) Reset() { *m = QueryResponse{} }
func (m *QueryResponse) String() string { return proto.CompactTextString(m) }
func (*QueryResponse) ProtoMessage() {}
func (*QueryResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{10}
return fileDescriptor_overlay_b80a72cb582d0132, []int{12}
}
func (m *QueryResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryResponse.Unmarshal(m, b)
@ -668,7 +746,7 @@ func (m *Restriction) Reset() { *m = Restriction{} }
func (m *Restriction) String() string { return proto.CompactTextString(m) }
func (*Restriction) ProtoMessage() {}
func (*Restriction) Descriptor() ([]byte, []int) {
return fileDescriptor_overlay_1471bb43182d3596, []int{11}
return fileDescriptor_overlay_b80a72cb582d0132, []int{13}
}
func (m *Restriction) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Restriction.Unmarshal(m, b)
@ -712,6 +790,8 @@ func (m *Restriction) GetValue() int64 {
func init() {
proto.RegisterType((*LookupRequest)(nil), "overlay.LookupRequest")
proto.RegisterType((*LookupResponse)(nil), "overlay.LookupResponse")
proto.RegisterType((*LookupRequests)(nil), "overlay.LookupRequests")
proto.RegisterType((*LookupResponses)(nil), "overlay.LookupResponses")
proto.RegisterType((*FindStorageNodesResponse)(nil), "overlay.FindStorageNodesResponse")
proto.RegisterType((*FindStorageNodesRequest)(nil), "overlay.FindStorageNodesRequest")
proto.RegisterType((*NodeAddress)(nil), "overlay.NodeAddress")
@ -742,6 +822,8 @@ const _ = grpc.SupportPackageIsVersion4
type OverlayClient interface {
// Lookup finds a nodes address from the network
Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error)
// BulkLookup finds nodes addresses from the network
BulkLookup(ctx context.Context, in *LookupRequests, opts ...grpc.CallOption) (*LookupResponses, error)
// FindStorageNodes finds a list of nodes in the network that meet the specified request parameters
FindStorageNodes(ctx context.Context, in *FindStorageNodesRequest, opts ...grpc.CallOption) (*FindStorageNodesResponse, error)
}
@ -763,6 +845,15 @@ func (c *overlayClient) Lookup(ctx context.Context, in *LookupRequest, opts ...g
return out, nil
}
func (c *overlayClient) BulkLookup(ctx context.Context, in *LookupRequests, opts ...grpc.CallOption) (*LookupResponses, error) {
out := new(LookupResponses)
err := c.cc.Invoke(ctx, "/overlay.Overlay/BulkLookup", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *overlayClient) FindStorageNodes(ctx context.Context, in *FindStorageNodesRequest, opts ...grpc.CallOption) (*FindStorageNodesResponse, error) {
out := new(FindStorageNodesResponse)
err := c.cc.Invoke(ctx, "/overlay.Overlay/FindStorageNodes", in, out, opts...)
@ -776,6 +867,8 @@ func (c *overlayClient) FindStorageNodes(ctx context.Context, in *FindStorageNod
type OverlayServer interface {
// Lookup finds a nodes address from the network
Lookup(context.Context, *LookupRequest) (*LookupResponse, error)
// BulkLookup finds nodes addresses from the network
BulkLookup(context.Context, *LookupRequests) (*LookupResponses, error)
// FindStorageNodes finds a list of nodes in the network that meet the specified request parameters
FindStorageNodes(context.Context, *FindStorageNodesRequest) (*FindStorageNodesResponse, error)
}
@ -802,6 +895,24 @@ func _Overlay_Lookup_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler)
}
func _Overlay_BulkLookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(LookupRequests)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(OverlayServer).BulkLookup(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/overlay.Overlay/BulkLookup",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(OverlayServer).BulkLookup(ctx, req.(*LookupRequests))
}
return interceptor(ctx, in, info, handler)
}
func _Overlay_FindStorageNodes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(FindStorageNodesRequest)
if err := dec(in); err != nil {
@ -828,6 +939,10 @@ var _Overlay_serviceDesc = grpc.ServiceDesc{
MethodName: "Lookup",
Handler: _Overlay_Lookup_Handler,
},
{
MethodName: "BulkLookup",
Handler: _Overlay_BulkLookup_Handler,
},
{
MethodName: "FindStorageNodes",
Handler: _Overlay_FindStorageNodes_Handler,
@ -901,57 +1016,61 @@ var _Nodes_serviceDesc = grpc.ServiceDesc{
Metadata: "overlay.proto",
}
func init() { proto.RegisterFile("overlay.proto", fileDescriptor_overlay_1471bb43182d3596) }
func init() { proto.RegisterFile("overlay.proto", fileDescriptor_overlay_b80a72cb582d0132) }
var fileDescriptor_overlay_1471bb43182d3596 = []byte{
// 771 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0x8e, 0xf3, 0xe7, 0x64, 0xf2, 0x23, 0x77, 0x55, 0x5a, 0x13, 0x41, 0xd5, 0x1a, 0x2a, 0xa0,
0x48, 0xa9, 0x94, 0x56, 0x95, 0x7a, 0x40, 0x55, 0x20, 0xa1, 0xaa, 0x08, 0x0d, 0xdd, 0x58, 0xe2,
0xc4, 0xc1, 0x89, 0xb7, 0xa9, 0x69, 0xe2, 0x35, 0xeb, 0x75, 0x21, 0xbc, 0x0b, 0x0f, 0x80, 0xc4,
0x03, 0x72, 0x42, 0xc8, 0xeb, 0xb5, 0x13, 0x27, 0x6d, 0x05, 0x27, 0x7b, 0x66, 0xbe, 0x99, 0xfd,
0xe6, 0x17, 0x6a, 0xf4, 0x86, 0xb0, 0x89, 0x35, 0x6b, 0x7a, 0x8c, 0x72, 0x8a, 0x54, 0x29, 0x36,
0xb6, 0xc6, 0x94, 0x8e, 0x27, 0x64, 0x5f, 0xa8, 0x87, 0xc1, 0xe5, 0xbe, 0x1d, 0x30, 0x8b, 0x3b,
0xd4, 0x8d, 0x80, 0xc6, 0x33, 0xa8, 0xf5, 0x28, 0xbd, 0x0e, 0x3c, 0x4c, 0xbe, 0x04, 0xc4, 0xe7,
0x68, 0x03, 0x8a, 0x2e, 0xb5, 0xc9, 0x59, 0x47, 0x57, 0xb6, 0x95, 0xe7, 0x65, 0x2c, 0x25, 0xe3,
0x00, 0xea, 0x31, 0xd0, 0xf7, 0xa8, 0xeb, 0x13, 0xb4, 0x03, 0xf9, 0xd0, 0x26, 0x70, 0x95, 0x56,
0xad, 0x19, 0x33, 0x38, 0xa7, 0x36, 0xc1, 0xc2, 0x64, 0x9c, 0x80, 0xfe, 0xd6, 0x71, 0xed, 0x01,
0xa7, 0xcc, 0x1a, 0x93, 0xd0, 0xe0, 0x27, 0xee, 0x4f, 0xa0, 0x10, 0x62, 0x7c, 0x5d, 0xd9, 0xce,
0xad, 0xfa, 0x47, 0x36, 0xe3, 0xa7, 0x02, 0x9b, 0xab, 0x11, 0x22, 0xa6, 0x5b, 0x00, 0x74, 0xf8,
0x99, 0x8c, 0xf8, 0xc0, 0xf9, 0x1e, 0xb1, 0xc8, 0xe1, 0x05, 0x0d, 0x6a, 0x43, 0x7d, 0x44, 0x5d,
0xce, 0xac, 0x11, 0xef, 0x11, 0x77, 0xcc, 0xaf, 0xf4, 0xac, 0x60, 0xfa, 0xb0, 0x19, 0xd5, 0xa4,
0x19, 0xd7, 0xa4, 0xd9, 0x91, 0x35, 0xc1, 0x4b, 0x0e, 0xe8, 0x25, 0xe4, 0xa9, 0xc7, 0x7d, 0x3d,
0x27, 0x1c, 0x37, 0x13, 0x8a, 0xfd, 0xe8, 0xdb, 0xf7, 0x42, 0x2f, 0x1f, 0x0b, 0x90, 0xf1, 0x09,
0x2a, 0x21, 0xbf, 0xb6, 0x6d, 0x33, 0xe2, 0xfb, 0xe8, 0x10, 0xca, 0x9c, 0x59, 0xae, 0xef, 0x51,
0xc6, 0x05, 0xbb, 0x7a, 0x6b, 0x23, 0x95, 0xa3, 0x19, 0x5b, 0xf1, 0x1c, 0x88, 0x74, 0x50, 0xad,
0x28, 0x80, 0x60, 0x5b, 0xc6, 0xb1, 0x68, 0xfc, 0x51, 0xa0, 0x9e, 0x7e, 0x17, 0x1d, 0x03, 0x4c,
0xad, 0x6f, 0x3d, 0x8b, 0x13, 0x77, 0x34, 0x93, 0x7d, 0xb8, 0x27, 0xbb, 0x05, 0x30, 0x3a, 0x82,
0xda, 0xd4, 0x71, 0x31, 0xf1, 0x02, 0x2e, 0x8c, 0xb2, 0x36, 0x5a, 0xba, 0x0b, 0xc4, 0xc3, 0x69,
0x18, 0x32, 0xa0, 0x3a, 0x75, 0xdc, 0x81, 0x47, 0x88, 0xfd, 0x6e, 0xe8, 0x45, 0x95, 0xc9, 0xe1,
0x94, 0x2e, 0x1c, 0x21, 0x6b, 0x4a, 0x03, 0x97, 0xeb, 0x79, 0x61, 0x95, 0x12, 0x7a, 0x05, 0x55,
0x46, 0x7c, 0xce, 0x9c, 0x91, 0xa0, 0xaf, 0x17, 0x24, 0xe1, 0xf4, 0x93, 0x73, 0x00, 0x4e, 0xc1,
0x8d, 0x32, 0xa8, 0x92, 0x94, 0x61, 0x82, 0xb6, 0x0c, 0x46, 0x4f, 0xa1, 0x76, 0xc9, 0x08, 0x79,
0x6d, 0xb9, 0xf6, 0x57, 0xc7, 0xe6, 0x57, 0x72, 0x22, 0xd2, 0x4a, 0xd4, 0x80, 0x52, 0xa8, 0xe8,
0x38, 0xfe, 0xb5, 0x48, 0x39, 0x87, 0x13, 0xd9, 0xf8, 0xa5, 0x40, 0x3e, 0x0c, 0x8b, 0xea, 0x90,
0x75, 0x6c, 0x39, 0xff, 0x59, 0xc7, 0x46, 0xcd, 0x74, 0x53, 0x2a, 0xad, 0xf5, 0x14, 0x67, 0xd9,
0xf1, 0xa4, 0x55, 0x68, 0x17, 0xf2, 0x7c, 0xe6, 0x11, 0x51, 0x9c, 0x7a, 0x6b, 0x2d, 0xdd, 0xf5,
0x99, 0x47, 0xb0, 0x30, 0xaf, 0xd4, 0x23, 0xff, 0x7f, 0xf5, 0x60, 0x50, 0xbd, 0x08, 0x08, 0x9b,
0xc5, 0xfb, 0xb0, 0x0b, 0x45, 0x9f, 0xb8, 0x36, 0x61, 0xb7, 0x6f, 0xa4, 0x34, 0x86, 0x30, 0x6e,
0xb1, 0x31, 0xe1, 0x32, 0x97, 0x65, 0x58, 0x64, 0x44, 0xeb, 0x50, 0x98, 0x38, 0x53, 0x87, 0xcb,
0x0e, 0x47, 0x82, 0x61, 0x41, 0x4d, 0xbe, 0x29, 0xb7, 0xf8, 0x1f, 0x1f, 0x7d, 0x01, 0x25, 0x26,
0x5d, 0xf4, 0xec, 0x6d, 0xfb, 0x9e, 0x98, 0x8d, 0xdf, 0x0a, 0x54, 0x16, 0xb2, 0x46, 0xc7, 0x50,
0xa2, 0x1e, 0x61, 0x16, 0xa7, 0x4c, 0xae, 0xd1, 0xe3, 0xc4, 0x75, 0x01, 0xd7, 0xec, 0x4b, 0x10,
0x4e, 0xe0, 0xe8, 0x08, 0x54, 0xf1, 0xef, 0xda, 0x22, 0xd7, 0x7a, 0xeb, 0xd1, 0xdd, 0x9e, 0xae,
0x8d, 0x63, 0x70, 0x98, 0xfb, 0x8d, 0x35, 0x09, 0x48, 0x9c, 0xbb, 0x10, 0x8c, 0x43, 0x28, 0xc5,
0x6f, 0xa0, 0x22, 0x64, 0x7b, 0xa6, 0x96, 0x09, 0xbf, 0xdd, 0x0b, 0x4d, 0x09, 0xbf, 0xa7, 0xa6,
0x96, 0x45, 0x2a, 0xe4, 0x7a, 0x66, 0x57, 0xcb, 0x85, 0x3f, 0xa7, 0x66, 0x57, 0xcb, 0x1b, 0x7b,
0xa0, 0xca, 0xf8, 0x68, 0x6d, 0x69, 0x42, 0xb5, 0x0c, 0xaa, 0xce, 0xc7, 0x51, 0x53, 0xf6, 0x74,
0xa8, 0xa5, 0x0e, 0x43, 0x18, 0xc5, 0x7c, 0xf3, 0x41, 0xcb, 0xec, 0x19, 0x50, 0x8a, 0x87, 0x07,
0x95, 0xa1, 0xd0, 0xee, 0xbc, 0x3f, 0x3b, 0xd7, 0x32, 0xa8, 0x02, 0xea, 0xc0, 0xec, 0xe3, 0xf6,
0x69, 0x57, 0x53, 0x5a, 0x3f, 0x14, 0x50, 0xe5, 0x81, 0x40, 0xc7, 0x50, 0x8c, 0xae, 0x35, 0x9a,
0xdf, 0x9c, 0xd4, 0x9d, 0x6f, 0x6c, 0xae, 0xe8, 0x65, 0x47, 0x3f, 0x82, 0xb6, 0x7c, 0x71, 0xd1,
0x76, 0x02, 0xbe, 0xe3, 0x18, 0x37, 0x76, 0xee, 0x41, 0x44, 0x81, 0x5b, 0x27, 0x50, 0x88, 0xa2,
0x1d, 0x41, 0x41, 0x0c, 0x11, 0x7a, 0x90, 0x38, 0x2d, 0x0e, 0x72, 0x63, 0x63, 0x59, 0x1d, 0x05,
0x18, 0x16, 0xc5, 0x49, 0x3b, 0xf8, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x12, 0x37, 0xf1, 0x9b, 0xec,
0x06, 0x00, 0x00,
var fileDescriptor_overlay_b80a72cb582d0132 = []byte{
// 834 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x5f, 0x6f, 0xdc, 0x44,
0x10, 0x8f, 0xef, 0xff, 0xcd, 0xe5, 0x8c, 0x3b, 0x2a, 0x89, 0x39, 0x41, 0x95, 0x2e, 0x54, 0x94,
0x20, 0x5d, 0xa5, 0x6b, 0x15, 0x29, 0x12, 0x28, 0x4a, 0x49, 0x88, 0x2a, 0x8e, 0x84, 0x6e, 0x2c,
0xf1, 0xc4, 0x83, 0x73, 0xde, 0x5e, 0x4d, 0xee, 0xbc, 0x66, 0x77, 0x5d, 0x38, 0xbe, 0x11, 0x12,
0x9f, 0x89, 0xcf, 0xc1, 0x13, 0x42, 0xde, 0x5d, 0xfb, 0xce, 0x4e, 0xae, 0x82, 0x27, 0x7b, 0x66,
0x7e, 0xf3, 0xdb, 0xf9, 0x0f, 0x43, 0xfe, 0x8e, 0x89, 0x45, 0xb8, 0x1a, 0xa7, 0x82, 0x2b, 0x8e,
0x5d, 0x2b, 0x8e, 0x1e, 0xcd, 0x39, 0x9f, 0x2f, 0xd8, 0x33, 0xad, 0xbe, 0xc9, 0xde, 0x3c, 0x8b,
0x32, 0x11, 0xaa, 0x98, 0x27, 0x06, 0x48, 0x3e, 0x87, 0xe1, 0x94, 0xf3, 0xdb, 0x2c, 0xa5, 0xec,
0x97, 0x8c, 0x49, 0x85, 0x7b, 0xd0, 0x49, 0x78, 0xc4, 0x5e, 0x9d, 0xf9, 0xce, 0x81, 0xf3, 0xb4,
0x4f, 0xad, 0x44, 0x9e, 0x83, 0x5b, 0x00, 0x65, 0xca, 0x13, 0xc9, 0xf0, 0x31, 0xb4, 0x72, 0x9b,
0xc6, 0x0d, 0x26, 0xc3, 0x71, 0x11, 0xc1, 0x25, 0x8f, 0x18, 0xd5, 0x26, 0x72, 0xb9, 0x76, 0xd2,
0xec, 0x12, 0xbf, 0x82, 0xe1, 0x42, 0x6b, 0x84, 0xd1, 0xf8, 0xce, 0x41, 0xf3, 0xe9, 0x60, 0xb2,
0x57, 0x7a, 0x57, 0xf0, 0xb4, 0x0a, 0x26, 0x14, 0x3e, 0xa8, 0x06, 0x21, 0xf1, 0x04, 0xdc, 0x02,
0x63, 0x54, 0x96, 0x71, 0xff, 0x0e, 0xa3, 0x31, 0xd3, 0x1a, 0x9c, 0x9c, 0x80, 0xff, 0x6d, 0x9c,
0x44, 0xd7, 0x8a, 0x8b, 0x70, 0xce, 0xf2, 0xe0, 0x65, 0x99, 0xe2, 0xa7, 0xd0, 0xce, 0xf3, 0x90,
0x96, 0xb3, 0x96, 0xa3, 0xb1, 0x91, 0x3f, 0x1c, 0xd8, 0xbf, 0xcb, 0x60, 0xaa, 0xf9, 0x08, 0x80,
0xdf, 0xfc, 0xcc, 0x66, 0xea, 0x3a, 0xfe, 0xdd, 0x54, 0xaa, 0x49, 0x37, 0x34, 0x78, 0x0a, 0xee,
0x8c, 0x27, 0x4a, 0x84, 0x33, 0x35, 0x65, 0xc9, 0x5c, 0xbd, 0xf5, 0x1b, 0xba, 0x9a, 0x1f, 0x8d,
0x4d, 0xdf, 0xc6, 0x45, 0xdf, 0xc6, 0x67, 0xb6, 0x6f, 0xb4, 0xe6, 0x80, 0x5f, 0x42, 0x8b, 0xa7,
0x4a, 0xfa, 0x4d, 0xed, 0xb8, 0x4e, 0xfb, 0xca, 0x7c, 0xaf, 0xd2, 0xdc, 0x4b, 0x52, 0x0d, 0x22,
0x3f, 0xc1, 0x20, 0x8f, 0xef, 0x34, 0x8a, 0x04, 0x93, 0x12, 0x5f, 0x40, 0x5f, 0x89, 0x30, 0x91,
0x29, 0x17, 0x4a, 0x47, 0xe7, 0x6e, 0x74, 0x22, 0x07, 0x06, 0x85, 0x95, 0xae, 0x81, 0xe8, 0x43,
0x37, 0x34, 0x04, 0x3a, 0xda, 0x3e, 0x2d, 0x44, 0xf2, 0x8f, 0x03, 0x6e, 0xf5, 0x5d, 0x3c, 0x06,
0x58, 0x86, 0xbf, 0x4d, 0x43, 0xc5, 0x92, 0xd9, 0xca, 0xce, 0xca, 0x7b, 0xb2, 0xdb, 0x00, 0xe3,
0x11, 0x0c, 0x97, 0x71, 0x42, 0x59, 0x9a, 0x29, 0x6d, 0xb4, 0xb5, 0xf1, 0xaa, 0x5d, 0x60, 0x29,
0xad, 0xc2, 0x90, 0xc0, 0xee, 0x32, 0x4e, 0xae, 0x53, 0xc6, 0xa2, 0xef, 0x6e, 0x52, 0x53, 0x99,
0x26, 0xad, 0xe8, 0xf2, 0x31, 0x0f, 0x97, 0x3c, 0x4b, 0x94, 0xdf, 0xd2, 0x56, 0x2b, 0xe1, 0xd7,
0xb0, 0x2b, 0x98, 0x54, 0x22, 0x9e, 0xe9, 0xf0, 0xfd, 0xb6, 0x0d, 0xb8, 0xfa, 0xe4, 0x1a, 0x40,
0x2b, 0x70, 0xd2, 0x87, 0xae, 0x0d, 0x8a, 0x04, 0xe0, 0xd5, 0xc1, 0xf8, 0x19, 0x0c, 0xdf, 0x08,
0xc6, 0x5e, 0x86, 0x49, 0xf4, 0x6b, 0x1c, 0xa9, 0xb7, 0x76, 0x22, 0xaa, 0x4a, 0x1c, 0x41, 0x2f,
0x57, 0x9c, 0xc5, 0xf2, 0x56, 0xa7, 0xdc, 0xa4, 0xa5, 0x4c, 0xfe, 0x74, 0xa0, 0x95, 0xd3, 0xa2,
0x0b, 0x8d, 0x38, 0xb2, 0x3b, 0xda, 0x88, 0x23, 0x1c, 0x57, 0x9b, 0x32, 0x98, 0x3c, 0xac, 0xc4,
0x6c, 0x3b, 0x5e, 0xb6, 0x0a, 0x9f, 0x40, 0x4b, 0xad, 0x52, 0xa6, 0x8b, 0xe3, 0x4e, 0x1e, 0x54,
0xbb, 0xbe, 0x4a, 0x19, 0xd5, 0xe6, 0x3b, 0xf5, 0x68, 0xfd, 0xbf, 0x7a, 0x08, 0xd8, 0x7d, 0x9d,
0x31, 0xb1, 0x2a, 0xf6, 0xe1, 0x09, 0x74, 0x24, 0x4b, 0x22, 0x26, 0xee, 0xbf, 0x1a, 0xd6, 0x98,
0xc3, 0x54, 0x28, 0xe6, 0x4c, 0xd9, 0x5c, 0xea, 0x30, 0x63, 0xc4, 0x87, 0xd0, 0x5e, 0xc4, 0xcb,
0x58, 0xd9, 0x0e, 0x1b, 0x81, 0x84, 0x30, 0xb4, 0x6f, 0xda, 0x2d, 0xfe, 0x8f, 0x8f, 0x7e, 0x01,
0xbd, 0xf2, 0x86, 0x34, 0xee, 0xdb, 0xf7, 0xd2, 0x4c, 0xfe, 0x76, 0x60, 0xb0, 0x91, 0x35, 0x1e,
0x43, 0x8f, 0xa7, 0x4c, 0x84, 0x8a, 0x0b, 0xbb, 0x46, 0x9f, 0x94, 0xae, 0x1b, 0xb8, 0xf1, 0x95,
0x05, 0xd1, 0x12, 0x8e, 0x47, 0xd0, 0xd5, 0xff, 0x49, 0xa4, 0x73, 0x75, 0x27, 0x1f, 0x6f, 0xf7,
0x4c, 0x22, 0x5a, 0x80, 0xf3, 0xdc, 0xdf, 0x85, 0x8b, 0x8c, 0x15, 0xb9, 0x6b, 0x81, 0xbc, 0x80,
0x5e, 0xf1, 0x06, 0x76, 0xa0, 0x31, 0x0d, 0xbc, 0x9d, 0xfc, 0x7b, 0xfe, 0xda, 0x73, 0xf2, 0xef,
0x45, 0xe0, 0x35, 0xb0, 0x0b, 0xcd, 0x69, 0x70, 0xee, 0x35, 0xf3, 0x9f, 0x8b, 0xe0, 0xdc, 0x6b,
0x91, 0x43, 0xe8, 0x5a, 0x7e, 0x7c, 0x50, 0x9b, 0x50, 0x6f, 0x07, 0x77, 0xd7, 0xe3, 0xe8, 0x39,
0x87, 0x3e, 0x0c, 0x2b, 0x87, 0x21, 0x67, 0x09, 0xbe, 0xf9, 0xc1, 0xdb, 0x39, 0x24, 0xd0, 0x2b,
0x86, 0x07, 0xfb, 0xd0, 0x3e, 0x3d, 0xfb, 0xfe, 0xd5, 0xa5, 0xb7, 0x83, 0x03, 0xe8, 0x5e, 0x07,
0x57, 0xf4, 0xf4, 0xe2, 0xdc, 0x73, 0x26, 0x7f, 0x39, 0xd0, 0xb5, 0x07, 0x02, 0x8f, 0xa1, 0x63,
0x4e, 0x33, 0x6e, 0xb9, 0xfe, 0xa3, 0x6d, 0x37, 0x1c, 0x4f, 0x00, 0x5e, 0x66, 0x8b, 0x5b, 0xeb,
0xbe, 0x7f, 0xbf, 0xbb, 0x1c, 0xf9, 0x5b, 0xfc, 0x25, 0xfe, 0x08, 0x5e, 0xfd, 0x64, 0xe3, 0x41,
0x89, 0xde, 0x72, 0xcd, 0x47, 0x8f, 0xdf, 0x83, 0x30, 0xcc, 0x93, 0x13, 0x68, 0x1b, 0xb6, 0x23,
0x68, 0xeb, 0x29, 0xc4, 0x0f, 0x4b, 0xa7, 0xcd, 0x4d, 0x18, 0xed, 0xd5, 0xd5, 0x86, 0xe0, 0xa6,
0xa3, 0x6f, 0xe2, 0xf3, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xd1, 0xe3, 0xe7, 0x32, 0xd1, 0x07,
0x00, 0x00,
}

View File

@ -16,6 +16,8 @@ enum NodeTransport {
service Overlay {
// Lookup finds a nodes address from the network
rpc Lookup(LookupRequest) returns (LookupResponse);
// BulkLookup finds nodes addresses from the network
rpc BulkLookup(LookupRequests) returns (LookupResponses);
// FindStorageNodes finds a list of nodes in the network that meet the specified request parameters
rpc FindStorageNodes(FindStorageNodesRequest) returns (FindStorageNodesResponse);
}
@ -34,6 +36,17 @@ message LookupResponse {
Node node = 1;
}
//LookupRequests is a list of LookupRequest
message LookupRequests {
repeated LookupRequest lookuprequest = 1;
}
//LookupResponse is a list of LookupResponse
message LookupResponses {
repeated LookupResponse lookupresponse = 1;
}
// FindStorageNodesResponse is is response message for the FindStorageNodes rpc call
message FindStorageNodesResponse {
repeated Node nodes = 1;

View File

@ -1,5 +1,5 @@
#!/bin/bash
#!/bin/bash
set -ueo pipefail
go install -v storj.io/storj/cmd/captplanet
captplanet setup

View File

@ -122,11 +122,14 @@ func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
err := client.view(func(bucket *bolt.Bucket) error {
for _, key := range keys {
val := bucket.Get([]byte(key))
if val == nil {
vals = append(vals, nil)
continue
}
vals = append(vals, storage.CloneValue(storage.Value(val)))
}
return nil
})
return vals, err
}

View File

@ -110,14 +110,18 @@ func (client *Client) GetAll(keys storage.Keys) (storage.Values, error) {
if err != nil {
return nil, err
}
values := []storage.Value{}
for _, result := range results {
s, ok := result.(string)
if !ok {
return nil, Error.New("invalid result type %T", result)
if result == nil {
values = append(values, nil)
} else {
s, ok := result.(string)
if !ok {
return nil, Error.New("invalid result type %T", result)
}
values = append(values, storage.Value(s))
}
values = append(values, storage.Value(s))
}
return values, nil
}