Kademlia flags cleanup (#1137)

This commit is contained in:
Egon Elbre 2019-01-29 08:51:07 +02:00 committed by GitHub
parent bb2588e9ae
commit e1a8bbdcb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 97 deletions

View File

@ -118,13 +118,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
}
kdb, ndb := peer.DB.RoutingTable()
peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb)
peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb, &config.RoutingTableConfig)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
// TODO: reduce number of arguments
peer.Kademlia.Service, err = kademlia.NewWith(peer.Log.Named("kademlia"), self, nil, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, nil, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -188,7 +188,7 @@ func (peer *Peer) Close() error {
errlist.Add(peer.Kademlia.Service.Close())
}
if peer.Kademlia.RoutingTable != nil {
errlist.Add(peer.Kademlia.RoutingTable.SelfClose())
errlist.Add(peer.Kademlia.RoutingTable.Close())
}
return errlist.Err()

View File

@ -4,7 +4,6 @@
package kademlia
import (
"flag"
"fmt"
"regexp"
@ -18,10 +17,23 @@ var (
// mon = monkit.Package() // TODO: figure out whether this is needed
)
var (
flagBucketSize = flag.Int("kademlia.bucket-size", 20, "size of each Kademlia bucket")
flagReplacementCacheSize = flag.Int("kademlia.replacement-cache-size", 5, "size of Kademlia replacement cache")
)
// Config defines all of the things that are needed to start up Kademlia
// server endpoints (and not necessarily client code).
type Config struct {
BootstrapAddr string `help:"the Kademlia node to bootstrap against" default:"127.0.0.1:7778"`
DBPath string `help:"the path for storage node db services to be created on" default:"$CONFDIR/kademlia"`
ExternalAddress string `user:"true" help:"the public address of the Kademlia node, useful for nodes behind NAT" default:""`
Operator OperatorConfig
// TODO: reduce the number of flags here
Alpha int `help:"alpha is a system wide concurrency parameter" default:"5"`
RoutingTableConfig
}
// Verify verifies whether kademlia config is valid.
func (c Config) Verify(log *zap.Logger) error {
return c.Operator.Verify(log)
}
// OperatorConfig defines properties related to storage node operator metadata
type OperatorConfig struct {
@ -34,11 +46,9 @@ func (c OperatorConfig) Verify(log *zap.Logger) error {
if err := isOperatorEmailValid(log, c.Email); err != nil {
return err
}
if err := isOperatorWalletValid(log, c.Wallet); err != nil {
return err
}
return nil
}
@ -63,18 +73,3 @@ func isOperatorWalletValid(log *zap.Logger, wallet string) error {
log.Sugar().Info("Operator wallet: ", wallet)
return nil
}
// Config defines all of the things that are needed to start up Kademlia
// server endpoints (and not necessarily client code).
type Config struct {
BootstrapAddr string `help:"the Kademlia node to bootstrap against" default:"127.0.0.1:7778"`
DBPath string `help:"the path for storage node db services to be created on" default:"$CONFDIR/kademlia"`
Alpha int `help:"alpha is a system wide concurrency parameter" default:"5"`
ExternalAddress string `user:"true" help:"the public address of the Kademlia node, useful for nodes behind NAT" default:""`
Operator OperatorConfig
}
// Verify verifies whether kademlia config is valid.
func (c Config) Verify(log *zap.Logger) error {
return c.Operator.Verify(log)
}

View File

@ -5,10 +5,7 @@ package kademlia
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"time"
"github.com/gogo/protobuf/proto"
@ -22,7 +19,6 @@ import (
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/storage"
"storj.io/storj/storage/boltdb"
)
var (
@ -57,45 +53,8 @@ type Kademlia struct {
bootstrapFinished sync2.Fence
}
// New returns a newly configured Kademlia instance
var New = NewKademlia
// NewKademlia returns a newly configured Kademlia instance
func NewKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *provider.FullIdentity, path string, alpha int) (*Kademlia, error) {
self := pb.Node{
Id: identity.ID,
Type: nodeType,
Address: &pb.NodeAddress{Address: address},
Metadata: metadata,
}
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
}
bucketIdentifier := self.Id.String()[:5] // need a way to differentiate between nodes if running more than one simultaneously
dbpath := filepath.Join(path, fmt.Sprintf("kademlia_%s.db", bucketIdentifier))
dbs, err := boltdb.NewShared(dbpath, KademliaBucket, NodeBucket)
if err != nil {
return nil, BootstrapErr.Wrap(err)
}
kdb, ndb := dbs[0], dbs[1]
rt, err := NewRoutingTable(log, self, kdb, ndb)
if err != nil {
return nil, BootstrapErr.Wrap(err)
}
return NewKademliaWithRoutingTable(log, self, bootstrapNodes, identity, alpha, rt)
}
// NewWith returns a newly configured Kademlia instance
var NewWith = NewKademliaWithRoutingTable
// NewKademliaWithRoutingTable returns a newly configured Kademlia instance
func NewKademliaWithRoutingTable(log *zap.Logger, self pb.Node, bootstrapNodes []pb.Node, identity *provider.FullIdentity, alpha int, rt *RoutingTable) (*Kademlia, error) {
// NewService returns a newly configured Kademlia instance
func NewService(log *zap.Logger, self pb.Node, bootstrapNodes []pb.Node, identity *provider.FullIdentity, alpha int, rt *RoutingTable) (*Kademlia, error) {
k := &Kademlia{
log: log,
alpha: alpha,

View File

@ -18,6 +18,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
@ -29,6 +31,7 @@ import (
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/provider"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage/teststore"
)
const (
@ -50,7 +53,7 @@ func TestNewKademlia(t *testing.T) {
{
id: func() *identity.FullIdentity {
id, err := testidentity.NewTestIdentity(ctx)
assert.NoError(t, err)
require.NoError(t, err)
return id
}(),
bn: []pb.Node{{Id: teststorj.NodeIDFromString("foo")}},
@ -59,7 +62,7 @@ func TestNewKademlia(t *testing.T) {
{
id: func() *provider.FullIdentity {
id, err := testidentity.NewTestIdentity(ctx)
assert.NoError(t, err)
require.NoError(t, err)
return id
}(),
bn: []pb.Node{{Id: teststorj.NodeIDFromString("foo")}},
@ -70,8 +73,8 @@ func TestNewKademlia(t *testing.T) {
for i, v := range cases {
dir := filepath.Join(rootdir, strconv.Itoa(i))
kad, err := NewKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, v.bn, v.addr, nil, v.id, dir, defaultAlpha)
assert.NoError(t, err)
kad, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, v.bn, v.addr, nil, v.id, dir, defaultAlpha)
require.NoError(t, err)
assert.Equal(t, v.expectedErr, err)
assert.Equal(t, kad.bootstrapNodes, v.bn)
assert.NotNil(t, kad.dialer)
@ -100,7 +103,7 @@ func TestPeerDiscovery(t *testing.T) {
Email: "foo@bar.com",
Wallet: "OperatorWallet",
}
k, err := NewKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha)
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha)
assert.NoError(t, err)
rt, err := k.GetRoutingTable(ctx)
assert.NoError(t, err)
@ -174,7 +177,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) {
dir, cleanup := mktempdir(t, "kademlia")
logger := zaptest.NewLogger(t)
k, err := NewKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha)
assert.NoError(t, err)
s := node.NewServer(logger, k)
// new ident opts
@ -248,7 +251,7 @@ func TestFindNear(t *testing.T) {
assert.NotEqual(t, fid.ID, fid2.ID)
dir, cleanup := mktempdir(t, "kademlia")
defer cleanup()
k, err := NewKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, []pb.Node{{Id: fid2.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}}, lis.Addr().String(), nil, fid, dir, defaultAlpha)
k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, []pb.Node{{Id: fid2.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}}, lis.Addr().String(), nil, fid, dir, defaultAlpha)
assert.NoError(t, err)
defer func() {
assert.NoError(t, k.Disconnect())
@ -532,3 +535,20 @@ func (mn *mockNodesServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.P
atomic.AddInt32(&mn.pingCalled, 1)
return &pb.PingResponse{}, nil
}
// newKademlia returns a newly configured Kademlia instance
func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *provider.FullIdentity, path string, alpha int) (*Kademlia, error) {
self := pb.Node{
Id: identity.ID,
Type: nodeType,
Address: &pb.NodeAddress{Address: address},
Metadata: metadata,
}
rt, err := NewRoutingTable(log, self, teststore.New(), teststore.New(), nil)
if err != nil {
return nil, BootstrapErr.Wrap(err)
}
return NewService(log, self, bootstrapNodes, identity, alpha, rt)
}

View File

@ -15,7 +15,6 @@ import (
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
"storj.io/storj/storage"
)
@ -46,6 +45,12 @@ var firstBucketID = bucketID{
var emptyBucketID = bucketID{}
// RoutingTableConfig configures the routing table
type RoutingTableConfig struct {
BucketSize int `help:"size of each Kademlia bucket" default:"20"`
ReplacementCacheSize int `help:"size of Kademlia replacement cache" default:"5"`
}
// RoutingTable implements the RoutingTable interface
type RoutingTable struct {
log *zap.Logger
@ -58,13 +63,20 @@ type RoutingTable struct {
replacementCache map[bucketID][]*pb.Node
bucketSize int // max number of nodes stored in a kbucket = 20 (k)
rcBucketSize int // replacementCache bucket max length
}
// NewRoutingTable returns a newly configured instance of a RoutingTable
func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.KeyValueStore) (*RoutingTable, error) {
func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.KeyValueStore, config *RoutingTableConfig) (*RoutingTable, error) {
localNode.Type.DPanicOnInvalid("new routing table")
if config == nil || config.BucketSize == 0 || config.ReplacementCacheSize == 0 {
// TODO: handle this more nicely
config = &RoutingTableConfig{
BucketSize: 20,
ReplacementCacheSize: 5,
}
}
rt := &RoutingTable{
log: logger,
self: localNode,
@ -76,8 +88,8 @@ func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.Key
seen: make(map[storj.NodeID]*pb.Node),
replacementCache: make(map[bucketID][]*pb.Node),
bucketSize: *flagBucketSize,
rcBucketSize: *flagReplacementCacheSize,
bucketSize: config.BucketSize,
rcBucketSize: config.ReplacementCacheSize,
}
ok, err := rt.addNode(&localNode)
if !ok || err != nil {
@ -86,18 +98,9 @@ func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.Key
return rt, nil
}
// SelfClose close without closing dependencies
// TODO: rename to Close and remove Close
func (rt *RoutingTable) SelfClose() error {
return nil
}
// Close closes underlying databases
// Close close without closing dependencies
func (rt *RoutingTable) Close() error {
return utils.CombineErrors(
rt.kadBucketDB.Close(),
rt.nodeBucketDB.Close(),
)
return nil
}
// Local returns the local nodes ID

View File

@ -227,14 +227,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (*
}
peer.Kademlia.kdb, peer.Kademlia.ndb = dbs[0], dbs[1]
peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, peer.Kademlia.kdb, peer.Kademlia.ndb)
peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, peer.Kademlia.kdb, peer.Kademlia.ndb, &config.RoutingTableConfig)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
}
// TODO: reduce number of arguments
peer.Kademlia.Service, err = kademlia.NewWith(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -469,7 +469,7 @@ func (peer *Peer) Close() error {
errlist.Add(peer.Kademlia.Service.Close())
}
if peer.Kademlia.RoutingTable != nil {
errlist.Add(peer.Kademlia.RoutingTable.SelfClose())
errlist.Add(peer.Kademlia.RoutingTable.Close())
}
if peer.Kademlia.ndb != nil || peer.Kademlia.kdb != nil {

View File

@ -141,13 +141,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P
}
kdb, ndb := peer.DB.RoutingTable()
peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb)
peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb, &config.RoutingTableConfig)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
// TODO: reduce number of arguments
peer.Kademlia.Service, err = kademlia.NewWith(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable)
peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -244,7 +244,7 @@ func (peer *Peer) Close() error {
errlist.Add(peer.Kademlia.Service.Close())
}
if peer.Kademlia.RoutingTable != nil {
errlist.Add(peer.Kademlia.RoutingTable.SelfClose())
errlist.Add(peer.Kademlia.RoutingTable.Close())
}
return errlist.Err()