Remove overlay client (#1510)
This commit is contained in:
parent
a24c74c502
commit
56251570ef
@ -13,12 +13,10 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/auth/signing"
|
||||
"storj.io/storj/pkg/cfgstruct"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
@ -153,18 +151,6 @@ func (uplink *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*pi
|
||||
return piecestore.NewClient(uplink.Log.Named("uplink>piecestore"), signer, conn, piecestore.DefaultConfig), nil
|
||||
}
|
||||
|
||||
// DialOverlay dials destination and returns an overlay.Client
|
||||
func (uplink *Uplink) DialOverlay(destination Peer) (overlay.Client, error) {
|
||||
info := destination.Local()
|
||||
conn, err := uplink.Transport.DialNode(context.Background(), &info, grpc.WithBlock())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: handle disconnect
|
||||
return overlay.NewClientFrom(pb.NewOverlayClient(conn)), nil
|
||||
}
|
||||
|
||||
// Upload data to specific satellite
|
||||
func (uplink *Uplink) Upload(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path, data []byte) error {
|
||||
config := uplink.getConfig(satellite)
|
||||
|
@ -1,120 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package overlay
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
// Client is the interface that defines an overlay client.
|
||||
//
|
||||
// Choose returns a list of storage NodeID's that fit the provided criteria.
|
||||
// limit is the maximum number of nodes to be returned.
|
||||
// space is the storage and bandwidth requested consumption in bytes.
|
||||
//
|
||||
// Lookup finds a Node with the provided identifier.
|
||||
|
||||
// ClientError creates class of errors for stack traces
|
||||
var ClientError = errs.Class("Client Error")
|
||||
|
||||
//Client implements the Overlay Client interface
|
||||
type Client interface {
|
||||
Choose(ctx context.Context, op Options) ([]*pb.Node, error)
|
||||
Lookup(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error)
|
||||
BulkLookup(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
|
||||
}
|
||||
|
||||
// client is the overlay concrete implementation of the client interface
|
||||
type client struct {
|
||||
conn pb.OverlayClient
|
||||
}
|
||||
|
||||
// Options contains parameters for selecting nodes
|
||||
type Options struct {
|
||||
Amount int
|
||||
Space int64
|
||||
Bandwidth int64
|
||||
Uptime float64
|
||||
UptimeCount int64
|
||||
AuditSuccess float64
|
||||
AuditCount int64
|
||||
Excluded storj.NodeIDList
|
||||
}
|
||||
|
||||
// NewClient returns a new intialized Overlay Client
|
||||
func NewClient(tc transport.Client, address string) (Client, error) {
|
||||
return NewClientContext(context.TODO(), tc, address)
|
||||
}
|
||||
|
||||
// NewClientContext returns a new intialized Overlay Client
|
||||
func NewClientContext(ctx context.Context, tc transport.Client, address string) (Client, error) {
|
||||
conn, err := tc.DialAddress(ctx, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &client{
|
||||
conn: pb.NewOverlayClient(conn),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewClientFrom returns a new overlay.Client from a connection
|
||||
func NewClientFrom(conn pb.OverlayClient) Client { return &client{conn} }
|
||||
|
||||
// a compiler trick to make sure *client implements Client
|
||||
var _ Client = (*client)(nil)
|
||||
|
||||
// Choose returns nodes based on Options
|
||||
func (client *client) Choose(ctx context.Context, op Options) ([]*pb.Node, error) {
|
||||
var exIDs storj.NodeIDList
|
||||
exIDs = append(exIDs, op.Excluded...)
|
||||
// TODO(coyle): We will also need to communicate with the reputation service here
|
||||
resp, err := client.conn.FindStorageNodes(ctx, &pb.FindStorageNodesRequest{
|
||||
Opts: &pb.OverlayOptions{
|
||||
Amount: int64(op.Amount),
|
||||
Restrictions: &pb.NodeRestrictions{FreeDisk: op.Space, FreeBandwidth: op.Bandwidth},
|
||||
ExcludedNodes: exIDs,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
return resp.GetNodes(), nil
|
||||
}
|
||||
|
||||
// Lookup provides a Node with the given ID
|
||||
func (client *client) Lookup(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) {
|
||||
resp, err := client.conn.Lookup(ctx, &pb.LookupRequest{NodeId: nodeID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.GetNode(), nil
|
||||
}
|
||||
|
||||
// BulkLookup provides a list of Nodes with the given IDs
|
||||
func (client *client) BulkLookup(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
|
||||
var reqs pb.LookupRequests
|
||||
for _, v := range nodeIDs {
|
||||
reqs.LookupRequest = append(reqs.LookupRequest, &pb.LookupRequest{NodeId: v})
|
||||
}
|
||||
resp, err := client.conn.BulkLookup(ctx, &reqs)
|
||||
|
||||
if err != nil {
|
||||
return nil, ClientError.Wrap(err)
|
||||
}
|
||||
|
||||
var nodes []*pb.Node
|
||||
for _, v := range resp.LookupResponse {
|
||||
nodes = append(nodes, v.Node)
|
||||
}
|
||||
return nodes, nil
|
||||
}
|
@ -1,191 +0,0 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package overlay_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
func TestChoose(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 8, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
cases := []struct {
|
||||
limit int
|
||||
space int64
|
||||
bandwidth int64
|
||||
}{
|
||||
{
|
||||
limit: 4,
|
||||
space: 0,
|
||||
bandwidth: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, v := range cases {
|
||||
newNodes, err := oc.Choose(ctx, overlay.Options{
|
||||
Amount: v.limit,
|
||||
Space: v.space,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, newNodes, v.limit)
|
||||
for _, n := range newNodes {
|
||||
assert.True(t, n.GetRestrictions().GetFreeDisk() >= v.space)
|
||||
assert.True(t, n.GetRestrictions().GetFreeBandwidth() >= v.bandwidth)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestLookup(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
nid1 := planet.StorageNodes[0].ID()
|
||||
|
||||
cases := []struct {
|
||||
nodeID storj.NodeID
|
||||
expectErr bool
|
||||
}{
|
||||
{
|
||||
nodeID: nid1,
|
||||
expectErr: false,
|
||||
},
|
||||
{
|
||||
nodeID: storj.NodeID{1},
|
||||
expectErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, v := range cases {
|
||||
n, err := oc.Lookup(ctx, v.nodeID)
|
||||
if v.expectErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
if assert.NotNil(t, n) {
|
||||
assert.Equal(t, v.nodeID.String(), n.Id.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestBulkLookup(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
nid1 := planet.StorageNodes[0].ID()
|
||||
nid2 := planet.StorageNodes[1].ID()
|
||||
nid3 := planet.StorageNodes[2].ID()
|
||||
|
||||
cases := []struct {
|
||||
nodeIDs storj.NodeIDList
|
||||
expectedCalls int
|
||||
}{
|
||||
{
|
||||
nodeIDs: storj.NodeIDList{nid1, nid2, nid3},
|
||||
expectedCalls: 1,
|
||||
},
|
||||
}
|
||||
for _, v := range cases {
|
||||
resNodes, err := oc.BulkLookup(ctx, v.nodeIDs)
|
||||
assert.NoError(t, err)
|
||||
for i, n := range resNodes {
|
||||
if assert.NotNil(t, n) {
|
||||
assert.Equal(t, v.nodeIDs[i], n.Id)
|
||||
}
|
||||
}
|
||||
assert.Equal(t, len(v.nodeIDs), len(resNodes))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestBulkLookupV2(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
oc, err := planet.Uplinks[0].DialOverlay(planet.Satellites[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
nid1 := planet.StorageNodes[0].ID()
|
||||
nid2 := planet.StorageNodes[1].ID()
|
||||
nid3 := planet.StorageNodes[2].ID()
|
||||
nid4 := storj.NodeID{4}
|
||||
nid5 := storj.NodeID{5}
|
||||
|
||||
n1 := &pb.Node{Id: nid1}
|
||||
n2 := &pb.Node{Id: nid2}
|
||||
n3 := &pb.Node{Id: nid3}
|
||||
|
||||
{ // empty id
|
||||
_, err := oc.BulkLookup(ctx, storj.NodeIDList{})
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
{ // valid ids
|
||||
idList := storj.NodeIDList{nid1, nid2, nid3}
|
||||
ns, err := oc.BulkLookup(ctx, idList)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i, n := range ns {
|
||||
if assert.NotNil(t, n) {
|
||||
assert.Equal(t, idList[i], n.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{ // missing ids
|
||||
idList := storj.NodeIDList{nid4, nid5}
|
||||
ns, err := oc.BulkLookup(ctx, idList)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, []*pb.Node{nil, nil}, ns)
|
||||
}
|
||||
|
||||
{ // different order and missing
|
||||
idList := storj.NodeIDList{nid3, nid4, nid1, nid2, nid5}
|
||||
ns, err := oc.BulkLookup(ctx, idList)
|
||||
assert.NoError(t, err)
|
||||
|
||||
expectedNodes := []*pb.Node{n3, nil, n1, n2, nil}
|
||||
for i, n := range ns {
|
||||
if n == nil {
|
||||
assert.Nil(t, n)
|
||||
} else {
|
||||
if assert.NotNil(t, n) {
|
||||
assert.Equal(t, expectedNodes[i].Id, n.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
@ -1,78 +0,0 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: ./pkg/overlay/client.go
|
||||
|
||||
// Package mock_overlay is a generated GoMock package.
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
|
||||
x "storj.io/storj/pkg/overlay"
|
||||
pb "storj.io/storj/pkg/pb"
|
||||
storj "storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// MockClient is a mock of Client interface
|
||||
type MockClient struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockClientMockRecorder
|
||||
}
|
||||
|
||||
// MockClientMockRecorder is the mock recorder for MockClient
|
||||
type MockClientMockRecorder struct {
|
||||
mock *MockClient
|
||||
}
|
||||
|
||||
// NewMockClient creates a new mock instance
|
||||
func NewMockClient(ctrl *gomock.Controller) *MockClient {
|
||||
mock := &MockClient{ctrl: ctrl}
|
||||
mock.recorder = &MockClientMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockClient) EXPECT() *MockClientMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Choose mocks base method
|
||||
func (m *MockClient) Choose(ctx context.Context, op x.Options) ([]*pb.Node, error) {
|
||||
ret := m.ctrl.Call(m, "Choose", ctx, op)
|
||||
ret0, _ := ret[0].([]*pb.Node)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Choose indicates an expected call of Choose
|
||||
func (mr *MockClientMockRecorder) Choose(ctx, op interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Choose", reflect.TypeOf((*MockClient)(nil).Choose), ctx, op)
|
||||
}
|
||||
|
||||
// Lookup mocks base method
|
||||
func (m *MockClient) Lookup(ctx context.Context, nodeID storj.NodeID) (*pb.Node, error) {
|
||||
ret := m.ctrl.Call(m, "Lookup", ctx, nodeID)
|
||||
ret0, _ := ret[0].(*pb.Node)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Lookup indicates an expected call of Lookup
|
||||
func (mr *MockClientMockRecorder) Lookup(ctx, nodeID interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lookup", reflect.TypeOf((*MockClient)(nil).Lookup), ctx, nodeID)
|
||||
}
|
||||
|
||||
// BulkLookup mocks base method
|
||||
func (m *MockClient) BulkLookup(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) {
|
||||
ret := m.ctrl.Call(m, "BulkLookup", ctx, nodeIDs)
|
||||
ret0, _ := ret[0].([]*pb.Node)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// BulkLookup indicates an expected call of BulkLookup
|
||||
func (mr *MockClientMockRecorder) BulkLookup(ctx, nodeIDs interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BulkLookup", reflect.TypeOf((*MockClient)(nil).BulkLookup), ctx, nodeIDs)
|
||||
}
|
Loading…
Reference in New Issue
Block a user