Ensure that we only choose storage nodes (#732)
This commit is contained in:
parent
934e9c8dac
commit
0ae05cf834
@ -92,6 +92,7 @@ func cmdAdd(cmd *cobra.Command, args []string) (err error) {
|
||||
zap.S().Infof("adding node ID: %s; Address: %s", i, a)
|
||||
err := c.Put(i, pb.Node{
|
||||
Id: i,
|
||||
// TODO: NodeType is missing
|
||||
Address: &pb.NodeAddress{
|
||||
Transport: 0,
|
||||
Address: a,
|
||||
|
@ -39,6 +39,7 @@ func main() {
|
||||
|
||||
// Set up connection with rpc server
|
||||
n := &pb.Node{
|
||||
// TODO: NodeType is missing
|
||||
Address: &pb.NodeAddress{
|
||||
Address: ":7777",
|
||||
Transport: 0,
|
||||
@ -90,7 +91,7 @@ func main() {
|
||||
id := psclient.NewPieceID()
|
||||
|
||||
allocationData := &pb.PayerBandwidthAllocation_Data{
|
||||
SatelliteId: []byte("OhHeyThisIsAnUnrealFakeSatellite"),
|
||||
SatelliteId: []byte("OhHeyThisIsAnUnrealFakeSatellite"),
|
||||
Action: pb.PayerBandwidthAllocation_PUT,
|
||||
CreatedUnixSec: time.Now().Unix(),
|
||||
}
|
||||
@ -154,7 +155,7 @@ func main() {
|
||||
}
|
||||
|
||||
allocationData := &pb.PayerBandwidthAllocation_Data{
|
||||
SatelliteId: []byte("OhHeyThisIsAnUnrealFakeSatellite"),
|
||||
SatelliteId: []byte("OhHeyThisIsAnUnrealFakeSatellite"),
|
||||
Action: pb.PayerBandwidthAllocation_GET,
|
||||
CreatedUnixSec: time.Now().Unix(),
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ type Node struct {
|
||||
}
|
||||
|
||||
// newNode creates a new node.
|
||||
func (planet *Planet) newNode(name string) (*Node, error) {
|
||||
func (planet *Planet) newNode(name string, nodeType pb.NodeType) (*Node, error) {
|
||||
identity, err := planet.newIdentity()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -66,7 +66,8 @@ func (planet *Planet) newNode(name string) (*Node, error) {
|
||||
}
|
||||
|
||||
node.Info = pb.Node{
|
||||
Id: node.Identity.ID.String(),
|
||||
Id: node.Identity.ID.String(),
|
||||
Type: nodeType,
|
||||
Address: &pb.NodeAddress{
|
||||
Transport: pb.NodeTransport_TCP_TLS_GRPC,
|
||||
Address: node.Listener.Addr().String(),
|
||||
|
@ -67,17 +67,17 @@ func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
planet.Satellites, err = planet.newNodes("satellite", satelliteCount)
|
||||
planet.Satellites, err = planet.newNodes("satellite", satelliteCount, pb.NodeType_ADMIN)
|
||||
if err != nil {
|
||||
return nil, utils.CombineErrors(err, planet.Shutdown())
|
||||
}
|
||||
|
||||
planet.StorageNodes, err = planet.newNodes("storage", storageNodeCount)
|
||||
planet.StorageNodes, err = planet.newNodes("storage", storageNodeCount, pb.NodeType_STORAGE)
|
||||
if err != nil {
|
||||
return nil, utils.CombineErrors(err, planet.Shutdown())
|
||||
}
|
||||
|
||||
planet.Uplinks, err = planet.newNodes("uplink", uplinkCount)
|
||||
planet.Uplinks, err = planet.newNodes("uplink", uplinkCount, pb.NodeType_ADMIN) // TODO: fix the node type here
|
||||
if err != nil {
|
||||
return nil, utils.CombineErrors(err, planet.Shutdown())
|
||||
}
|
||||
@ -211,10 +211,10 @@ func (planet *Planet) Shutdown() error {
|
||||
}
|
||||
|
||||
// newNodes creates initializes multiple nodes
|
||||
func (planet *Planet) newNodes(prefix string, count int) ([]*Node, error) {
|
||||
func (planet *Planet) newNodes(prefix string, count int, nodeType pb.NodeType) ([]*Node, error) {
|
||||
var xs []*Node
|
||||
for i := 0; i < count; i++ {
|
||||
node, err := planet.newNode(prefix + strconv.Itoa(i))
|
||||
node, err := planet.newNode(prefix+strconv.Itoa(i), nodeType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/accounting"
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
@ -38,7 +39,7 @@ func TestOnlineNodes(t *testing.T) {
|
||||
expectedOnline := []*pb.Node{}
|
||||
for i := 0; i < N; i++ {
|
||||
str := strconv.Itoa(i)
|
||||
n := &pb.Node{Id: str, Address: &pb.NodeAddress{Address: str}}
|
||||
n := &pb.Node{Id: str, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: str}}
|
||||
nodes = append(nodes, n)
|
||||
if i%(rand.Intn(5)+2) == 0 {
|
||||
id := node.IDFromString("id" + str)
|
||||
|
@ -210,7 +210,8 @@ func (m *mockDownloader) DownloadShares(ctx context.Context, pointer *pb.Pointer
|
||||
|
||||
for i := 0; i < 30; i++ {
|
||||
nodes[i] = &pb.Node{
|
||||
Id: strconv.Itoa(i),
|
||||
Id: strconv.Itoa(i),
|
||||
Type: pb.NodeType_STORAGE,
|
||||
Address: &pb.NodeAddress{
|
||||
Address: strconv.Itoa(i),
|
||||
},
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/auth"
|
||||
"storj.io/storj/pkg/datarepair/queue"
|
||||
"storj.io/storj/pkg/dht"
|
||||
@ -70,7 +71,7 @@ func TestIdentifyInjuredSegments(t *testing.T) {
|
||||
//nodes for cache
|
||||
selection := rand.Intn(4)
|
||||
for _, v := range ids[:selection] {
|
||||
n := &pb.Node{Id: v, Address: &pb.NodeAddress{Address: v}}
|
||||
n := &pb.Node{Id: v, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: v}}
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
pieces := []int32{0, 1, 2, 3}
|
||||
@ -117,7 +118,7 @@ func TestOfflineNodes(t *testing.T) {
|
||||
expectedOffline := []int32{}
|
||||
for i := 0; i < N; i++ {
|
||||
str := strconv.Itoa(i)
|
||||
n := &pb.Node{Id: str, Address: &pb.NodeAddress{Address: str}}
|
||||
n := &pb.Node{Id: str, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: str}}
|
||||
nodes = append(nodes, n)
|
||||
if i%(rand.Intn(5)+2) == 0 {
|
||||
id := node.IDFromString("id" + str)
|
||||
@ -182,7 +183,7 @@ func BenchmarkIdentifyInjuredSegments(b *testing.B) {
|
||||
//nodes for cache
|
||||
selection := rand.Intn(4)
|
||||
for _, v := range ids[:selection] {
|
||||
n := &pb.Node{Id: v, Address: &pb.NodeAddress{Address: v}}
|
||||
n := &pb.Node{Id: v, Type: pb.NodeType_STORAGE, Address: &pb.NodeAddress{Address: v}}
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
pieces := []int32{0, 1, 2, 3}
|
||||
|
@ -96,7 +96,8 @@ func (srv *Server) PingNode(ctx context.Context, req *pb.PingNodeRequest) (*pb.P
|
||||
}
|
||||
|
||||
p, err := nc.Ping(ctx, pb.Node{
|
||||
Id: req.Id,
|
||||
Id: req.Id,
|
||||
Type: self.Type,
|
||||
Address: &pb.NodeAddress{
|
||||
Address: req.Address,
|
||||
},
|
||||
|
@ -72,7 +72,9 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
|
||||
Wallet: c.Farmer.Wallet,
|
||||
}
|
||||
|
||||
kad, err := NewKademlia(server.Identity().ID, []pb.Node{*in}, server.Addr().String(), metadata, server.Identity(), c.DBPath, c.Alpha)
|
||||
nodeType := pb.NodeType_STORAGE // TODO: fix this for satellites
|
||||
|
||||
kad, err := NewKademlia(server.Identity().ID, nodeType, []pb.Node{*in}, server.Addr().String(), metadata, server.Identity(), c.DBPath, c.Alpha)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -54,9 +54,10 @@ type Kademlia struct {
|
||||
}
|
||||
|
||||
// NewKademlia returns a newly configured Kademlia instance
|
||||
func NewKademlia(id dht.NodeID, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *provider.FullIdentity, path string, alpha int) (*Kademlia, error) {
|
||||
func NewKademlia(id dht.NodeID, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *provider.FullIdentity, path string, alpha int) (*Kademlia, error) {
|
||||
self := pb.Node{
|
||||
Id: id.String(),
|
||||
Type: nodeType,
|
||||
Address: &pb.NodeAddress{Address: address},
|
||||
Metadata: metadata,
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ func TestNewKademlia(t *testing.T) {
|
||||
identity, err := ca.NewIdentity()
|
||||
assert.NoError(t, err)
|
||||
|
||||
kad, err := NewKademlia(v.id, v.bn, v.addr, nil, identity, dir, defaultAlpha)
|
||||
kad, err := NewKademlia(v.id, pb.NodeType_STORAGE, v.bn, v.addr, nil, identity, dir, defaultAlpha)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, v.expectedErr, err)
|
||||
assert.Equal(t, kad.bootstrapNodes, v.bn)
|
||||
@ -97,7 +97,7 @@ func TestPeerDiscovery(t *testing.T) {
|
||||
Email: "foo@bar.com",
|
||||
Wallet: "FarmerWallet",
|
||||
}
|
||||
k, err := NewKademlia(dht.NodeID(testID.ID), bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha)
|
||||
k, err := NewKademlia(dht.NodeID(testID.ID), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha)
|
||||
assert.NoError(t, err)
|
||||
rt, err := k.GetRoutingTable(context.Background())
|
||||
assert.NoError(t, err)
|
||||
@ -177,7 +177,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
|
||||
// new kademlia
|
||||
dir, cleanup := mktempdir(t, "kademlia")
|
||||
|
||||
k, err := NewKademlia(id, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
|
||||
k, err := NewKademlia(id, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
|
||||
assert.NoError(t, err)
|
||||
s := node.NewServer(k)
|
||||
// new ident opts
|
||||
@ -220,7 +220,7 @@ func TestGetNodes(t *testing.T) {
|
||||
|
||||
dir, cleanup := mktempdir(t, "kademlia")
|
||||
defer cleanup()
|
||||
k, err := NewKademlia(kid, []pb.Node{pb.Node{Id: id2.String(), Address: &pb.NodeAddress{Address: lis.Addr().String()}}}, lis.Addr().String(), nil, fid, dir, defaultAlpha)
|
||||
k, err := NewKademlia(kid, pb.NodeType_STORAGE, []pb.Node{pb.Node{Id: id2.String(), Address: &pb.NodeAddress{Address: lis.Addr().String()}}}, lis.Addr().String(), nil, fid, dir, defaultAlpha)
|
||||
assert.NoError(t, err)
|
||||
defer func() {
|
||||
assert.NoError(t, k.Disconnect())
|
||||
|
@ -75,14 +75,14 @@ func TestChoose(t *testing.T) {
|
||||
limit: 4,
|
||||
space: 0,
|
||||
allNodes: func() []*pb.Node {
|
||||
n1 := &pb.Node{Id: "n1"}
|
||||
n2 := &pb.Node{Id: "n2"}
|
||||
n3 := &pb.Node{Id: "n3"}
|
||||
n4 := &pb.Node{Id: "n4"}
|
||||
n5 := &pb.Node{Id: "n5"}
|
||||
n6 := &pb.Node{Id: "n6"}
|
||||
n7 := &pb.Node{Id: "n7"}
|
||||
n8 := &pb.Node{Id: "n8"}
|
||||
n1 := &pb.Node{Id: "n1", Type: pb.NodeType_STORAGE}
|
||||
n2 := &pb.Node{Id: "n2", Type: pb.NodeType_STORAGE}
|
||||
n3 := &pb.Node{Id: "n3", Type: pb.NodeType_STORAGE}
|
||||
n4 := &pb.Node{Id: "n4", Type: pb.NodeType_STORAGE}
|
||||
n5 := &pb.Node{Id: "n5", Type: pb.NodeType_STORAGE}
|
||||
n6 := &pb.Node{Id: "n6", Type: pb.NodeType_STORAGE}
|
||||
n7 := &pb.Node{Id: "n7", Type: pb.NodeType_STORAGE}
|
||||
n8 := &pb.Node{Id: "n8", Type: pb.NodeType_STORAGE}
|
||||
return []*pb.Node{n1, n2, n3, n4, n5, n6, n7, n8}
|
||||
}(),
|
||||
excluded: func() []dht.NodeID {
|
||||
|
@ -151,11 +151,15 @@ func (o *Server) populate(ctx context.Context, starting storage.Key, maxNodes, r
|
||||
}
|
||||
|
||||
for _, v := range nodes {
|
||||
rest := v.GetRestrictions()
|
||||
if v.Type != pb.NodeType_STORAGE {
|
||||
continue
|
||||
}
|
||||
|
||||
if rest.GetFreeBandwidth() < restrictedBandwidth ||
|
||||
rest.GetFreeDisk() < restrictedSpace ||
|
||||
contains(excluded, v.Id) {
|
||||
rest := v.GetRestrictions()
|
||||
if rest.GetFreeBandwidth() < restrictedBandwidth || rest.GetFreeDisk() < restrictedSpace {
|
||||
continue
|
||||
}
|
||||
if contains(excluded, v.Id) {
|
||||
continue
|
||||
}
|
||||
result = append(result, v)
|
||||
|
Loading…
Reference in New Issue
Block a user