satellite/discovery: remove discovery related code (#3175)

This commit is contained in:
Jennifer Li Johnson 2019-10-14 10:57:01 -04:00 committed by GitHub
parent 96aeedcdee
commit b185dbbee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 42 additions and 1146 deletions

View File

@ -154,6 +154,7 @@ func networkTest(flags *Flags, command string, args []string) error {
if printCommands {
fmt.Fprintf(processes.Output, "exec: %v\n", strings.Join(cmd.Args, " "))
}
time.Sleep(6 * time.Second) //hack: this is so the contact chore can send the satellite the capacity info on its second iteration after 5s
errRun := cmd.Run()
cancel()

View File

@ -91,9 +91,6 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C
planet.Start(ctx)
// make sure nodes are refreshed in db
planet.Satellites[0].Discovery.Service.Refresh.TriggerWait()
test(t, ctx, planet)
})
}

View File

@ -23,7 +23,6 @@ import (
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/discovery"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/mailservice"
@ -120,11 +119,6 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
},
UpdateStatsBatchSize: 100,
},
Discovery: discovery.Config{
RefreshInterval: 1 * time.Second,
RefreshLimit: 100,
RefreshConcurrency: 2,
},
Metainfo: metainfo.Config{
DatabaseURL: "", // not used
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024

View File

@ -33,12 +33,8 @@ func RunPlanet(t *testing.T, run func(ctx *testcontext.Context, planet *testplan
)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
// make sure nodes are refreshed in db
planet.Satellites[0].Discovery.Service.Refresh.TriggerWait()
run(ctx, planet)
}

View File

@ -4,13 +4,10 @@
package pb
import (
context "context"
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
drpc "storj.io/drpc"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -55,7 +52,7 @@ func (x Restriction_Operator) String() string {
}
func (Restriction_Operator) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{6, 0}
return fileDescriptor_61fc82527fbe24ad, []int{1, 0}
}
type Restriction_Operand int32
@ -80,216 +77,9 @@ func (x Restriction_Operand) String() string {
}
func (Restriction_Operand) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{6, 1}
return fileDescriptor_61fc82527fbe24ad, []int{1, 1}
}
type QueryRequest struct {
Sender *Node `protobuf:"bytes,1,opt,name=sender,proto3" json:"sender,omitempty"`
Target *Node `protobuf:"bytes,2,opt,name=target,proto3" json:"target,omitempty"`
Limit int64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"`
Pingback bool `protobuf:"varint,4,opt,name=pingback,proto3" json:"pingback,omitempty"`
Vouchers []*Voucher `protobuf:"bytes,5,rep,name=vouchers,proto3" json:"vouchers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *QueryRequest) Reset() { *m = QueryRequest{} }
func (m *QueryRequest) String() string { return proto.CompactTextString(m) }
func (*QueryRequest) ProtoMessage() {}
func (*QueryRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{0}
}
func (m *QueryRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryRequest.Unmarshal(m, b)
}
func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_QueryRequest.Marshal(b, m, deterministic)
}
func (m *QueryRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_QueryRequest.Merge(m, src)
}
func (m *QueryRequest) XXX_Size() int {
return xxx_messageInfo_QueryRequest.Size(m)
}
func (m *QueryRequest) XXX_DiscardUnknown() {
xxx_messageInfo_QueryRequest.DiscardUnknown(m)
}
var xxx_messageInfo_QueryRequest proto.InternalMessageInfo
func (m *QueryRequest) GetSender() *Node {
if m != nil {
return m.Sender
}
return nil
}
func (m *QueryRequest) GetTarget() *Node {
if m != nil {
return m.Target
}
return nil
}
func (m *QueryRequest) GetLimit() int64 {
if m != nil {
return m.Limit
}
return 0
}
func (m *QueryRequest) GetPingback() bool {
if m != nil {
return m.Pingback
}
return false
}
func (m *QueryRequest) GetVouchers() []*Voucher {
if m != nil {
return m.Vouchers
}
return nil
}
type QueryResponse struct {
Sender *Node `protobuf:"bytes,1,opt,name=sender,proto3" json:"sender,omitempty"`
Response []*Node `protobuf:"bytes,2,rep,name=response,proto3" json:"response,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *QueryResponse) Reset() { *m = QueryResponse{} }
func (m *QueryResponse) String() string { return proto.CompactTextString(m) }
func (*QueryResponse) ProtoMessage() {}
func (*QueryResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{1}
}
func (m *QueryResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryResponse.Unmarshal(m, b)
}
func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_QueryResponse.Marshal(b, m, deterministic)
}
func (m *QueryResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_QueryResponse.Merge(m, src)
}
func (m *QueryResponse) XXX_Size() int {
return xxx_messageInfo_QueryResponse.Size(m)
}
func (m *QueryResponse) XXX_DiscardUnknown() {
xxx_messageInfo_QueryResponse.DiscardUnknown(m)
}
var xxx_messageInfo_QueryResponse proto.InternalMessageInfo
func (m *QueryResponse) GetSender() *Node {
if m != nil {
return m.Sender
}
return nil
}
func (m *QueryResponse) GetResponse() []*Node {
if m != nil {
return m.Response
}
return nil
}
type PingRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PingRequest) Reset() { *m = PingRequest{} }
func (m *PingRequest) String() string { return proto.CompactTextString(m) }
func (*PingRequest) ProtoMessage() {}
func (*PingRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{2}
}
func (m *PingRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingRequest.Unmarshal(m, b)
}
func (m *PingRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PingRequest.Marshal(b, m, deterministic)
}
func (m *PingRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_PingRequest.Merge(m, src)
}
func (m *PingRequest) XXX_Size() int {
return xxx_messageInfo_PingRequest.Size(m)
}
func (m *PingRequest) XXX_DiscardUnknown() {
xxx_messageInfo_PingRequest.DiscardUnknown(m)
}
var xxx_messageInfo_PingRequest proto.InternalMessageInfo
type PingResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PingResponse) Reset() { *m = PingResponse{} }
func (m *PingResponse) String() string { return proto.CompactTextString(m) }
func (*PingResponse) ProtoMessage() {}
func (*PingResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{3}
}
func (m *PingResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingResponse.Unmarshal(m, b)
}
func (m *PingResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PingResponse.Marshal(b, m, deterministic)
}
func (m *PingResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_PingResponse.Merge(m, src)
}
func (m *PingResponse) XXX_Size() int {
return xxx_messageInfo_PingResponse.Size(m)
}
func (m *PingResponse) XXX_DiscardUnknown() {
xxx_messageInfo_PingResponse.DiscardUnknown(m)
}
var xxx_messageInfo_PingResponse proto.InternalMessageInfo
// TODO: add fields that validate who is requesting the info
type InfoRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *InfoRequest) Reset() { *m = InfoRequest{} }
func (m *InfoRequest) String() string { return proto.CompactTextString(m) }
func (*InfoRequest) ProtoMessage() {}
func (*InfoRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{4}
}
func (m *InfoRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InfoRequest.Unmarshal(m, b)
}
func (m *InfoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_InfoRequest.Marshal(b, m, deterministic)
}
func (m *InfoRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_InfoRequest.Merge(m, src)
}
func (m *InfoRequest) XXX_Size() int {
return xxx_messageInfo_InfoRequest.Size(m)
}
func (m *InfoRequest) XXX_DiscardUnknown() {
xxx_messageInfo_InfoRequest.DiscardUnknown(m)
}
var xxx_messageInfo_InfoRequest proto.InternalMessageInfo
type InfoResponse struct {
Type NodeType `protobuf:"varint,2,opt,name=type,proto3,enum=node.NodeType" json:"type,omitempty"`
Operator *NodeOperator `protobuf:"bytes,3,opt,name=operator,proto3" json:"operator,omitempty"`
@ -304,7 +94,7 @@ func (m *InfoResponse) Reset() { *m = InfoResponse{} }
func (m *InfoResponse) String() string { return proto.CompactTextString(m) }
func (*InfoResponse) ProtoMessage() {}
func (*InfoResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{5}
return fileDescriptor_61fc82527fbe24ad, []int{0}
}
func (m *InfoResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InfoResponse.Unmarshal(m, b)
@ -365,7 +155,7 @@ func (m *Restriction) Reset() { *m = Restriction{} }
func (m *Restriction) String() string { return proto.CompactTextString(m) }
func (*Restriction) ProtoMessage() {}
func (*Restriction) Descriptor() ([]byte, []int) {
return fileDescriptor_61fc82527fbe24ad, []int{6}
return fileDescriptor_61fc82527fbe24ad, []int{1}
}
func (m *Restriction) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Restriction.Unmarshal(m, b)
@ -409,11 +199,6 @@ func (m *Restriction) GetValue() int64 {
func init() {
proto.RegisterEnum("overlay.Restriction_Operator", Restriction_Operator_name, Restriction_Operator_value)
proto.RegisterEnum("overlay.Restriction_Operand", Restriction_Operand_name, Restriction_Operand_value)
proto.RegisterType((*QueryRequest)(nil), "overlay.QueryRequest")
proto.RegisterType((*QueryResponse)(nil), "overlay.QueryResponse")
proto.RegisterType((*PingRequest)(nil), "overlay.PingRequest")
proto.RegisterType((*PingResponse)(nil), "overlay.PingResponse")
proto.RegisterType((*InfoRequest)(nil), "overlay.InfoRequest")
proto.RegisterType((*InfoResponse)(nil), "overlay.InfoResponse")
proto.RegisterType((*Restriction)(nil), "overlay.Restriction")
}
@ -421,317 +206,26 @@ func init() {
func init() { proto.RegisterFile("overlay.proto", fileDescriptor_61fc82527fbe24ad) }
var fileDescriptor_61fc82527fbe24ad = []byte{
// 518 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x8b, 0xd3, 0x40,
0x14, 0xc7, 0x77, 0xfa, 0x33, 0xbe, 0xfe, 0x20, 0x3b, 0xec, 0x4a, 0x28, 0x0a, 0x25, 0x07, 0x29,
0xa8, 0x39, 0x64, 0x65, 0x41, 0x6f, 0xae, 0x8d, 0x6b, 0x71, 0x59, 0xdd, 0x31, 0xac, 0xa0, 0x07,
0x49, 0x93, 0x31, 0x06, 0xeb, 0x4c, 0x9c, 0x4c, 0x0b, 0xf9, 0xaf, 0xbc, 0x79, 0xf4, 0xff, 0xf2,
0x24, 0x33, 0x99, 0xa4, 0x71, 0x55, 0xf0, 0x94, 0x79, 0xef, 0xfb, 0x79, 0x9d, 0x6f, 0x5e, 0xbf,
0x81, 0x09, 0xdf, 0x51, 0xb1, 0x89, 0x4a, 0x2f, 0x17, 0x5c, 0x72, 0x3c, 0x34, 0xe5, 0x0c, 0x52,
0x9e, 0xf2, 0xaa, 0x39, 0x03, 0xc6, 0x13, 0x6a, 0xce, 0xd3, 0x1d, 0xdf, 0xc6, 0x9f, 0xa8, 0x28,
0xaa, 0xda, 0xfd, 0x8e, 0x60, 0x7c, 0xb5, 0xa5, 0xa2, 0x24, 0xf4, 0xeb, 0x96, 0x16, 0x12, 0xbb,
0x30, 0x28, 0x28, 0x4b, 0xa8, 0x70, 0xd0, 0x1c, 0x2d, 0x46, 0x3e, 0x78, 0x7a, 0xfa, 0x92, 0x27,
0x94, 0x18, 0x45, 0x31, 0x32, 0x12, 0x29, 0x95, 0x4e, 0xe7, 0x4f, 0xa6, 0x52, 0xf0, 0x11, 0xf4,
0x37, 0xd9, 0x97, 0x4c, 0x3a, 0xdd, 0x39, 0x5a, 0x74, 0x49, 0x55, 0xe0, 0x19, 0x58, 0x79, 0xc6,
0xd2, 0x75, 0x14, 0x7f, 0x76, 0x7a, 0x73, 0xb4, 0xb0, 0x48, 0x53, 0xe3, 0x87, 0x60, 0xd5, 0xe6,
0x9c, 0xfe, 0xbc, 0xbb, 0x18, 0xf9, 0x87, 0x5e, 0xe3, 0xf6, 0xba, 0x3a, 0x90, 0x06, 0x71, 0xdf,
0xc3, 0xc4, 0x18, 0x2f, 0x72, 0xce, 0x0a, 0xfa, 0x5f, 0xce, 0xef, 0x81, 0x25, 0x0c, 0xef, 0x74,
0xf4, 0x1d, 0x6d, 0xaa, 0xd1, 0xdc, 0x09, 0x8c, 0x5e, 0x67, 0x2c, 0x35, 0x4b, 0x71, 0xa7, 0x30,
0xae, 0xca, 0xbd, 0xbc, 0x62, 0x1f, 0x79, 0x2d, 0xff, 0x40, 0x30, 0xae, 0xea, 0xc6, 0x4a, 0x4f,
0x96, 0x39, 0xd5, 0xeb, 0x99, 0xfa, 0xd3, 0xfd, 0x15, 0x61, 0x99, 0x53, 0xa2, 0x35, 0xec, 0x81,
0xc5, 0x73, 0x2a, 0x22, 0xc9, 0x85, 0xde, 0xd1, 0xc8, 0xc7, 0x7b, 0xee, 0x95, 0x51, 0x48, 0xc3,
0x28, 0x3e, 0x8e, 0xf2, 0x28, 0xce, 0x64, 0xa9, 0x57, 0xf7, 0x1b, 0xff, 0xcc, 0x28, 0xa4, 0x61,
0xf0, 0x7d, 0x18, 0xee, 0xa8, 0x28, 0x32, 0xce, 0x9c, 0xbe, 0xc6, 0x0f, 0xf7, 0xf8, 0x75, 0x25,
0x90, 0x9a, 0x70, 0x7f, 0x22, 0x18, 0x11, 0x5a, 0x48, 0x91, 0xc5, 0x32, 0xe3, 0x0c, 0x3f, 0x6e,
0x99, 0x43, 0xfa, 0x25, 0xee, 0x7a, 0x75, 0xd2, 0x5a, 0x9c, 0xf7, 0x17, 0x9f, 0xa7, 0x30, 0xd4,
0x67, 0x96, 0x98, 0xd7, 0xbf, 0xf3, 0xef, 0x49, 0x96, 0x90, 0x1a, 0x56, 0x81, 0xd9, 0x45, 0x9b,
0x2d, 0xad, 0x03, 0xa3, 0x0b, 0xf7, 0x11, 0x58, 0xf5, 0x1d, 0x78, 0x00, 0x9d, 0x8b, 0xd0, 0x3e,
0x50, 0xcf, 0xe0, 0xca, 0x46, 0xea, 0x79, 0x1e, 0xda, 0x1d, 0x3c, 0x84, 0xee, 0x45, 0x18, 0xd8,
0x5d, 0x75, 0x38, 0x0f, 0x03, 0xbb, 0xe7, 0x3e, 0x80, 0xa1, 0xf9, 0x7d, 0x8c, 0x61, 0xfa, 0x9c,
0x04, 0xc1, 0x87, 0xb3, 0xa7, 0x97, 0xcb, 0xb7, 0xab, 0x65, 0xf8, 0xc2, 0x3e, 0xc0, 0x13, 0xb8,
0xa5, 0x7b, 0xcb, 0xd5, 0x9b, 0x97, 0x36, 0xf2, 0xbf, 0x21, 0xe8, 0xab, 0xad, 0x14, 0xf8, 0x14,
0xfa, 0x3a, 0x53, 0xf8, 0xb8, 0xf1, 0xdc, 0xfe, 0x38, 0x66, 0xb7, 0x6f, 0xb6, 0xcd, 0xff, 0x7d,
0x02, 0x3d, 0x95, 0x0f, 0x7c, 0xd4, 0xe8, 0xad, 0xf4, 0xcc, 0x8e, 0x6f, 0x74, 0xcd, 0xd0, 0x13,
0xb5, 0x72, 0x4d, 0xa8, 0xec, 0xb4, 0x66, 0x5b, 0xd1, 0x6a, 0xcd, 0xb6, 0x03, 0x76, 0xd6, 0x7b,
0xd7, 0xc9, 0xd7, 0xeb, 0x81, 0xfe, 0x86, 0x4f, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0xae, 0xb0,
0xad, 0x04, 0x05, 0x04, 0x00, 0x00,
}
type DRPCNodesClient interface {
DRPCConn() drpc.Conn
Query(ctx context.Context, in *QueryRequest) (*QueryResponse, error)
Ping(ctx context.Context, in *PingRequest) (*PingResponse, error)
RequestInfo(ctx context.Context, in *InfoRequest) (*InfoResponse, error)
}
type drpcNodesClient struct {
cc drpc.Conn
}
func NewDRPCNodesClient(cc drpc.Conn) DRPCNodesClient {
return &drpcNodesClient{cc}
}
func (c *drpcNodesClient) DRPCConn() drpc.Conn { return c.cc }
func (c *drpcNodesClient) Query(ctx context.Context, in *QueryRequest) (*QueryResponse, error) {
out := new(QueryResponse)
err := c.cc.Invoke(ctx, "/overlay.Nodes/Query", in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcNodesClient) Ping(ctx context.Context, in *PingRequest) (*PingResponse, error) {
out := new(PingResponse)
err := c.cc.Invoke(ctx, "/overlay.Nodes/Ping", in, out)
if err != nil {
return nil, err
}
return out, nil
}
func (c *drpcNodesClient) RequestInfo(ctx context.Context, in *InfoRequest) (*InfoResponse, error) {
out := new(InfoResponse)
err := c.cc.Invoke(ctx, "/overlay.Nodes/RequestInfo", in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCNodesServer interface {
Query(context.Context, *QueryRequest) (*QueryResponse, error)
Ping(context.Context, *PingRequest) (*PingResponse, error)
RequestInfo(context.Context, *InfoRequest) (*InfoResponse, error)
}
type DRPCNodesDescription struct{}
func (DRPCNodesDescription) NumMethods() int { return 3 }
func (DRPCNodesDescription) Method(n int) (string, drpc.Handler, interface{}, bool) {
switch n {
case 0:
return "/overlay.Nodes/Query",
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCNodesServer).
Query(
ctx,
in1.(*QueryRequest),
)
}, DRPCNodesServer.Query, true
case 1:
return "/overlay.Nodes/Ping",
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCNodesServer).
Ping(
ctx,
in1.(*PingRequest),
)
}, DRPCNodesServer.Ping, true
case 2:
return "/overlay.Nodes/RequestInfo",
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCNodesServer).
RequestInfo(
ctx,
in1.(*InfoRequest),
)
}, DRPCNodesServer.RequestInfo, true
default:
return "", nil, nil, false
}
}
func DRPCRegisterNodes(srv drpc.Server, impl DRPCNodesServer) {
srv.Register(impl, DRPCNodesDescription{})
}
type DRPCNodes_QueryStream interface {
drpc.Stream
SendAndClose(*QueryResponse) error
}
type drpcNodesQueryStream struct {
drpc.Stream
}
func (x *drpcNodesQueryStream) SendAndClose(m *QueryResponse) error {
if err := x.MsgSend(m); err != nil {
return err
}
return x.CloseSend()
}
type DRPCNodes_PingStream interface {
drpc.Stream
SendAndClose(*PingResponse) error
}
type drpcNodesPingStream struct {
drpc.Stream
}
func (x *drpcNodesPingStream) SendAndClose(m *PingResponse) error {
if err := x.MsgSend(m); err != nil {
return err
}
return x.CloseSend()
}
type DRPCNodes_RequestInfoStream interface {
drpc.Stream
SendAndClose(*InfoResponse) error
}
type drpcNodesRequestInfoStream struct {
drpc.Stream
}
func (x *drpcNodesRequestInfoStream) SendAndClose(m *InfoResponse) error {
if err := x.MsgSend(m); err != nil {
return err
}
return x.CloseSend()
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// NodesClient is the client API for Nodes service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type NodesClient interface {
Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error)
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
RequestInfo(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error)
}
type nodesClient struct {
cc *grpc.ClientConn
}
func NewNodesClient(cc *grpc.ClientConn) NodesClient {
return &nodesClient{cc}
}
func (c *nodesClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) {
out := new(QueryResponse)
err := c.cc.Invoke(ctx, "/overlay.Nodes/Query", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *nodesClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) {
out := new(PingResponse)
err := c.cc.Invoke(ctx, "/overlay.Nodes/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *nodesClient) RequestInfo(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) {
out := new(InfoResponse)
err := c.cc.Invoke(ctx, "/overlay.Nodes/RequestInfo", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// NodesServer is the server API for Nodes service.
type NodesServer interface {
Query(context.Context, *QueryRequest) (*QueryResponse, error)
Ping(context.Context, *PingRequest) (*PingResponse, error)
RequestInfo(context.Context, *InfoRequest) (*InfoResponse, error)
}
func RegisterNodesServer(s *grpc.Server, srv NodesServer) {
s.RegisterService(&_Nodes_serviceDesc, srv)
}
func _Nodes_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(QueryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(NodesServer).Query(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/overlay.Nodes/Query",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NodesServer).Query(ctx, req.(*QueryRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Nodes_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PingRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(NodesServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/overlay.Nodes/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NodesServer).Ping(ctx, req.(*PingRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Nodes_RequestInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InfoRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(NodesServer).RequestInfo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/overlay.Nodes/RequestInfo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NodesServer).RequestInfo(ctx, req.(*InfoRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Nodes_serviceDesc = grpc.ServiceDesc{
ServiceName: "overlay.Nodes",
HandlerType: (*NodesServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Query",
Handler: _Nodes_Query_Handler,
},
{
MethodName: "Ping",
Handler: _Nodes_Ping_Handler,
},
{
MethodName: "RequestInfo",
Handler: _Nodes_RequestInfo_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "overlay.proto",
// 325 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x91, 0x41, 0x4b, 0xeb, 0x40,
0x10, 0xc7, 0xbb, 0x49, 0xda, 0xf4, 0x4d, 0x5f, 0xcb, 0xbe, 0xe5, 0x1d, 0x42, 0x51, 0x28, 0x39,
0x15, 0x94, 0x1c, 0xaa, 0x08, 0x1e, 0xad, 0x8d, 0xb5, 0x58, 0x2a, 0xae, 0x41, 0xc1, 0x8b, 0xa4,
0xcd, 0x5a, 0x02, 0x65, 0x67, 0x49, 0x62, 0x21, 0x9f, 0xce, 0xef, 0xe5, 0x49, 0xb2, 0x49, 0xda,
0x22, 0x7a, 0x9a, 0xff, 0xcc, 0xff, 0xb7, 0xbb, 0x33, 0xb3, 0xd0, 0xc5, 0xad, 0x48, 0x36, 0x61,
0xee, 0xa9, 0x04, 0x33, 0x64, 0x76, 0x95, 0xf6, 0x61, 0x8d, 0x6b, 0x2c, 0x8b, 0x7d, 0x90, 0x18,
0x89, 0x52, 0xbb, 0x1f, 0x04, 0xfe, 0xce, 0xe4, 0x1b, 0x72, 0x91, 0x2a, 0x94, 0xa9, 0x60, 0x2e,
0x58, 0x59, 0xae, 0x84, 0x63, 0x0c, 0xc8, 0xb0, 0x37, 0xea, 0x79, 0x9a, 0x5d, 0x60, 0x24, 0x82,
0x5c, 0x09, 0xae, 0x3d, 0xe6, 0x41, 0x1b, 0x95, 0x48, 0xc2, 0x0c, 0x13, 0xc7, 0x1c, 0x90, 0x61,
0x67, 0xc4, 0xf6, 0xdc, 0x7d, 0xe5, 0xf0, 0x1d, 0x53, 0xf0, 0xab, 0x50, 0x85, 0xab, 0x38, 0xcb,
0x1d, 0xeb, 0x3b, 0x7f, 0x5d, 0x39, 0x7c, 0xc7, 0xb0, 0x13, 0xb0, 0xb7, 0x22, 0x49, 0x63, 0x94,
0x4e, 0x53, 0xe3, 0xff, 0xf6, 0xf8, 0x53, 0x69, 0xf0, 0x9a, 0x70, 0x3f, 0x09, 0x74, 0xb8, 0x48,
0xb3, 0x24, 0x5e, 0x65, 0x31, 0x4a, 0x76, 0x79, 0xd0, 0x1c, 0xd1, 0x43, 0x1c, 0x7b, 0xf5, 0x52,
0x0e, 0x38, 0xef, 0x87, 0x3e, 0x2f, 0xc0, 0xd6, 0x5a, 0x46, 0xd5, 0xf8, 0x47, 0xbf, 0x9f, 0x94,
0x11, 0xaf, 0x61, 0xf6, 0x1f, 0x9a, 0xdb, 0x70, 0xf3, 0x2e, 0xf4, 0x32, 0x4c, 0x5e, 0x26, 0xee,
0x39, 0xb4, 0xeb, 0x37, 0x58, 0x0b, 0x8c, 0x79, 0x40, 0x1b, 0x45, 0xf4, 0x1f, 0x28, 0x29, 0xe2,
0x34, 0xa0, 0x06, 0xb3, 0xc1, 0x9c, 0x07, 0x3e, 0x35, 0x0b, 0x31, 0x0d, 0x7c, 0x6a, 0xb9, 0xa7,
0x60, 0x57, 0xf7, 0x33, 0x06, 0xbd, 0x1b, 0xee, 0xfb, 0xaf, 0xe3, 0xab, 0xc5, 0xe4, 0x79, 0x36,
0x09, 0x6e, 0x69, 0x83, 0x75, 0xe1, 0x8f, 0xae, 0x4d, 0x66, 0x8f, 0x77, 0x94, 0x8c, 0xad, 0x17,
0x43, 0x2d, 0x97, 0x2d, 0xfd, 0x97, 0x67, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x39, 0x65, 0xff,
0x9d, 0xfd, 0x01, 0x00, 0x00,
}

View File

@ -6,35 +6,9 @@ option go_package = "pb";
import "gogo.proto";
import "node.proto";
import "vouchers.proto";
package overlay;
service Nodes {
rpc Query(QueryRequest) returns (QueryResponse);
rpc Ping(PingRequest) returns (PingResponse);
rpc RequestInfo(InfoRequest) returns (InfoResponse);
}
message QueryRequest {
node.Node sender = 1;
node.Node target = 2;
int64 limit = 3;
bool pingback = 4;
repeated vouchers.Voucher vouchers = 5;
}
message QueryResponse {
node.Node sender = 1;
repeated node.Node response = 2;
}
message PingRequest {};
message PingResponse {};
// TODO: add fields that validate who is requesting the info
message InfoRequest {}
message InfoResponse {
node.NodeType type = 2;
node.NodeOperator operator = 3;

View File

@ -38,9 +38,6 @@ type (
// NodeStatsClient is an alias to the drpc client interface
NodeStatsClient = pb.DRPCNodeStatsClient
// NodesClient is an alias to the drpc client interface
NodesClient = pb.DRPCNodesClient
// OrdersClient is an alias to the drpc client interface
OrdersClient = pb.DRPCOrdersClient
@ -140,16 +137,6 @@ func (c *Conn) NodeStatsClient() NodeStatsClient {
return NewNodeStatsClient(c.raw)
}
// NewNodesClient returns the drpc version of a NodesClient
func NewNodesClient(rc *RawConn) NodesClient {
return pb.NewDRPCNodesClient(rc)
}
// NodesClient returns a NodesClient for this connection
func (c *Conn) NodesClient() NodesClient {
return NewNodesClient(c.raw)
}
// NewOrdersClient returns the drpc version of a OrdersClient
func NewOrdersClient(rc *RawConn) OrdersClient {
return pb.NewDRPCOrdersClient(rc)

View File

@ -39,9 +39,6 @@ type (
// NodeStatsClient is an alias to the grpc client interface
NodeStatsClient = pb.NodeStatsClient
// NodesClient is an alias to the grpc client interface
NodesClient = pb.NodesClient
// OrdersClient is an alias to the grpc client interface
OrdersClient = pb.OrdersClient
@ -141,16 +138,6 @@ func (c *Conn) NodeStatsClient() NodeStatsClient {
return NewNodeStatsClient(c.raw)
}
// NewNodesClient returns the grpc version of a NodesClient
func NewNodesClient(rc *RawConn) NodesClient {
return pb.NewNodesClient(rc)
}
// NodesClient returns a NodesClient for this connection
func (c *Conn) NodesClient() NodesClient {
return NewNodesClient(c.raw)
}
// NewOrdersClient returns the grpc version of a OrdersClient
func NewOrdersClient(rc *RawConn) OrdersClient {
return pb.NewOrdersClient(rc)

View File

@ -26,8 +26,7 @@ type Config struct {
func (sc Config) Run(ctx context.Context, log *zap.Logger, identity *identity.FullIdentity, revDB extensions.RevocationDB, interceptor grpc.UnaryServerInterceptor, services ...Service) (err error) {
defer mon.Task()(&ctx)(&err)
// Ensure revDB is not nil, since we call Close() below we do not want a
// panic
// Ensure revDB is not nil, since we call Close() below we do not want a panic
if revDB == nil {
return Error.New("revDB cannot be nil in call to Run")
}

View File

@ -5112,62 +5112,6 @@
}
],
"messages": [
{
"name": "QueryRequest",
"fields": [
{
"id": 1,
"name": "sender",
"type": "node.Node"
},
{
"id": 2,
"name": "target",
"type": "node.Node"
},
{
"id": 3,
"name": "limit",
"type": "int64"
},
{
"id": 4,
"name": "pingback",
"type": "bool"
},
{
"id": 5,
"name": "vouchers",
"type": "vouchers.Voucher",
"is_repeated": true
}
]
},
{
"name": "QueryResponse",
"fields": [
{
"id": 1,
"name": "sender",
"type": "node.Node"
},
{
"id": 2,
"name": "response",
"type": "node.Node",
"is_repeated": true
}
]
},
{
"name": "PingRequest"
},
{
"name": "PingResponse"
},
{
"name": "InfoRequest"
},
{
"name": "InfoResponse",
"fields": [
@ -5214,37 +5158,12 @@
]
}
],
"services": [
{
"name": "Nodes",
"rpcs": [
{
"name": "Query",
"in_type": "QueryRequest",
"out_type": "QueryResponse"
},
{
"name": "Ping",
"in_type": "PingRequest",
"out_type": "PingResponse"
},
{
"name": "RequestInfo",
"in_type": "InfoRequest",
"out_type": "InfoResponse"
}
]
}
],
"imports": [
{
"path": "gogo.proto"
},
{
"path": "node.proto"
},
{
"path": "vouchers.proto"
}
],
"package": {

View File

@ -81,9 +81,8 @@ type API struct {
Version *version.Service
Contact struct {
Service *contact.Service
Endpoint *contact.Endpoint
KEndpoint *contact.KademliaEndpoint
Service *contact.Service
Endpoint *contact.Endpoint
}
Overlay struct {
@ -216,11 +215,8 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB ex
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
peer.Contact.KEndpoint = contact.NewKademliaEndpoint(peer.Log.Named("contact:nodes_service_endpoint"))
pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
pb.RegisterNodesServer(peer.Server.GRPC(), peer.Contact.KEndpoint)
pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint)
pb.DRPCRegisterNodes(peer.Server.DRPC(), peer.Contact.KEndpoint)
}
{ // setup vouchers

View File

@ -48,20 +48,3 @@ func TestSatelliteContactEndpoint(t *testing.T) {
require.Equal(t, ident.PeerIdentity(), peerID)
})
}
func TestFetchInfo(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeDossier := planet.StorageNodes[0].Local()
node := pb.Node{Id: nodeDossier.Id, Address: nodeDossier.Address}
resp, err := planet.Satellites[0].Contact.Service.FetchInfo(ctx, node)
require.NotNil(t, resp)
require.NoError(t, err)
require.Equal(t, nodeDossier.Type, resp.Type)
require.Equal(t, &nodeDossier.Operator, resp.Operator)
require.Equal(t, &nodeDossier.Capacity, resp.Capacity)
require.Equal(t, nodeDossier.Version.GetVersion(), resp.Version.GetVersion())
})
}

View File

@ -1,42 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"context"
"go.uber.org/zap"
"storj.io/storj/pkg/pb"
)
// KademliaEndpoint implements the NodesServer Interface for backwards compatibility
type KademliaEndpoint struct {
log *zap.Logger
}
// NewKademliaEndpoint returns a new endpoint
func NewKademliaEndpoint(log *zap.Logger) *KademliaEndpoint {
return &KademliaEndpoint{
log: log,
}
}
// Query is a node to node communication query
func (endpoint *KademliaEndpoint) Query(ctx context.Context, req *pb.QueryRequest) (_ *pb.QueryResponse, err error) {
defer mon.Task()(&ctx)(&err)
return &pb.QueryResponse{}, nil
}
// Ping provides an easy way to verify a node is online and accepting requests
func (endpoint *KademliaEndpoint) Ping(ctx context.Context, req *pb.PingRequest) (_ *pb.PingResponse, err error) {
defer mon.Task()(&ctx)(&err)
return &pb.PingResponse{}, nil
}
// RequestInfo returns the node info
func (endpoint *KademliaEndpoint) RequestInfo(ctx context.Context, req *pb.InfoRequest) (_ *pb.InfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
return &pb.InfoResponse{}, nil
}

View File

@ -4,14 +4,12 @@
package contact
import (
"context"
"sync"
"github.com/zeebo/errs"
"go.uber.org/zap"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc"
"storj.io/storj/satellite/overlay"
)
@ -26,12 +24,6 @@ type Config struct {
ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""`
}
// Conn represents a connection
type Conn struct {
conn *rpc.Conn
client rpc.NodesClient
}
// Service is the contact service between storage nodes and satellites.
// It is responsible for updating general node information like address, capacity, and uptime.
// It is also responsible for updating peer identity information for verifying signatures from that node.
@ -66,41 +58,5 @@ func (service *Service) Local() overlay.NodeDossier {
return *service.self
}
// FetchInfo connects to a node and returns its node info.
func (service *Service) FetchInfo(ctx context.Context, target pb.Node) (_ *pb.InfoResponse, err error) {
conn, err := service.dialNode(ctx, target)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, conn.Close()) }()
resp, err := conn.client.RequestInfo(ctx, &pb.InfoRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
// dialNode dials the specified node.
func (service *Service) dialNode(ctx context.Context, target pb.Node) (_ *Conn, err error) {
defer mon.Task()(&ctx)(&err)
conn, err := service.dialer.DialNode(ctx, &target)
if err != nil {
return nil, err
}
return &Conn{
conn: conn,
client: conn.NodesClient(),
}, err
}
// Close disconnects this connection.
func (conn *Conn) Close() error {
return conn.conn.Close()
}
// Close closes resources
func (service *Service) Close() error { return nil }

View File

@ -1,130 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package discovery
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/sync2"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/overlay"
)
var (
mon = monkit.Package()
// Error is a general error class of this package
Error = errs.Class("discovery error")
)
// Config loads on the configuration values for the cache
type Config struct {
RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"`
RefreshLimit int `help:"the amount of nodes read from the overlay in a single pagination call" default:"100"`
RefreshConcurrency int `help:"the amount of nodes refreshed in parallel" default:"8"`
}
// Discovery struct loads on cache, kad
//
// architecture: Chore
type Discovery struct {
log *zap.Logger
cache *overlay.Service
contact *contact.Service
refreshLimit int
refreshConcurrency int
Refresh sync2.Cycle
}
// New returns a new discovery service.
func New(logger *zap.Logger, ol *overlay.Service, contact *contact.Service, config Config) *Discovery {
discovery := &Discovery{
log: logger,
cache: ol,
contact: contact,
refreshLimit: config.RefreshLimit,
refreshConcurrency: config.RefreshConcurrency,
}
discovery.Refresh.SetInterval(config.RefreshInterval)
return discovery
}
// Close closes resources
func (discovery *Discovery) Close() error {
discovery.Refresh.Close()
return nil
}
// Run runs the discovery service
func (discovery *Discovery) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
var group errgroup.Group
discovery.Refresh.Start(ctx, &group, func(ctx context.Context) error {
err := discovery.refresh(ctx)
if err != nil {
discovery.log.Error("error with cache refresh: ", zap.Error(err))
}
return nil
})
return group.Wait()
}
// refresh updates the cache db with the current DHT.
func (discovery *Discovery) refresh(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
limiter := sync2.NewLimiter(discovery.refreshConcurrency)
var offset int64
for {
list, more, err := discovery.cache.PaginateQualified(ctx, offset, discovery.refreshLimit)
if err != nil {
return Error.Wrap(err)
}
if len(list) == 0 {
break
}
offset += int64(len(list))
for _, node := range list {
node := node
limiter.Go(ctx, func() {
info, err := discovery.contact.FetchInfo(ctx, *node)
if ctx.Err() != nil {
return
}
if err != nil {
discovery.log.Info("could not ping node", zap.Stringer("ID", node.Id), zap.Error(err))
return
}
if _, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info); err != nil {
discovery.log.Warn("could not update node info", zap.Stringer("ID", node.GetAddress()))
}
})
}
if !more {
break
}
}
limiter.Wait()
return nil
}

View File

@ -1,27 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package discovery_test
import (
"testing"
"github.com/stretchr/testify/assert"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
)
func TestCache_Refresh(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
for _, storageNode := range planet.StorageNodes {
node, err := satellite.Overlay.Service.Get(ctx, storageNode.ID())
if assert.NoError(t, err) {
assert.Equal(t, storageNode.Addr(), node.Address.Address)
}
}
})
}

View File

@ -296,7 +296,6 @@ func TestNodeInfo(t *testing.T) {
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.StorageNodes[0].Storage2.Monitor.Loop.Pause()
planet.Satellites[0].Discovery.Service.Refresh.Pause()
node, err := planet.Satellites[0].Overlay.Service.Get(ctx, planet.StorageNodes[0].ID())
require.NoError(t, err)

View File

@ -38,7 +38,6 @@ import (
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/dbcleanup"
"storj.io/storj/satellite/discovery"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
@ -109,9 +108,8 @@ type Config struct {
Identity identity.Config
Server server.Config
Contact contact.Config
Overlay overlay.Config
Discovery discovery.Config
Contact contact.Config
Overlay overlay.Config
Metainfo metainfo.Config
Orders orders.Config
@ -155,9 +153,8 @@ type Peer struct {
// services and endpoints
Contact struct {
Service *contact.Service
Endpoint *contact.Endpoint
KEndpoint *contact.KademliaEndpoint
Service *contact.Service
Endpoint *contact.Endpoint
}
Overlay struct {
@ -166,10 +163,6 @@ type Peer struct {
Inspector *overlay.Inspector
}
Discovery struct {
Service *discovery.Discovery
}
Metainfo struct {
Database metainfo.PointerDB // TODO: move into pointerDB
Service *metainfo.Service
@ -321,17 +314,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
peer.Contact.KEndpoint = contact.NewKademliaEndpoint(peer.Log.Named("contact:nodes_service_endpoint"))
pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
pb.RegisterNodesServer(peer.Server.GRPC(), peer.Contact.KEndpoint)
pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint)
pb.DRPCRegisterNodes(peer.Server.DRPC(), peer.Contact.KEndpoint)
}
{ // setup discovery
log.Debug("Setting up discovery")
config := config.Discovery
peer.Discovery.Service = discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Contact.Service, config)
}
{ // setup vouchers
@ -684,9 +668,6 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Discovery.Service.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx))
})
@ -789,10 +770,6 @@ func (peer *Peer) Close() error {
errlist.Add(peer.Repair.Checker.Close())
}
if peer.Discovery.Service != nil {
errlist.Add(peer.Discovery.Service.Close())
}
if peer.Contact.Service != nil {
errlist.Add(peer.Contact.Service.Close())
}

View File

@ -51,8 +51,6 @@ func TestDataRepair(t *testing.T) {
// first, upload some remote data
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
@ -186,8 +184,6 @@ func TestCorruptDataRepair_Failed(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
@ -308,8 +304,6 @@ func TestCorruptDataRepair_Succeed(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
@ -425,8 +419,6 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) {
// first, upload some remote data
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Stop()
@ -509,8 +501,6 @@ func TestRepairMultipleDisqualified(t *testing.T) {
// first, upload some remote data
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
satellite.Repair.Checker.Loop.Pause()
satellite.Repair.Repairer.Loop.Pause()
@ -624,8 +614,6 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
@ -715,8 +703,6 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplinkPeer := planet.Uplinks[0]
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
@ -834,8 +820,6 @@ func TestDataRepairUploadLimit(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
// stop discovery service so that we do not get a race condition when we delete nodes from overlay
satellite.Discovery.Service.Refresh.Stop()
// stop audit to prevent possible interactions i.e. repair timeout problems
satellite.Audit.Worker.Loop.Pause()
satellite.Repair.Checker.Loop.Pause()

View File

@ -94,15 +94,6 @@ contact.external-address: ""
# If set, a path to write a process trace SVG to
# debug.trace-out: ""
# the amount of nodes refreshed in parallel
# discovery.refresh-concurrency: 8
# the interval at which the cache refreshes itself in seconds
# discovery.refresh-interval: 1s
# the amount of nodes read from the overlay in a single pagination call
# discovery.refresh-limit: 100
# the number of nodes to concurrently send garbage collection bloom filters to
# garbage-collection.concurrent-sends: 1

View File

@ -9,12 +9,9 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"storj.io/storj/internal/errs2"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc/rpcstatus"
"storj.io/storj/storagenode"
)
func TestStoragenodeContactEndpoint(t *testing.T) {
@ -79,50 +76,6 @@ func TestNodeInfoUpdated(t *testing.T) {
})
}
func TestRequestInfoEndpointTrustedSatellite(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeDossier := planet.StorageNodes[0].Local()
// Satellite Trusted
conn, err := planet.Satellites[0].Dialer.DialNode(ctx, &nodeDossier.Node)
require.NoError(t, err)
defer ctx.Check(conn.Close)
resp, err := conn.NodesClient().RequestInfo(ctx, &pb.InfoRequest{})
require.NotNil(t, resp)
require.NoError(t, err)
require.Equal(t, nodeDossier.Type, resp.Type)
require.Equal(t, &nodeDossier.Operator, resp.Operator)
require.Equal(t, &nodeDossier.Capacity, resp.Capacity)
require.Equal(t, nodeDossier.Version.GetVersion(), resp.Version.GetVersion())
})
}
func TestRequestInfoEndpointUntrustedSatellite(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Storage.WhitelistedSatellites = nil
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeDossier := planet.StorageNodes[0].Local()
// Satellite Untrusted
conn, err := planet.Satellites[0].Dialer.DialNode(ctx, &nodeDossier.Node)
require.NoError(t, err)
defer ctx.Check(conn.Close)
resp, err := conn.NodesClient().RequestInfo(ctx, &pb.InfoRequest{})
require.Nil(t, resp)
require.Error(t, err)
require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied))
})
}
func TestLocalAndUpdateSelf(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,

View File

@ -1,75 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"context"
"go.uber.org/zap"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc/rpcstatus"
"storj.io/storj/pkg/storj"
)
// SatelliteIDVerifier checks if the connection is from a trusted satellite
type SatelliteIDVerifier interface {
VerifySatelliteID(ctx context.Context, id storj.NodeID) error
}
// KademliaEndpoint implements the NodesServer Interface for backwards compatibility
type KademliaEndpoint struct {
log *zap.Logger
service *Service
trust SatelliteIDVerifier
}
// NewKademliaEndpoint returns a new endpoint
func NewKademliaEndpoint(log *zap.Logger, service *Service, trust SatelliteIDVerifier) *KademliaEndpoint {
return &KademliaEndpoint{
log: log,
service: service,
trust: trust,
}
}
// Query is a node to node communication query
func (endpoint *KademliaEndpoint) Query(ctx context.Context, req *pb.QueryRequest) (_ *pb.QueryResponse, err error) {
defer mon.Task()(&ctx)(&err)
return &pb.QueryResponse{}, nil
}
// Ping provides an easy way to verify a node is online and accepting requests
func (endpoint *KademliaEndpoint) Ping(ctx context.Context, req *pb.PingRequest) (_ *pb.PingResponse, err error) {
defer mon.Task()(&ctx)(&err)
return &pb.PingResponse{}, nil
}
// RequestInfo returns the node info
func (endpoint *KademliaEndpoint) RequestInfo(ctx context.Context, req *pb.InfoRequest) (_ *pb.InfoResponse, err error) {
defer mon.Task()(&ctx)(&err)
self := endpoint.service.Local()
if endpoint.trust == nil {
return nil, rpcstatus.Error(rpcstatus.Internal, "missing trust")
}
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
err = endpoint.trust.VerifySatelliteID(ctx, peer.ID)
if err != nil {
return nil, rpcstatus.Errorf(rpcstatus.PermissionDenied, "untrusted peer %v", peer.ID)
}
return &pb.InfoResponse{
Type: self.Type,
Operator: &self.Operator,
Capacity: &self.Capacity,
Version: &self.Version,
}, nil
}

View File

@ -25,7 +25,7 @@ type Config struct {
ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""`
// Chore config values
Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"30s"`
Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"5s"`
}
// Service is the contact service between storage nodes and satellites

View File

@ -29,8 +29,6 @@ func TestInspectorStats(t *testing.T) {
planet.Start(ctx)
planet.Satellites[0].Discovery.Service.Refresh.TriggerWait()
var availableBandwidth int64
var availableSpace int64
for _, storageNode := range planet.StorageNodes {

View File

@ -20,17 +20,12 @@ func TestMonitor(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
var freeBandwidth int64
var freeBandwidthInit int64
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Monitor.Loop.Pause()
info, err := satellite.Contact.Service.FetchInfo(ctx, storageNode.Local().Node)
require.NoError(t, err)
// assume that all storage nodes have the same initial values
freeBandwidth = info.Capacity.FreeBandwidth
freeBandwidthInit = storageNode.Local().Capacity.FreeBandwidth
}
expectedData := testrand.Bytes(100 * memory.KiB)
@ -42,13 +37,11 @@ func TestMonitor(t *testing.T) {
for _, storageNode := range planet.StorageNodes {
storageNode.Storage2.Monitor.Loop.TriggerWait()
info, err := satellite.Contact.Service.FetchInfo(ctx, storageNode.Local().Node)
require.NoError(t, err)
freeBandwidthNew := storageNode.Local().Capacity.FreeBandwidth
stats, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{})
require.NoError(t, err)
if stats.UsedSpace > 0 {
assert.Equal(t, freeBandwidth-stats.UsedBandwidth, info.Capacity.FreeBandwidth)
assert.Equal(t, freeBandwidthInit-stats.UsedBandwidth, freeBandwidthNew)
nodeAssertions++
}
}

View File

@ -123,7 +123,6 @@ type Peer struct {
Service *contact.Service
Chore *contact.Chore
Endpoint *contact.Endpoint
KEndpoint *contact.KademliaEndpoint
PingStats *contact.PingStats
}
@ -232,11 +231,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self)
peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Storage2.Trust, peer.Dialer, peer.Contact.Service)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.PingStats)
peer.Contact.KEndpoint = contact.NewKademliaEndpoint(peer.Log.Named("contact:nodes_service_endpoint"), peer.Contact.Service, peer.Storage2.Trust)
pb.RegisterContactServer(peer.Server.GRPC(), peer.Contact.Endpoint)
pb.RegisterNodesServer(peer.Server.GRPC(), peer.Contact.KEndpoint)
pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint)
pb.DRPCRegisterNodes(peer.Server.DRPC(), peer.Contact.KEndpoint)
}
{ // setup storage
@ -412,6 +408,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Version.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Contact.Chore.Run(ctx))
})
@ -421,16 +420,12 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Orders.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.CacheService.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.RetainService.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Bandwidth.Run(ctx))
})

View File

@ -27,9 +27,6 @@ func TestGetSignee(t *testing.T) {
planet.Start(ctx)
// make sure nodes are refreshed in db
planet.Satellites[0].Discovery.Service.Refresh.TriggerWait()
trust := planet.StorageNodes[0].Storage2.Trust
canceledContext, cancel := context.WithCancel(ctx)