From b185dbbee210430058cc8b57b4fe5eb6e03bc4f6 Mon Sep 17 00:00:00 2001 From: Jennifer Li Johnson Date: Mon, 14 Oct 2019 10:57:01 -0400 Subject: [PATCH] satellite/discovery: remove discovery related code (#3175) --- cmd/storj-sim/network.go | 1 + internal/testplanet/run.go | 3 - internal/testplanet/satellite.go | 6 - lib/uplinkc/testdata_test.go | 4 - pkg/pb/overlay.pb.go | 558 +------------------- pkg/pb/overlay.proto | 26 - pkg/rpc/compat_drpc.go | 13 - pkg/rpc/compat_grpc.go | 13 - pkg/server/config.go | 3 +- proto.lock | 81 --- satellite/api.go | 8 +- satellite/contact/contact_test.go | 17 - satellite/contact/kademlia.go | 42 -- satellite/contact/service.go | 44 -- satellite/discovery/service.go | 130 ----- satellite/discovery/service_test.go | 27 - satellite/overlay/service_test.go | 1 - satellite/peer.go | 31 +- satellite/repair/repair_test.go | 16 - scripts/testdata/satellite-config.yaml.lock | 9 - storagenode/contact/contact_test.go | 47 -- storagenode/contact/kademlia.go | 75 --- storagenode/contact/service.go | 2 +- storagenode/inspector/inspector_test.go | 2 - storagenode/monitor/monitor_test.go | 15 +- storagenode/peer.go | 11 +- storagenode/trust/service_test.go | 3 - 27 files changed, 42 insertions(+), 1146 deletions(-) delete mode 100644 satellite/contact/kademlia.go delete mode 100644 satellite/discovery/service.go delete mode 100644 satellite/discovery/service_test.go delete mode 100644 storagenode/contact/kademlia.go diff --git a/cmd/storj-sim/network.go b/cmd/storj-sim/network.go index 7da49973e..38b1ae0f1 100644 --- a/cmd/storj-sim/network.go +++ b/cmd/storj-sim/network.go @@ -154,6 +154,7 @@ func networkTest(flags *Flags, command string, args []string) error { if printCommands { fmt.Fprintf(processes.Output, "exec: %v\n", strings.Join(cmd.Args, " ")) } + time.Sleep(6 * time.Second) //hack: this is so the contact chore can send the satellite the capacity info on its second iteration after 5s errRun := cmd.Run() cancel() diff --git a/internal/testplanet/run.go b/internal/testplanet/run.go index cc188427d..553d86649 100644 --- a/internal/testplanet/run.go +++ b/internal/testplanet/run.go @@ -91,9 +91,6 @@ func Run(t *testing.T, config Config, test func(t *testing.T, ctx *testcontext.C planet.Start(ctx) - // make sure nodes are refreshed in db - planet.Satellites[0].Discovery.Service.Refresh.TriggerWait() - test(t, ctx, planet) }) } diff --git a/internal/testplanet/satellite.go b/internal/testplanet/satellite.go index 1358e4b4a..d00e36acd 100644 --- a/internal/testplanet/satellite.go +++ b/internal/testplanet/satellite.go @@ -23,7 +23,6 @@ import ( "storj.io/storj/satellite/console" "storj.io/storj/satellite/console/consoleweb" "storj.io/storj/satellite/dbcleanup" - "storj.io/storj/satellite/discovery" "storj.io/storj/satellite/gc" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/mailservice" @@ -120,11 +119,6 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) { }, UpdateStatsBatchSize: 100, }, - Discovery: discovery.Config{ - RefreshInterval: 1 * time.Second, - RefreshLimit: 100, - RefreshConcurrency: 2, - }, Metainfo: metainfo.Config{ DatabaseURL: "", // not used MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024 diff --git a/lib/uplinkc/testdata_test.go b/lib/uplinkc/testdata_test.go index 7d8f3df05..3e3c86984 100644 --- a/lib/uplinkc/testdata_test.go +++ b/lib/uplinkc/testdata_test.go @@ -33,12 +33,8 @@ func RunPlanet(t *testing.T, run func(ctx *testcontext.Context, planet *testplan ) require.NoError(t, err) defer ctx.Check(planet.Shutdown) - planet.Start(ctx) - // make sure nodes are refreshed in db - planet.Satellites[0].Discovery.Service.Refresh.TriggerWait() - run(ctx, planet) } diff --git a/pkg/pb/overlay.pb.go b/pkg/pb/overlay.pb.go index 550338a91..09fc7ae36 100644 --- a/pkg/pb/overlay.pb.go +++ b/pkg/pb/overlay.pb.go @@ -4,13 +4,10 @@ package pb import ( - context "context" fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" - grpc "google.golang.org/grpc" math "math" - drpc "storj.io/drpc" ) // Reference imports to suppress errors if they are not otherwise used. @@ -55,7 +52,7 @@ func (x Restriction_Operator) String() string { } func (Restriction_Operator) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{6, 0} + return fileDescriptor_61fc82527fbe24ad, []int{1, 0} } type Restriction_Operand int32 @@ -80,216 +77,9 @@ func (x Restriction_Operand) String() string { } func (Restriction_Operand) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{6, 1} + return fileDescriptor_61fc82527fbe24ad, []int{1, 1} } -type QueryRequest struct { - Sender *Node `protobuf:"bytes,1,opt,name=sender,proto3" json:"sender,omitempty"` - Target *Node `protobuf:"bytes,2,opt,name=target,proto3" json:"target,omitempty"` - Limit int64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` - Pingback bool `protobuf:"varint,4,opt,name=pingback,proto3" json:"pingback,omitempty"` - Vouchers []*Voucher `protobuf:"bytes,5,rep,name=vouchers,proto3" json:"vouchers,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *QueryRequest) Reset() { *m = QueryRequest{} } -func (m *QueryRequest) String() string { return proto.CompactTextString(m) } -func (*QueryRequest) ProtoMessage() {} -func (*QueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{0} -} -func (m *QueryRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_QueryRequest.Unmarshal(m, b) -} -func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_QueryRequest.Marshal(b, m, deterministic) -} -func (m *QueryRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_QueryRequest.Merge(m, src) -} -func (m *QueryRequest) XXX_Size() int { - return xxx_messageInfo_QueryRequest.Size(m) -} -func (m *QueryRequest) XXX_DiscardUnknown() { - xxx_messageInfo_QueryRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_QueryRequest proto.InternalMessageInfo - -func (m *QueryRequest) GetSender() *Node { - if m != nil { - return m.Sender - } - return nil -} - -func (m *QueryRequest) GetTarget() *Node { - if m != nil { - return m.Target - } - return nil -} - -func (m *QueryRequest) GetLimit() int64 { - if m != nil { - return m.Limit - } - return 0 -} - -func (m *QueryRequest) GetPingback() bool { - if m != nil { - return m.Pingback - } - return false -} - -func (m *QueryRequest) GetVouchers() []*Voucher { - if m != nil { - return m.Vouchers - } - return nil -} - -type QueryResponse struct { - Sender *Node `protobuf:"bytes,1,opt,name=sender,proto3" json:"sender,omitempty"` - Response []*Node `protobuf:"bytes,2,rep,name=response,proto3" json:"response,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *QueryResponse) Reset() { *m = QueryResponse{} } -func (m *QueryResponse) String() string { return proto.CompactTextString(m) } -func (*QueryResponse) ProtoMessage() {} -func (*QueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{1} -} -func (m *QueryResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_QueryResponse.Unmarshal(m, b) -} -func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_QueryResponse.Marshal(b, m, deterministic) -} -func (m *QueryResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_QueryResponse.Merge(m, src) -} -func (m *QueryResponse) XXX_Size() int { - return xxx_messageInfo_QueryResponse.Size(m) -} -func (m *QueryResponse) XXX_DiscardUnknown() { - xxx_messageInfo_QueryResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_QueryResponse proto.InternalMessageInfo - -func (m *QueryResponse) GetSender() *Node { - if m != nil { - return m.Sender - } - return nil -} - -func (m *QueryResponse) GetResponse() []*Node { - if m != nil { - return m.Response - } - return nil -} - -type PingRequest struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *PingRequest) Reset() { *m = PingRequest{} } -func (m *PingRequest) String() string { return proto.CompactTextString(m) } -func (*PingRequest) ProtoMessage() {} -func (*PingRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{2} -} -func (m *PingRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_PingRequest.Unmarshal(m, b) -} -func (m *PingRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_PingRequest.Marshal(b, m, deterministic) -} -func (m *PingRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_PingRequest.Merge(m, src) -} -func (m *PingRequest) XXX_Size() int { - return xxx_messageInfo_PingRequest.Size(m) -} -func (m *PingRequest) XXX_DiscardUnknown() { - xxx_messageInfo_PingRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_PingRequest proto.InternalMessageInfo - -type PingResponse struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *PingResponse) Reset() { *m = PingResponse{} } -func (m *PingResponse) String() string { return proto.CompactTextString(m) } -func (*PingResponse) ProtoMessage() {} -func (*PingResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{3} -} -func (m *PingResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_PingResponse.Unmarshal(m, b) -} -func (m *PingResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_PingResponse.Marshal(b, m, deterministic) -} -func (m *PingResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_PingResponse.Merge(m, src) -} -func (m *PingResponse) XXX_Size() int { - return xxx_messageInfo_PingResponse.Size(m) -} -func (m *PingResponse) XXX_DiscardUnknown() { - xxx_messageInfo_PingResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_PingResponse proto.InternalMessageInfo - -// TODO: add fields that validate who is requesting the info -type InfoRequest struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *InfoRequest) Reset() { *m = InfoRequest{} } -func (m *InfoRequest) String() string { return proto.CompactTextString(m) } -func (*InfoRequest) ProtoMessage() {} -func (*InfoRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{4} -} -func (m *InfoRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_InfoRequest.Unmarshal(m, b) -} -func (m *InfoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_InfoRequest.Marshal(b, m, deterministic) -} -func (m *InfoRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_InfoRequest.Merge(m, src) -} -func (m *InfoRequest) XXX_Size() int { - return xxx_messageInfo_InfoRequest.Size(m) -} -func (m *InfoRequest) XXX_DiscardUnknown() { - xxx_messageInfo_InfoRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_InfoRequest proto.InternalMessageInfo - type InfoResponse struct { Type NodeType `protobuf:"varint,2,opt,name=type,proto3,enum=node.NodeType" json:"type,omitempty"` Operator *NodeOperator `protobuf:"bytes,3,opt,name=operator,proto3" json:"operator,omitempty"` @@ -304,7 +94,7 @@ func (m *InfoResponse) Reset() { *m = InfoResponse{} } func (m *InfoResponse) String() string { return proto.CompactTextString(m) } func (*InfoResponse) ProtoMessage() {} func (*InfoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{5} + return fileDescriptor_61fc82527fbe24ad, []int{0} } func (m *InfoResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_InfoResponse.Unmarshal(m, b) @@ -365,7 +155,7 @@ func (m *Restriction) Reset() { *m = Restriction{} } func (m *Restriction) String() string { return proto.CompactTextString(m) } func (*Restriction) ProtoMessage() {} func (*Restriction) Descriptor() ([]byte, []int) { - return fileDescriptor_61fc82527fbe24ad, []int{6} + return fileDescriptor_61fc82527fbe24ad, []int{1} } func (m *Restriction) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Restriction.Unmarshal(m, b) @@ -409,11 +199,6 @@ func (m *Restriction) GetValue() int64 { func init() { proto.RegisterEnum("overlay.Restriction_Operator", Restriction_Operator_name, Restriction_Operator_value) proto.RegisterEnum("overlay.Restriction_Operand", Restriction_Operand_name, Restriction_Operand_value) - proto.RegisterType((*QueryRequest)(nil), "overlay.QueryRequest") - proto.RegisterType((*QueryResponse)(nil), "overlay.QueryResponse") - proto.RegisterType((*PingRequest)(nil), "overlay.PingRequest") - proto.RegisterType((*PingResponse)(nil), "overlay.PingResponse") - proto.RegisterType((*InfoRequest)(nil), "overlay.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "overlay.InfoResponse") proto.RegisterType((*Restriction)(nil), "overlay.Restriction") } @@ -421,317 +206,26 @@ func init() { func init() { proto.RegisterFile("overlay.proto", fileDescriptor_61fc82527fbe24ad) } var fileDescriptor_61fc82527fbe24ad = []byte{ - // 518 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x8b, 0xd3, 0x40, - 0x14, 0xc7, 0x77, 0xfa, 0x33, 0xbe, 0xfe, 0x20, 0x3b, 0xec, 0x4a, 0x28, 0x0a, 0x25, 0x07, 0x29, - 0xa8, 0x39, 0x64, 0x65, 0x41, 0x6f, 0xae, 0x8d, 0x6b, 0x71, 0x59, 0xdd, 0x31, 0xac, 0xa0, 0x07, - 0x49, 0x93, 0x31, 0x06, 0xeb, 0x4c, 0x9c, 0x4c, 0x0b, 0xf9, 0xaf, 0xbc, 0x79, 0xf4, 0xff, 0xf2, - 0x24, 0x33, 0x99, 0xa4, 0x71, 0x55, 0xf0, 0x94, 0x79, 0xef, 0xfb, 0x79, 0x9d, 0x6f, 0x5e, 0xbf, - 0x81, 0x09, 0xdf, 0x51, 0xb1, 0x89, 0x4a, 0x2f, 0x17, 0x5c, 0x72, 0x3c, 0x34, 0xe5, 0x0c, 0x52, - 0x9e, 0xf2, 0xaa, 0x39, 0x03, 0xc6, 0x13, 0x6a, 0xce, 0xd3, 0x1d, 0xdf, 0xc6, 0x9f, 0xa8, 0x28, - 0xaa, 0xda, 0xfd, 0x8e, 0x60, 0x7c, 0xb5, 0xa5, 0xa2, 0x24, 0xf4, 0xeb, 0x96, 0x16, 0x12, 0xbb, - 0x30, 0x28, 0x28, 0x4b, 0xa8, 0x70, 0xd0, 0x1c, 0x2d, 0x46, 0x3e, 0x78, 0x7a, 0xfa, 0x92, 0x27, - 0x94, 0x18, 0x45, 0x31, 0x32, 0x12, 0x29, 0x95, 0x4e, 0xe7, 0x4f, 0xa6, 0x52, 0xf0, 0x11, 0xf4, - 0x37, 0xd9, 0x97, 0x4c, 0x3a, 0xdd, 0x39, 0x5a, 0x74, 0x49, 0x55, 0xe0, 0x19, 0x58, 0x79, 0xc6, - 0xd2, 0x75, 0x14, 0x7f, 0x76, 0x7a, 0x73, 0xb4, 0xb0, 0x48, 0x53, 0xe3, 0x87, 0x60, 0xd5, 0xe6, - 0x9c, 0xfe, 0xbc, 0xbb, 0x18, 0xf9, 0x87, 0x5e, 0xe3, 0xf6, 0xba, 0x3a, 0x90, 0x06, 0x71, 0xdf, - 0xc3, 0xc4, 0x18, 0x2f, 0x72, 0xce, 0x0a, 0xfa, 0x5f, 0xce, 0xef, 0x81, 0x25, 0x0c, 0xef, 0x74, - 0xf4, 0x1d, 0x6d, 0xaa, 0xd1, 0xdc, 0x09, 0x8c, 0x5e, 0x67, 0x2c, 0x35, 0x4b, 0x71, 0xa7, 0x30, - 0xae, 0xca, 0xbd, 0xbc, 0x62, 0x1f, 0x79, 0x2d, 0xff, 0x40, 0x30, 0xae, 0xea, 0xc6, 0x4a, 0x4f, - 0x96, 0x39, 0xd5, 0xeb, 0x99, 0xfa, 0xd3, 0xfd, 0x15, 0x61, 0x99, 0x53, 0xa2, 0x35, 0xec, 0x81, - 0xc5, 0x73, 0x2a, 0x22, 0xc9, 0x85, 0xde, 0xd1, 0xc8, 0xc7, 0x7b, 0xee, 0x95, 0x51, 0x48, 0xc3, - 0x28, 0x3e, 0x8e, 0xf2, 0x28, 0xce, 0x64, 0xa9, 0x57, 0xf7, 0x1b, 0xff, 0xcc, 0x28, 0xa4, 0x61, - 0xf0, 0x7d, 0x18, 0xee, 0xa8, 0x28, 0x32, 0xce, 0x9c, 0xbe, 0xc6, 0x0f, 0xf7, 0xf8, 0x75, 0x25, - 0x90, 0x9a, 0x70, 0x7f, 0x22, 0x18, 0x11, 0x5a, 0x48, 0x91, 0xc5, 0x32, 0xe3, 0x0c, 0x3f, 0x6e, - 0x99, 0x43, 0xfa, 0x25, 0xee, 0x7a, 0x75, 0xd2, 0x5a, 0x9c, 0xf7, 0x17, 0x9f, 0xa7, 0x30, 0xd4, - 0x67, 0x96, 0x98, 0xd7, 0xbf, 0xf3, 0xef, 0x49, 0x96, 0x90, 0x1a, 0x56, 0x81, 0xd9, 0x45, 0x9b, - 0x2d, 0xad, 0x03, 0xa3, 0x0b, 0xf7, 0x11, 0x58, 0xf5, 0x1d, 0x78, 0x00, 0x9d, 0x8b, 0xd0, 0x3e, - 0x50, 0xcf, 0xe0, 0xca, 0x46, 0xea, 0x79, 0x1e, 0xda, 0x1d, 0x3c, 0x84, 0xee, 0x45, 0x18, 0xd8, - 0x5d, 0x75, 0x38, 0x0f, 0x03, 0xbb, 0xe7, 0x3e, 0x80, 0xa1, 0xf9, 0x7d, 0x8c, 0x61, 0xfa, 0x9c, - 0x04, 0xc1, 0x87, 0xb3, 0xa7, 0x97, 0xcb, 0xb7, 0xab, 0x65, 0xf8, 0xc2, 0x3e, 0xc0, 0x13, 0xb8, - 0xa5, 0x7b, 0xcb, 0xd5, 0x9b, 0x97, 0x36, 0xf2, 0xbf, 0x21, 0xe8, 0xab, 0xad, 0x14, 0xf8, 0x14, - 0xfa, 0x3a, 0x53, 0xf8, 0xb8, 0xf1, 0xdc, 0xfe, 0x38, 0x66, 0xb7, 0x6f, 0xb6, 0xcd, 0xff, 0x7d, - 0x02, 0x3d, 0x95, 0x0f, 0x7c, 0xd4, 0xe8, 0xad, 0xf4, 0xcc, 0x8e, 0x6f, 0x74, 0xcd, 0xd0, 0x13, - 0xb5, 0x72, 0x4d, 0xa8, 0xec, 0xb4, 0x66, 0x5b, 0xd1, 0x6a, 0xcd, 0xb6, 0x03, 0x76, 0xd6, 0x7b, - 0xd7, 0xc9, 0xd7, 0xeb, 0x81, 0xfe, 0x86, 0x4f, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0xae, 0xb0, - 0xad, 0x04, 0x05, 0x04, 0x00, 0x00, -} - -type DRPCNodesClient interface { - DRPCConn() drpc.Conn - - Query(ctx context.Context, in *QueryRequest) (*QueryResponse, error) - Ping(ctx context.Context, in *PingRequest) (*PingResponse, error) - RequestInfo(ctx context.Context, in *InfoRequest) (*InfoResponse, error) -} - -type drpcNodesClient struct { - cc drpc.Conn -} - -func NewDRPCNodesClient(cc drpc.Conn) DRPCNodesClient { - return &drpcNodesClient{cc} -} - -func (c *drpcNodesClient) DRPCConn() drpc.Conn { return c.cc } - -func (c *drpcNodesClient) Query(ctx context.Context, in *QueryRequest) (*QueryResponse, error) { - out := new(QueryResponse) - err := c.cc.Invoke(ctx, "/overlay.Nodes/Query", in, out) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *drpcNodesClient) Ping(ctx context.Context, in *PingRequest) (*PingResponse, error) { - out := new(PingResponse) - err := c.cc.Invoke(ctx, "/overlay.Nodes/Ping", in, out) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *drpcNodesClient) RequestInfo(ctx context.Context, in *InfoRequest) (*InfoResponse, error) { - out := new(InfoResponse) - err := c.cc.Invoke(ctx, "/overlay.Nodes/RequestInfo", in, out) - if err != nil { - return nil, err - } - return out, nil -} - -type DRPCNodesServer interface { - Query(context.Context, *QueryRequest) (*QueryResponse, error) - Ping(context.Context, *PingRequest) (*PingResponse, error) - RequestInfo(context.Context, *InfoRequest) (*InfoResponse, error) -} - -type DRPCNodesDescription struct{} - -func (DRPCNodesDescription) NumMethods() int { return 3 } - -func (DRPCNodesDescription) Method(n int) (string, drpc.Handler, interface{}, bool) { - switch n { - case 0: - return "/overlay.Nodes/Query", - func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { - return srv.(DRPCNodesServer). - Query( - ctx, - in1.(*QueryRequest), - ) - }, DRPCNodesServer.Query, true - case 1: - return "/overlay.Nodes/Ping", - func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { - return srv.(DRPCNodesServer). - Ping( - ctx, - in1.(*PingRequest), - ) - }, DRPCNodesServer.Ping, true - case 2: - return "/overlay.Nodes/RequestInfo", - func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { - return srv.(DRPCNodesServer). - RequestInfo( - ctx, - in1.(*InfoRequest), - ) - }, DRPCNodesServer.RequestInfo, true - default: - return "", nil, nil, false - } -} - -func DRPCRegisterNodes(srv drpc.Server, impl DRPCNodesServer) { - srv.Register(impl, DRPCNodesDescription{}) -} - -type DRPCNodes_QueryStream interface { - drpc.Stream - SendAndClose(*QueryResponse) error -} - -type drpcNodesQueryStream struct { - drpc.Stream -} - -func (x *drpcNodesQueryStream) SendAndClose(m *QueryResponse) error { - if err := x.MsgSend(m); err != nil { - return err - } - return x.CloseSend() -} - -type DRPCNodes_PingStream interface { - drpc.Stream - SendAndClose(*PingResponse) error -} - -type drpcNodesPingStream struct { - drpc.Stream -} - -func (x *drpcNodesPingStream) SendAndClose(m *PingResponse) error { - if err := x.MsgSend(m); err != nil { - return err - } - return x.CloseSend() -} - -type DRPCNodes_RequestInfoStream interface { - drpc.Stream - SendAndClose(*InfoResponse) error -} - -type drpcNodesRequestInfoStream struct { - drpc.Stream -} - -func (x *drpcNodesRequestInfoStream) SendAndClose(m *InfoResponse) error { - if err := x.MsgSend(m); err != nil { - return err - } - return x.CloseSend() -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// NodesClient is the client API for Nodes service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type NodesClient interface { - Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) - Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) - RequestInfo(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) -} - -type nodesClient struct { - cc *grpc.ClientConn -} - -func NewNodesClient(cc *grpc.ClientConn) NodesClient { - return &nodesClient{cc} -} - -func (c *nodesClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) { - out := new(QueryResponse) - err := c.cc.Invoke(ctx, "/overlay.Nodes/Query", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *nodesClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { - out := new(PingResponse) - err := c.cc.Invoke(ctx, "/overlay.Nodes/Ping", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *nodesClient) RequestInfo(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) { - out := new(InfoResponse) - err := c.cc.Invoke(ctx, "/overlay.Nodes/RequestInfo", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// NodesServer is the server API for Nodes service. -type NodesServer interface { - Query(context.Context, *QueryRequest) (*QueryResponse, error) - Ping(context.Context, *PingRequest) (*PingResponse, error) - RequestInfo(context.Context, *InfoRequest) (*InfoResponse, error) -} - -func RegisterNodesServer(s *grpc.Server, srv NodesServer) { - s.RegisterService(&_Nodes_serviceDesc, srv) -} - -func _Nodes_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(QueryRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(NodesServer).Query(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/overlay.Nodes/Query", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(NodesServer).Query(ctx, req.(*QueryRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Nodes_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PingRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(NodesServer).Ping(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/overlay.Nodes/Ping", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(NodesServer).Ping(ctx, req.(*PingRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Nodes_RequestInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(InfoRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(NodesServer).RequestInfo(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/overlay.Nodes/RequestInfo", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(NodesServer).RequestInfo(ctx, req.(*InfoRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Nodes_serviceDesc = grpc.ServiceDesc{ - ServiceName: "overlay.Nodes", - HandlerType: (*NodesServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Query", - Handler: _Nodes_Query_Handler, - }, - { - MethodName: "Ping", - Handler: _Nodes_Ping_Handler, - }, - { - MethodName: "RequestInfo", - Handler: _Nodes_RequestInfo_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "overlay.proto", + // 325 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x91, 0x41, 0x4b, 0xeb, 0x40, + 0x10, 0xc7, 0xbb, 0x49, 0xda, 0xf4, 0x4d, 0x5f, 0xcb, 0xbe, 0xe5, 0x1d, 0x42, 0x51, 0x28, 0x39, + 0x15, 0x94, 0x1c, 0xaa, 0x08, 0x1e, 0xad, 0x8d, 0xb5, 0x58, 0x2a, 0xae, 0x41, 0xc1, 0x8b, 0xa4, + 0xcd, 0x5a, 0x02, 0x65, 0x67, 0x49, 0x62, 0x21, 0x9f, 0xce, 0xef, 0xe5, 0x49, 0xb2, 0x49, 0xda, + 0x22, 0x7a, 0x9a, 0xff, 0xcc, 0xff, 0xb7, 0xbb, 0x33, 0xb3, 0xd0, 0xc5, 0xad, 0x48, 0x36, 0x61, + 0xee, 0xa9, 0x04, 0x33, 0x64, 0x76, 0x95, 0xf6, 0x61, 0x8d, 0x6b, 0x2c, 0x8b, 0x7d, 0x90, 0x18, + 0x89, 0x52, 0xbb, 0x1f, 0x04, 0xfe, 0xce, 0xe4, 0x1b, 0x72, 0x91, 0x2a, 0x94, 0xa9, 0x60, 0x2e, + 0x58, 0x59, 0xae, 0x84, 0x63, 0x0c, 0xc8, 0xb0, 0x37, 0xea, 0x79, 0x9a, 0x5d, 0x60, 0x24, 0x82, + 0x5c, 0x09, 0xae, 0x3d, 0xe6, 0x41, 0x1b, 0x95, 0x48, 0xc2, 0x0c, 0x13, 0xc7, 0x1c, 0x90, 0x61, + 0x67, 0xc4, 0xf6, 0xdc, 0x7d, 0xe5, 0xf0, 0x1d, 0x53, 0xf0, 0xab, 0x50, 0x85, 0xab, 0x38, 0xcb, + 0x1d, 0xeb, 0x3b, 0x7f, 0x5d, 0x39, 0x7c, 0xc7, 0xb0, 0x13, 0xb0, 0xb7, 0x22, 0x49, 0x63, 0x94, + 0x4e, 0x53, 0xe3, 0xff, 0xf6, 0xf8, 0x53, 0x69, 0xf0, 0x9a, 0x70, 0x3f, 0x09, 0x74, 0xb8, 0x48, + 0xb3, 0x24, 0x5e, 0x65, 0x31, 0x4a, 0x76, 0x79, 0xd0, 0x1c, 0xd1, 0x43, 0x1c, 0x7b, 0xf5, 0x52, + 0x0e, 0x38, 0xef, 0x87, 0x3e, 0x2f, 0xc0, 0xd6, 0x5a, 0x46, 0xd5, 0xf8, 0x47, 0xbf, 0x9f, 0x94, + 0x11, 0xaf, 0x61, 0xf6, 0x1f, 0x9a, 0xdb, 0x70, 0xf3, 0x2e, 0xf4, 0x32, 0x4c, 0x5e, 0x26, 0xee, + 0x39, 0xb4, 0xeb, 0x37, 0x58, 0x0b, 0x8c, 0x79, 0x40, 0x1b, 0x45, 0xf4, 0x1f, 0x28, 0x29, 0xe2, + 0x34, 0xa0, 0x06, 0xb3, 0xc1, 0x9c, 0x07, 0x3e, 0x35, 0x0b, 0x31, 0x0d, 0x7c, 0x6a, 0xb9, 0xa7, + 0x60, 0x57, 0xf7, 0x33, 0x06, 0xbd, 0x1b, 0xee, 0xfb, 0xaf, 0xe3, 0xab, 0xc5, 0xe4, 0x79, 0x36, + 0x09, 0x6e, 0x69, 0x83, 0x75, 0xe1, 0x8f, 0xae, 0x4d, 0x66, 0x8f, 0x77, 0x94, 0x8c, 0xad, 0x17, + 0x43, 0x2d, 0x97, 0x2d, 0xfd, 0x97, 0x67, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x39, 0x65, 0xff, + 0x9d, 0xfd, 0x01, 0x00, 0x00, } diff --git a/pkg/pb/overlay.proto b/pkg/pb/overlay.proto index 124e11e03..55778a315 100644 --- a/pkg/pb/overlay.proto +++ b/pkg/pb/overlay.proto @@ -6,35 +6,9 @@ option go_package = "pb"; import "gogo.proto"; import "node.proto"; -import "vouchers.proto"; package overlay; -service Nodes { - rpc Query(QueryRequest) returns (QueryResponse); - rpc Ping(PingRequest) returns (PingResponse); - rpc RequestInfo(InfoRequest) returns (InfoResponse); -} - -message QueryRequest { - node.Node sender = 1; - node.Node target = 2; - int64 limit = 3; - bool pingback = 4; - repeated vouchers.Voucher vouchers = 5; -} - -message QueryResponse { - node.Node sender = 1; - repeated node.Node response = 2; -} - -message PingRequest {}; -message PingResponse {}; - -// TODO: add fields that validate who is requesting the info -message InfoRequest {} - message InfoResponse { node.NodeType type = 2; node.NodeOperator operator = 3; diff --git a/pkg/rpc/compat_drpc.go b/pkg/rpc/compat_drpc.go index aa16f62eb..6e4bf955d 100644 --- a/pkg/rpc/compat_drpc.go +++ b/pkg/rpc/compat_drpc.go @@ -38,9 +38,6 @@ type ( // NodeStatsClient is an alias to the drpc client interface NodeStatsClient = pb.DRPCNodeStatsClient - // NodesClient is an alias to the drpc client interface - NodesClient = pb.DRPCNodesClient - // OrdersClient is an alias to the drpc client interface OrdersClient = pb.DRPCOrdersClient @@ -140,16 +137,6 @@ func (c *Conn) NodeStatsClient() NodeStatsClient { return NewNodeStatsClient(c.raw) } -// NewNodesClient returns the drpc version of a NodesClient -func NewNodesClient(rc *RawConn) NodesClient { - return pb.NewDRPCNodesClient(rc) -} - -// NodesClient returns a NodesClient for this connection -func (c *Conn) NodesClient() NodesClient { - return NewNodesClient(c.raw) -} - // NewOrdersClient returns the drpc version of a OrdersClient func NewOrdersClient(rc *RawConn) OrdersClient { return pb.NewDRPCOrdersClient(rc) diff --git a/pkg/rpc/compat_grpc.go b/pkg/rpc/compat_grpc.go index 32103311e..4500b1300 100644 --- a/pkg/rpc/compat_grpc.go +++ b/pkg/rpc/compat_grpc.go @@ -39,9 +39,6 @@ type ( // NodeStatsClient is an alias to the grpc client interface NodeStatsClient = pb.NodeStatsClient - // NodesClient is an alias to the grpc client interface - NodesClient = pb.NodesClient - // OrdersClient is an alias to the grpc client interface OrdersClient = pb.OrdersClient @@ -141,16 +138,6 @@ func (c *Conn) NodeStatsClient() NodeStatsClient { return NewNodeStatsClient(c.raw) } -// NewNodesClient returns the grpc version of a NodesClient -func NewNodesClient(rc *RawConn) NodesClient { - return pb.NewNodesClient(rc) -} - -// NodesClient returns a NodesClient for this connection -func (c *Conn) NodesClient() NodesClient { - return NewNodesClient(c.raw) -} - // NewOrdersClient returns the grpc version of a OrdersClient func NewOrdersClient(rc *RawConn) OrdersClient { return pb.NewOrdersClient(rc) diff --git a/pkg/server/config.go b/pkg/server/config.go index 72de3f6d7..bc42e4a8c 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -26,8 +26,7 @@ type Config struct { func (sc Config) Run(ctx context.Context, log *zap.Logger, identity *identity.FullIdentity, revDB extensions.RevocationDB, interceptor grpc.UnaryServerInterceptor, services ...Service) (err error) { defer mon.Task()(&ctx)(&err) - // Ensure revDB is not nil, since we call Close() below we do not want a - // panic + // Ensure revDB is not nil, since we call Close() below we do not want a panic if revDB == nil { return Error.New("revDB cannot be nil in call to Run") } diff --git a/proto.lock b/proto.lock index e72b07e3e..9ac157bf6 100644 --- a/proto.lock +++ b/proto.lock @@ -5112,62 +5112,6 @@ } ], "messages": [ - { - "name": "QueryRequest", - "fields": [ - { - "id": 1, - "name": "sender", - "type": "node.Node" - }, - { - "id": 2, - "name": "target", - "type": "node.Node" - }, - { - "id": 3, - "name": "limit", - "type": "int64" - }, - { - "id": 4, - "name": "pingback", - "type": "bool" - }, - { - "id": 5, - "name": "vouchers", - "type": "vouchers.Voucher", - "is_repeated": true - } - ] - }, - { - "name": "QueryResponse", - "fields": [ - { - "id": 1, - "name": "sender", - "type": "node.Node" - }, - { - "id": 2, - "name": "response", - "type": "node.Node", - "is_repeated": true - } - ] - }, - { - "name": "PingRequest" - }, - { - "name": "PingResponse" - }, - { - "name": "InfoRequest" - }, { "name": "InfoResponse", "fields": [ @@ -5214,37 +5158,12 @@ ] } ], - "services": [ - { - "name": "Nodes", - "rpcs": [ - { - "name": "Query", - "in_type": "QueryRequest", - "out_type": "QueryResponse" - }, - { - "name": "Ping", - "in_type": "PingRequest", - "out_type": "PingResponse" - }, - { - "name": "RequestInfo", - "in_type": "InfoRequest", - "out_type": "InfoResponse" - } - ] - } - ], "imports": [ { "path": "gogo.proto" }, { "path": "node.proto" - }, - { - "path": "vouchers.proto" } ], "package": { diff --git a/satellite/api.go b/satellite/api.go index a63a2f62b..06ffdfb63 100644 --- a/satellite/api.go +++ b/satellite/api.go @@ -81,9 +81,8 @@ type API struct { Version *version.Service Contact struct { - Service *contact.Service - Endpoint *contact.Endpoint - KEndpoint *contact.KademliaEndpoint + Service *contact.Service + Endpoint *contact.Endpoint } Overlay struct { @@ -216,11 +215,8 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB ex } peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) - peer.Contact.KEndpoint = contact.NewKademliaEndpoint(peer.Log.Named("contact:nodes_service_endpoint")) pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint) - pb.RegisterNodesServer(peer.Server.GRPC(), peer.Contact.KEndpoint) pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint) - pb.DRPCRegisterNodes(peer.Server.DRPC(), peer.Contact.KEndpoint) } { // setup vouchers diff --git a/satellite/contact/contact_test.go b/satellite/contact/contact_test.go index 64ad14fb5..5ab6bd4f6 100644 --- a/satellite/contact/contact_test.go +++ b/satellite/contact/contact_test.go @@ -48,20 +48,3 @@ func TestSatelliteContactEndpoint(t *testing.T) { require.Equal(t, ident.PeerIdentity(), peerID) }) } - -func TestFetchInfo(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - nodeDossier := planet.StorageNodes[0].Local() - node := pb.Node{Id: nodeDossier.Id, Address: nodeDossier.Address} - - resp, err := planet.Satellites[0].Contact.Service.FetchInfo(ctx, node) - require.NotNil(t, resp) - require.NoError(t, err) - require.Equal(t, nodeDossier.Type, resp.Type) - require.Equal(t, &nodeDossier.Operator, resp.Operator) - require.Equal(t, &nodeDossier.Capacity, resp.Capacity) - require.Equal(t, nodeDossier.Version.GetVersion(), resp.Version.GetVersion()) - }) -} diff --git a/satellite/contact/kademlia.go b/satellite/contact/kademlia.go deleted file mode 100644 index 484121bed..000000000 --- a/satellite/contact/kademlia.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package contact - -import ( - "context" - - "go.uber.org/zap" - - "storj.io/storj/pkg/pb" -) - -// KademliaEndpoint implements the NodesServer Interface for backwards compatibility -type KademliaEndpoint struct { - log *zap.Logger -} - -// NewKademliaEndpoint returns a new endpoint -func NewKademliaEndpoint(log *zap.Logger) *KademliaEndpoint { - return &KademliaEndpoint{ - log: log, - } -} - -// Query is a node to node communication query -func (endpoint *KademliaEndpoint) Query(ctx context.Context, req *pb.QueryRequest) (_ *pb.QueryResponse, err error) { - defer mon.Task()(&ctx)(&err) - return &pb.QueryResponse{}, nil -} - -// Ping provides an easy way to verify a node is online and accepting requests -func (endpoint *KademliaEndpoint) Ping(ctx context.Context, req *pb.PingRequest) (_ *pb.PingResponse, err error) { - defer mon.Task()(&ctx)(&err) - return &pb.PingResponse{}, nil -} - -// RequestInfo returns the node info -func (endpoint *KademliaEndpoint) RequestInfo(ctx context.Context, req *pb.InfoRequest) (_ *pb.InfoResponse, err error) { - defer mon.Task()(&ctx)(&err) - return &pb.InfoResponse{}, nil -} diff --git a/satellite/contact/service.go b/satellite/contact/service.go index e17bb8908..912baafe1 100644 --- a/satellite/contact/service.go +++ b/satellite/contact/service.go @@ -4,14 +4,12 @@ package contact import ( - "context" "sync" "github.com/zeebo/errs" "go.uber.org/zap" "gopkg.in/spacemonkeygo/monkit.v2" - "storj.io/storj/pkg/pb" "storj.io/storj/pkg/rpc" "storj.io/storj/satellite/overlay" ) @@ -26,12 +24,6 @@ type Config struct { ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""` } -// Conn represents a connection -type Conn struct { - conn *rpc.Conn - client rpc.NodesClient -} - // Service is the contact service between storage nodes and satellites. // It is responsible for updating general node information like address, capacity, and uptime. // It is also responsible for updating peer identity information for verifying signatures from that node. @@ -66,41 +58,5 @@ func (service *Service) Local() overlay.NodeDossier { return *service.self } -// FetchInfo connects to a node and returns its node info. -func (service *Service) FetchInfo(ctx context.Context, target pb.Node) (_ *pb.InfoResponse, err error) { - conn, err := service.dialNode(ctx, target) - if err != nil { - return nil, err - } - defer func() { err = errs.Combine(err, conn.Close()) }() - - resp, err := conn.client.RequestInfo(ctx, &pb.InfoRequest{}) - if err != nil { - return nil, err - } - - return resp, nil -} - -// dialNode dials the specified node. -func (service *Service) dialNode(ctx context.Context, target pb.Node) (_ *Conn, err error) { - defer mon.Task()(&ctx)(&err) - - conn, err := service.dialer.DialNode(ctx, &target) - if err != nil { - return nil, err - } - - return &Conn{ - conn: conn, - client: conn.NodesClient(), - }, err -} - -// Close disconnects this connection. -func (conn *Conn) Close() error { - return conn.conn.Close() -} - // Close closes resources func (service *Service) Close() error { return nil } diff --git a/satellite/discovery/service.go b/satellite/discovery/service.go deleted file mode 100644 index 735ce4988..000000000 --- a/satellite/discovery/service.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package discovery - -import ( - "context" - "time" - - "github.com/zeebo/errs" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - monkit "gopkg.in/spacemonkeygo/monkit.v2" - - "storj.io/storj/internal/sync2" - "storj.io/storj/satellite/contact" - "storj.io/storj/satellite/overlay" -) - -var ( - mon = monkit.Package() - - // Error is a general error class of this package - Error = errs.Class("discovery error") -) - -// Config loads on the configuration values for the cache -type Config struct { - RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"1s"` - RefreshLimit int `help:"the amount of nodes read from the overlay in a single pagination call" default:"100"` - RefreshConcurrency int `help:"the amount of nodes refreshed in parallel" default:"8"` -} - -// Discovery struct loads on cache, kad -// -// architecture: Chore -type Discovery struct { - log *zap.Logger - cache *overlay.Service - contact *contact.Service - - refreshLimit int - refreshConcurrency int - - Refresh sync2.Cycle -} - -// New returns a new discovery service. -func New(logger *zap.Logger, ol *overlay.Service, contact *contact.Service, config Config) *Discovery { - discovery := &Discovery{ - log: logger, - cache: ol, - contact: contact, - refreshLimit: config.RefreshLimit, - refreshConcurrency: config.RefreshConcurrency, - } - - discovery.Refresh.SetInterval(config.RefreshInterval) - return discovery -} - -// Close closes resources -func (discovery *Discovery) Close() error { - discovery.Refresh.Close() - return nil -} - -// Run runs the discovery service -func (discovery *Discovery) Run(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - var group errgroup.Group - discovery.Refresh.Start(ctx, &group, func(ctx context.Context) error { - err := discovery.refresh(ctx) - if err != nil { - discovery.log.Error("error with cache refresh: ", zap.Error(err)) - } - return nil - }) - return group.Wait() -} - -// refresh updates the cache db with the current DHT. -func (discovery *Discovery) refresh(ctx context.Context) (err error) { - defer mon.Task()(&ctx)(&err) - - limiter := sync2.NewLimiter(discovery.refreshConcurrency) - - var offset int64 - - for { - list, more, err := discovery.cache.PaginateQualified(ctx, offset, discovery.refreshLimit) - if err != nil { - return Error.Wrap(err) - } - - if len(list) == 0 { - break - } - - offset += int64(len(list)) - - for _, node := range list { - node := node - - limiter.Go(ctx, func() { - info, err := discovery.contact.FetchInfo(ctx, *node) - if ctx.Err() != nil { - return - } - - if err != nil { - discovery.log.Info("could not ping node", zap.Stringer("ID", node.Id), zap.Error(err)) - return - } - - if _, err = discovery.cache.UpdateNodeInfo(ctx, node.Id, info); err != nil { - discovery.log.Warn("could not update node info", zap.Stringer("ID", node.GetAddress())) - } - }) - } - - if !more { - break - } - } - - limiter.Wait() - return nil -} diff --git a/satellite/discovery/service_test.go b/satellite/discovery/service_test.go deleted file mode 100644 index 121c68256..000000000 --- a/satellite/discovery/service_test.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package discovery_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "storj.io/storj/internal/testcontext" - "storj.io/storj/internal/testplanet" -) - -func TestCache_Refresh(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - satellite := planet.Satellites[0] - for _, storageNode := range planet.StorageNodes { - node, err := satellite.Overlay.Service.Get(ctx, storageNode.ID()) - if assert.NoError(t, err) { - assert.Equal(t, storageNode.Addr(), node.Address.Address) - } - } - }) -} diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 5f897df52..41b61419c 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -296,7 +296,6 @@ func TestNodeInfo(t *testing.T) { SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { planet.StorageNodes[0].Storage2.Monitor.Loop.Pause() - planet.Satellites[0].Discovery.Service.Refresh.Pause() node, err := planet.Satellites[0].Overlay.Service.Get(ctx, planet.StorageNodes[0].ID()) require.NoError(t, err) diff --git a/satellite/peer.go b/satellite/peer.go index a3a24e63c..2f558c4d1 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -38,7 +38,6 @@ import ( "storj.io/storj/satellite/console/consoleweb" "storj.io/storj/satellite/contact" "storj.io/storj/satellite/dbcleanup" - "storj.io/storj/satellite/discovery" "storj.io/storj/satellite/gc" "storj.io/storj/satellite/gracefulexit" "storj.io/storj/satellite/inspector" @@ -109,9 +108,8 @@ type Config struct { Identity identity.Config Server server.Config - Contact contact.Config - Overlay overlay.Config - Discovery discovery.Config + Contact contact.Config + Overlay overlay.Config Metainfo metainfo.Config Orders orders.Config @@ -155,9 +153,8 @@ type Peer struct { // services and endpoints Contact struct { - Service *contact.Service - Endpoint *contact.Endpoint - KEndpoint *contact.KademliaEndpoint + Service *contact.Service + Endpoint *contact.Endpoint } Overlay struct { @@ -166,10 +163,6 @@ type Peer struct { Inspector *overlay.Inspector } - Discovery struct { - Service *discovery.Discovery - } - Metainfo struct { Database metainfo.PointerDB // TODO: move into pointerDB Service *metainfo.Service @@ -321,17 +314,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metainfo } peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) - peer.Contact.KEndpoint = contact.NewKademliaEndpoint(peer.Log.Named("contact:nodes_service_endpoint")) pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint) - pb.RegisterNodesServer(peer.Server.GRPC(), peer.Contact.KEndpoint) pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint) - pb.DRPCRegisterNodes(peer.Server.DRPC(), peer.Contact.KEndpoint) - } - - { // setup discovery - log.Debug("Setting up discovery") - config := config.Discovery - peer.Discovery.Service = discovery.New(peer.Log.Named("discovery"), peer.Overlay.Service, peer.Contact.Service, config) } { // setup vouchers @@ -684,9 +668,6 @@ func (peer *Peer) Run(ctx context.Context) (err error) { group.Go(func() error { return errs2.IgnoreCanceled(peer.Version.Run(ctx)) }) - group.Go(func() error { - return errs2.IgnoreCanceled(peer.Discovery.Service.Run(ctx)) - }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Repair.Checker.Run(ctx)) }) @@ -789,10 +770,6 @@ func (peer *Peer) Close() error { errlist.Add(peer.Repair.Checker.Close()) } - if peer.Discovery.Service != nil { - errlist.Add(peer.Discovery.Service.Close()) - } - if peer.Contact.Service != nil { errlist.Add(peer.Contact.Service.Close()) } diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index d04d054f1..3281a61c1 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -51,8 +51,6 @@ func TestDataRepair(t *testing.T) { // first, upload some remote data uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Pause() @@ -186,8 +184,6 @@ func TestCorruptDataRepair_Failed(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Pause() @@ -308,8 +304,6 @@ func TestCorruptDataRepair_Succeed(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Pause() @@ -425,8 +419,6 @@ func TestRemoveIrreparableSegmentFromQueue(t *testing.T) { // first, upload some remote data uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Stop() @@ -509,8 +501,6 @@ func TestRepairMultipleDisqualified(t *testing.T) { // first, upload some remote data uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() satellite.Repair.Checker.Loop.Pause() satellite.Repair.Repairer.Loop.Pause() @@ -624,8 +614,6 @@ func TestDataRepairOverride_HigherLimit(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Pause() @@ -715,8 +703,6 @@ func TestDataRepairOverride_LowerLimit(t *testing.T) { }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { uplinkPeer := planet.Uplinks[0] satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Pause() @@ -834,8 +820,6 @@ func TestDataRepairUploadLimit(t *testing.T) { }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satellite := planet.Satellites[0] - // stop discovery service so that we do not get a race condition when we delete nodes from overlay - satellite.Discovery.Service.Refresh.Stop() // stop audit to prevent possible interactions i.e. repair timeout problems satellite.Audit.Worker.Loop.Pause() satellite.Repair.Checker.Loop.Pause() diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index 1b195ac59..62735a689 100644 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -94,15 +94,6 @@ contact.external-address: "" # If set, a path to write a process trace SVG to # debug.trace-out: "" -# the amount of nodes refreshed in parallel -# discovery.refresh-concurrency: 8 - -# the interval at which the cache refreshes itself in seconds -# discovery.refresh-interval: 1s - -# the amount of nodes read from the overlay in a single pagination call -# discovery.refresh-limit: 100 - # the number of nodes to concurrently send garbage collection bloom filters to # garbage-collection.concurrent-sends: 1 diff --git a/storagenode/contact/contact_test.go b/storagenode/contact/contact_test.go index da12be84b..5b9914997 100644 --- a/storagenode/contact/contact_test.go +++ b/storagenode/contact/contact_test.go @@ -9,12 +9,9 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - "storj.io/storj/internal/errs2" "storj.io/storj/internal/testcontext" "storj.io/storj/internal/testplanet" "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/rpc/rpcstatus" - "storj.io/storj/storagenode" ) func TestStoragenodeContactEndpoint(t *testing.T) { @@ -79,50 +76,6 @@ func TestNodeInfoUpdated(t *testing.T) { }) } -func TestRequestInfoEndpointTrustedSatellite(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - nodeDossier := planet.StorageNodes[0].Local() - - // Satellite Trusted - conn, err := planet.Satellites[0].Dialer.DialNode(ctx, &nodeDossier.Node) - require.NoError(t, err) - defer ctx.Check(conn.Close) - - resp, err := conn.NodesClient().RequestInfo(ctx, &pb.InfoRequest{}) - require.NotNil(t, resp) - require.NoError(t, err) - require.Equal(t, nodeDossier.Type, resp.Type) - require.Equal(t, &nodeDossier.Operator, resp.Operator) - require.Equal(t, &nodeDossier.Capacity, resp.Capacity) - require.Equal(t, nodeDossier.Version.GetVersion(), resp.Version.GetVersion()) - }) -} - -func TestRequestInfoEndpointUntrustedSatellite(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0, - Reconfigure: testplanet.Reconfigure{ - StorageNode: func(index int, config *storagenode.Config) { - config.Storage.WhitelistedSatellites = nil - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - nodeDossier := planet.StorageNodes[0].Local() - - // Satellite Untrusted - conn, err := planet.Satellites[0].Dialer.DialNode(ctx, &nodeDossier.Node) - require.NoError(t, err) - defer ctx.Check(conn.Close) - - resp, err := conn.NodesClient().RequestInfo(ctx, &pb.InfoRequest{}) - require.Nil(t, resp) - require.Error(t, err) - require.True(t, errs2.IsRPC(err, rpcstatus.PermissionDenied)) - }) -} - func TestLocalAndUpdateSelf(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, diff --git a/storagenode/contact/kademlia.go b/storagenode/contact/kademlia.go deleted file mode 100644 index e2f71cf47..000000000 --- a/storagenode/contact/kademlia.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package contact - -import ( - "context" - - "go.uber.org/zap" - - "storj.io/storj/pkg/identity" - "storj.io/storj/pkg/pb" - "storj.io/storj/pkg/rpc/rpcstatus" - "storj.io/storj/pkg/storj" -) - -// SatelliteIDVerifier checks if the connection is from a trusted satellite -type SatelliteIDVerifier interface { - VerifySatelliteID(ctx context.Context, id storj.NodeID) error -} - -// KademliaEndpoint implements the NodesServer Interface for backwards compatibility -type KademliaEndpoint struct { - log *zap.Logger - service *Service - trust SatelliteIDVerifier -} - -// NewKademliaEndpoint returns a new endpoint -func NewKademliaEndpoint(log *zap.Logger, service *Service, trust SatelliteIDVerifier) *KademliaEndpoint { - return &KademliaEndpoint{ - log: log, - service: service, - trust: trust, - } -} - -// Query is a node to node communication query -func (endpoint *KademliaEndpoint) Query(ctx context.Context, req *pb.QueryRequest) (_ *pb.QueryResponse, err error) { - defer mon.Task()(&ctx)(&err) - return &pb.QueryResponse{}, nil -} - -// Ping provides an easy way to verify a node is online and accepting requests -func (endpoint *KademliaEndpoint) Ping(ctx context.Context, req *pb.PingRequest) (_ *pb.PingResponse, err error) { - defer mon.Task()(&ctx)(&err) - return &pb.PingResponse{}, nil -} - -// RequestInfo returns the node info -func (endpoint *KademliaEndpoint) RequestInfo(ctx context.Context, req *pb.InfoRequest) (_ *pb.InfoResponse, err error) { - defer mon.Task()(&ctx)(&err) - self := endpoint.service.Local() - - if endpoint.trust == nil { - return nil, rpcstatus.Error(rpcstatus.Internal, "missing trust") - } - - peer, err := identity.PeerIdentityFromContext(ctx) - if err != nil { - return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) - } - - err = endpoint.trust.VerifySatelliteID(ctx, peer.ID) - if err != nil { - return nil, rpcstatus.Errorf(rpcstatus.PermissionDenied, "untrusted peer %v", peer.ID) - } - - return &pb.InfoResponse{ - Type: self.Type, - Operator: &self.Operator, - Capacity: &self.Capacity, - Version: &self.Version, - }, nil -} diff --git a/storagenode/contact/service.go b/storagenode/contact/service.go index 858ee63be..5a1307756 100644 --- a/storagenode/contact/service.go +++ b/storagenode/contact/service.go @@ -25,7 +25,7 @@ type Config struct { ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""` // Chore config values - Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"30s"` + Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"5s"` } // Service is the contact service between storage nodes and satellites diff --git a/storagenode/inspector/inspector_test.go b/storagenode/inspector/inspector_test.go index 77583cb86..50c620fc6 100644 --- a/storagenode/inspector/inspector_test.go +++ b/storagenode/inspector/inspector_test.go @@ -29,8 +29,6 @@ func TestInspectorStats(t *testing.T) { planet.Start(ctx) - planet.Satellites[0].Discovery.Service.Refresh.TriggerWait() - var availableBandwidth int64 var availableSpace int64 for _, storageNode := range planet.StorageNodes { diff --git a/storagenode/monitor/monitor_test.go b/storagenode/monitor/monitor_test.go index 386bd190c..d62a2cba2 100644 --- a/storagenode/monitor/monitor_test.go +++ b/storagenode/monitor/monitor_test.go @@ -20,17 +20,12 @@ func TestMonitor(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - satellite := planet.Satellites[0] - - var freeBandwidth int64 + var freeBandwidthInit int64 for _, storageNode := range planet.StorageNodes { storageNode.Storage2.Monitor.Loop.Pause() - info, err := satellite.Contact.Service.FetchInfo(ctx, storageNode.Local().Node) - require.NoError(t, err) - // assume that all storage nodes have the same initial values - freeBandwidth = info.Capacity.FreeBandwidth + freeBandwidthInit = storageNode.Local().Capacity.FreeBandwidth } expectedData := testrand.Bytes(100 * memory.KiB) @@ -42,13 +37,11 @@ func TestMonitor(t *testing.T) { for _, storageNode := range planet.StorageNodes { storageNode.Storage2.Monitor.Loop.TriggerWait() - info, err := satellite.Contact.Service.FetchInfo(ctx, storageNode.Local().Node) - require.NoError(t, err) - + freeBandwidthNew := storageNode.Local().Capacity.FreeBandwidth stats, err := storageNode.Storage2.Inspector.Stats(ctx, &pb.StatsRequest{}) require.NoError(t, err) if stats.UsedSpace > 0 { - assert.Equal(t, freeBandwidth-stats.UsedBandwidth, info.Capacity.FreeBandwidth) + assert.Equal(t, freeBandwidthInit-stats.UsedBandwidth, freeBandwidthNew) nodeAssertions++ } } diff --git a/storagenode/peer.go b/storagenode/peer.go index 7bd7a23de..014086b91 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -123,7 +123,6 @@ type Peer struct { Service *contact.Service Chore *contact.Chore Endpoint *contact.Endpoint - KEndpoint *contact.KademliaEndpoint PingStats *contact.PingStats } @@ -232,11 +231,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self) peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Storage2.Trust, peer.Dialer, peer.Contact.Service) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.PingStats) - peer.Contact.KEndpoint = contact.NewKademliaEndpoint(peer.Log.Named("contact:nodes_service_endpoint"), peer.Contact.Service, peer.Storage2.Trust) pb.RegisterContactServer(peer.Server.GRPC(), peer.Contact.Endpoint) - pb.RegisterNodesServer(peer.Server.GRPC(), peer.Contact.KEndpoint) pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint) - pb.DRPCRegisterNodes(peer.Server.DRPC(), peer.Contact.KEndpoint) } { // setup storage @@ -412,6 +408,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) { group.Go(func() error { return errs2.IgnoreCanceled(peer.Version.Run(ctx)) }) + group.Go(func() error { + return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx)) + }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Contact.Chore.Run(ctx)) }) @@ -421,16 +420,12 @@ func (peer *Peer) Run(ctx context.Context) (err error) { group.Go(func() error { return errs2.IgnoreCanceled(peer.Storage2.Orders.Run(ctx)) }) - group.Go(func() error { - return errs2.IgnoreCanceled(peer.Storage2.Monitor.Run(ctx)) - }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Storage2.CacheService.Run(ctx)) }) group.Go(func() error { return errs2.IgnoreCanceled(peer.Storage2.RetainService.Run(ctx)) }) - group.Go(func() error { return errs2.IgnoreCanceled(peer.Bandwidth.Run(ctx)) }) diff --git a/storagenode/trust/service_test.go b/storagenode/trust/service_test.go index d21c35487..246a618b0 100644 --- a/storagenode/trust/service_test.go +++ b/storagenode/trust/service_test.go @@ -27,9 +27,6 @@ func TestGetSignee(t *testing.T) { planet.Start(ctx) - // make sure nodes are refreshed in db - planet.Satellites[0].Discovery.Service.Refresh.TriggerWait() - trust := planet.StorageNodes[0].Storage2.Trust canceledContext, cancel := context.WithCancel(ctx)