From e1a8bbdcb63d819b28747de8cf4f65021d4f9f10 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Tue, 29 Jan 2019 08:51:07 +0200 Subject: [PATCH] Kademlia flags cleanup (#1137) --- bootstrap/peer.go | 6 ++--- pkg/kademlia/config.go | 39 +++++++++++++----------------- pkg/kademlia/kademlia.go | 45 ++--------------------------------- pkg/kademlia/kademlia_test.go | 34 ++++++++++++++++++++------ pkg/kademlia/routing.go | 35 ++++++++++++++------------- satellite/peer.go | 6 ++--- storagenode/peer.go | 6 ++--- 7 files changed, 74 insertions(+), 97 deletions(-) diff --git a/bootstrap/peer.go b/bootstrap/peer.go index b5c18b70a..fca0219be 100644 --- a/bootstrap/peer.go +++ b/bootstrap/peer.go @@ -118,13 +118,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P } kdb, ndb := peer.DB.RoutingTable() - peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb) + peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb, &config.RoutingTableConfig) if err != nil { return nil, errs.Combine(err, peer.Close()) } // TODO: reduce number of arguments - peer.Kademlia.Service, err = kademlia.NewWith(peer.Log.Named("kademlia"), self, nil, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable) + peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, nil, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable) if err != nil { return nil, errs.Combine(err, peer.Close()) } @@ -188,7 +188,7 @@ func (peer *Peer) Close() error { errlist.Add(peer.Kademlia.Service.Close()) } if peer.Kademlia.RoutingTable != nil { - errlist.Add(peer.Kademlia.RoutingTable.SelfClose()) + errlist.Add(peer.Kademlia.RoutingTable.Close()) } return errlist.Err() diff --git a/pkg/kademlia/config.go b/pkg/kademlia/config.go index 104909f2f..a3c5f0f88 100644 --- a/pkg/kademlia/config.go +++ b/pkg/kademlia/config.go @@ -4,7 +4,6 @@ package kademlia import ( - "flag" "fmt" "regexp" @@ -18,10 +17,23 @@ var ( // mon = monkit.Package() // TODO: figure out whether this is needed ) -var ( - flagBucketSize = flag.Int("kademlia.bucket-size", 20, "size of each Kademlia bucket") - flagReplacementCacheSize = flag.Int("kademlia.replacement-cache-size", 5, "size of Kademlia replacement cache") -) +// Config defines all of the things that are needed to start up Kademlia +// server endpoints (and not necessarily client code). +type Config struct { + BootstrapAddr string `help:"the Kademlia node to bootstrap against" default:"127.0.0.1:7778"` + DBPath string `help:"the path for storage node db services to be created on" default:"$CONFDIR/kademlia"` + ExternalAddress string `user:"true" help:"the public address of the Kademlia node, useful for nodes behind NAT" default:""` + Operator OperatorConfig + + // TODO: reduce the number of flags here + Alpha int `help:"alpha is a system wide concurrency parameter" default:"5"` + RoutingTableConfig +} + +// Verify verifies whether kademlia config is valid. +func (c Config) Verify(log *zap.Logger) error { + return c.Operator.Verify(log) +} // OperatorConfig defines properties related to storage node operator metadata type OperatorConfig struct { @@ -34,11 +46,9 @@ func (c OperatorConfig) Verify(log *zap.Logger) error { if err := isOperatorEmailValid(log, c.Email); err != nil { return err } - if err := isOperatorWalletValid(log, c.Wallet); err != nil { return err } - return nil } @@ -63,18 +73,3 @@ func isOperatorWalletValid(log *zap.Logger, wallet string) error { log.Sugar().Info("Operator wallet: ", wallet) return nil } - -// Config defines all of the things that are needed to start up Kademlia -// server endpoints (and not necessarily client code). -type Config struct { - BootstrapAddr string `help:"the Kademlia node to bootstrap against" default:"127.0.0.1:7778"` - DBPath string `help:"the path for storage node db services to be created on" default:"$CONFDIR/kademlia"` - Alpha int `help:"alpha is a system wide concurrency parameter" default:"5"` - ExternalAddress string `user:"true" help:"the public address of the Kademlia node, useful for nodes behind NAT" default:""` - Operator OperatorConfig -} - -// Verify verifies whether kademlia config is valid. -func (c Config) Verify(log *zap.Logger) error { - return c.Operator.Verify(log) -} diff --git a/pkg/kademlia/kademlia.go b/pkg/kademlia/kademlia.go index eef0065dd..ff3127b4e 100644 --- a/pkg/kademlia/kademlia.go +++ b/pkg/kademlia/kademlia.go @@ -5,10 +5,7 @@ package kademlia import ( "context" - "fmt" "math/rand" - "os" - "path/filepath" "time" "github.com/gogo/protobuf/proto" @@ -22,7 +19,6 @@ import ( "storj.io/storj/pkg/storj" "storj.io/storj/pkg/transport" "storj.io/storj/storage" - "storj.io/storj/storage/boltdb" ) var ( @@ -57,45 +53,8 @@ type Kademlia struct { bootstrapFinished sync2.Fence } -// New returns a newly configured Kademlia instance -var New = NewKademlia - -// NewKademlia returns a newly configured Kademlia instance -func NewKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *provider.FullIdentity, path string, alpha int) (*Kademlia, error) { - self := pb.Node{ - Id: identity.ID, - Type: nodeType, - Address: &pb.NodeAddress{Address: address}, - Metadata: metadata, - } - if _, err := os.Stat(path); os.IsNotExist(err) { - if err := os.MkdirAll(path, 0777); err != nil { - return nil, err - } - } - - bucketIdentifier := self.Id.String()[:5] // need a way to differentiate between nodes if running more than one simultaneously - dbpath := filepath.Join(path, fmt.Sprintf("kademlia_%s.db", bucketIdentifier)) - - dbs, err := boltdb.NewShared(dbpath, KademliaBucket, NodeBucket) - if err != nil { - return nil, BootstrapErr.Wrap(err) - } - kdb, ndb := dbs[0], dbs[1] - - rt, err := NewRoutingTable(log, self, kdb, ndb) - if err != nil { - return nil, BootstrapErr.Wrap(err) - } - - return NewKademliaWithRoutingTable(log, self, bootstrapNodes, identity, alpha, rt) -} - -// NewWith returns a newly configured Kademlia instance -var NewWith = NewKademliaWithRoutingTable - -// NewKademliaWithRoutingTable returns a newly configured Kademlia instance -func NewKademliaWithRoutingTable(log *zap.Logger, self pb.Node, bootstrapNodes []pb.Node, identity *provider.FullIdentity, alpha int, rt *RoutingTable) (*Kademlia, error) { +// NewService returns a newly configured Kademlia instance +func NewService(log *zap.Logger, self pb.Node, bootstrapNodes []pb.Node, identity *provider.FullIdentity, alpha int, rt *RoutingTable) (*Kademlia, error) { k := &Kademlia{ log: log, alpha: alpha, diff --git a/pkg/kademlia/kademlia_test.go b/pkg/kademlia/kademlia_test.go index 8c4269be7..f6f82e2d9 100644 --- a/pkg/kademlia/kademlia_test.go +++ b/pkg/kademlia/kademlia_test.go @@ -18,6 +18,8 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "google.golang.org/grpc" @@ -29,6 +31,7 @@ import ( "storj.io/storj/pkg/pb" "storj.io/storj/pkg/provider" "storj.io/storj/pkg/storj" + "storj.io/storj/storage/teststore" ) const ( @@ -50,7 +53,7 @@ func TestNewKademlia(t *testing.T) { { id: func() *identity.FullIdentity { id, err := testidentity.NewTestIdentity(ctx) - assert.NoError(t, err) + require.NoError(t, err) return id }(), bn: []pb.Node{{Id: teststorj.NodeIDFromString("foo")}}, @@ -59,7 +62,7 @@ func TestNewKademlia(t *testing.T) { { id: func() *provider.FullIdentity { id, err := testidentity.NewTestIdentity(ctx) - assert.NoError(t, err) + require.NoError(t, err) return id }(), bn: []pb.Node{{Id: teststorj.NodeIDFromString("foo")}}, @@ -70,8 +73,8 @@ func TestNewKademlia(t *testing.T) { for i, v := range cases { dir := filepath.Join(rootdir, strconv.Itoa(i)) - kad, err := NewKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, v.bn, v.addr, nil, v.id, dir, defaultAlpha) - assert.NoError(t, err) + kad, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, v.bn, v.addr, nil, v.id, dir, defaultAlpha) + require.NoError(t, err) assert.Equal(t, v.expectedErr, err) assert.Equal(t, kad.bootstrapNodes, v.bn) assert.NotNil(t, kad.dialer) @@ -100,7 +103,7 @@ func TestPeerDiscovery(t *testing.T) { Email: "foo@bar.com", Wallet: "OperatorWallet", } - k, err := NewKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha) + k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, bootstrapNodes, testAddress, metadata, testID, dir, defaultAlpha) assert.NoError(t, err) rt, err := k.GetRoutingTable(ctx) assert.NoError(t, err) @@ -174,7 +177,7 @@ func testNode(t *testing.T, bn []pb.Node) (*Kademlia, *grpc.Server, func()) { dir, cleanup := mktempdir(t, "kademlia") logger := zaptest.NewLogger(t) - k, err := NewKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha) + k, err := newKademlia(logger, pb.NodeType_STORAGE, bn, lis.Addr().String(), nil, fid, dir, defaultAlpha) assert.NoError(t, err) s := node.NewServer(logger, k) // new ident opts @@ -248,7 +251,7 @@ func TestFindNear(t *testing.T) { assert.NotEqual(t, fid.ID, fid2.ID) dir, cleanup := mktempdir(t, "kademlia") defer cleanup() - k, err := NewKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, []pb.Node{{Id: fid2.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}}, lis.Addr().String(), nil, fid, dir, defaultAlpha) + k, err := newKademlia(zaptest.NewLogger(t), pb.NodeType_STORAGE, []pb.Node{{Id: fid2.ID, Address: &pb.NodeAddress{Address: lis.Addr().String()}}}, lis.Addr().String(), nil, fid, dir, defaultAlpha) assert.NoError(t, err) defer func() { assert.NoError(t, k.Disconnect()) @@ -532,3 +535,20 @@ func (mn *mockNodesServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.P atomic.AddInt32(&mn.pingCalled, 1) return &pb.PingResponse{}, nil } + +// newKademlia returns a newly configured Kademlia instance +func newKademlia(log *zap.Logger, nodeType pb.NodeType, bootstrapNodes []pb.Node, address string, metadata *pb.NodeMetadata, identity *provider.FullIdentity, path string, alpha int) (*Kademlia, error) { + self := pb.Node{ + Id: identity.ID, + Type: nodeType, + Address: &pb.NodeAddress{Address: address}, + Metadata: metadata, + } + + rt, err := NewRoutingTable(log, self, teststore.New(), teststore.New(), nil) + if err != nil { + return nil, BootstrapErr.Wrap(err) + } + + return NewService(log, self, bootstrapNodes, identity, alpha, rt) +} diff --git a/pkg/kademlia/routing.go b/pkg/kademlia/routing.go index 0021154e9..ed114aa97 100644 --- a/pkg/kademlia/routing.go +++ b/pkg/kademlia/routing.go @@ -15,7 +15,6 @@ import ( "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" - "storj.io/storj/pkg/utils" "storj.io/storj/storage" ) @@ -46,6 +45,12 @@ var firstBucketID = bucketID{ var emptyBucketID = bucketID{} +// RoutingTableConfig configures the routing table +type RoutingTableConfig struct { + BucketSize int `help:"size of each Kademlia bucket" default:"20"` + ReplacementCacheSize int `help:"size of Kademlia replacement cache" default:"5"` +} + // RoutingTable implements the RoutingTable interface type RoutingTable struct { log *zap.Logger @@ -58,13 +63,20 @@ type RoutingTable struct { replacementCache map[bucketID][]*pb.Node bucketSize int // max number of nodes stored in a kbucket = 20 (k) rcBucketSize int // replacementCache bucket max length - } // NewRoutingTable returns a newly configured instance of a RoutingTable -func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.KeyValueStore) (*RoutingTable, error) { +func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.KeyValueStore, config *RoutingTableConfig) (*RoutingTable, error) { localNode.Type.DPanicOnInvalid("new routing table") + if config == nil || config.BucketSize == 0 || config.ReplacementCacheSize == 0 { + // TODO: handle this more nicely + config = &RoutingTableConfig{ + BucketSize: 20, + ReplacementCacheSize: 5, + } + } + rt := &RoutingTable{ log: logger, self: localNode, @@ -76,8 +88,8 @@ func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.Key seen: make(map[storj.NodeID]*pb.Node), replacementCache: make(map[bucketID][]*pb.Node), - bucketSize: *flagBucketSize, - rcBucketSize: *flagReplacementCacheSize, + bucketSize: config.BucketSize, + rcBucketSize: config.ReplacementCacheSize, } ok, err := rt.addNode(&localNode) if !ok || err != nil { @@ -86,18 +98,9 @@ func NewRoutingTable(logger *zap.Logger, localNode pb.Node, kdb, ndb storage.Key return rt, nil } -// SelfClose close without closing dependencies -// TODO: rename to Close and remove Close -func (rt *RoutingTable) SelfClose() error { - return nil -} - -// Close closes underlying databases +// Close close without closing dependencies func (rt *RoutingTable) Close() error { - return utils.CombineErrors( - rt.kadBucketDB.Close(), - rt.nodeBucketDB.Close(), - ) + return nil } // Local returns the local nodes ID diff --git a/satellite/peer.go b/satellite/peer.go index 74d56cd59..9db703911 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -227,14 +227,14 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config) (* } peer.Kademlia.kdb, peer.Kademlia.ndb = dbs[0], dbs[1] - peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, peer.Kademlia.kdb, peer.Kademlia.ndb) + peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, peer.Kademlia.kdb, peer.Kademlia.ndb, &config.RoutingTableConfig) if err != nil { return nil, errs.Combine(err, peer.Close()) } } // TODO: reduce number of arguments - peer.Kademlia.Service, err = kademlia.NewWith(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable) + peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable) if err != nil { return nil, errs.Combine(err, peer.Close()) } @@ -469,7 +469,7 @@ func (peer *Peer) Close() error { errlist.Add(peer.Kademlia.Service.Close()) } if peer.Kademlia.RoutingTable != nil { - errlist.Add(peer.Kademlia.RoutingTable.SelfClose()) + errlist.Add(peer.Kademlia.RoutingTable.Close()) } if peer.Kademlia.ndb != nil || peer.Kademlia.kdb != nil { diff --git a/storagenode/peer.go b/storagenode/peer.go index 9b343bb7e..623ccb61f 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -141,13 +141,13 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config Config) (*P } kdb, ndb := peer.DB.RoutingTable() - peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb) + peer.Kademlia.RoutingTable, err = kademlia.NewRoutingTable(peer.Log.Named("routing"), self, kdb, ndb, &config.RoutingTableConfig) if err != nil { return nil, errs.Combine(err, peer.Close()) } // TODO: reduce number of arguments - peer.Kademlia.Service, err = kademlia.NewWith(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable) + peer.Kademlia.Service, err = kademlia.NewService(peer.Log.Named("kademlia"), self, []pb.Node{*in}, peer.Identity, config.Alpha, peer.Kademlia.RoutingTable) if err != nil { return nil, errs.Combine(err, peer.Close()) } @@ -244,7 +244,7 @@ func (peer *Peer) Close() error { errlist.Add(peer.Kademlia.Service.Close()) } if peer.Kademlia.RoutingTable != nil { - errlist.Add(peer.Kademlia.RoutingTable.SelfClose()) + errlist.Add(peer.Kademlia.RoutingTable.Close()) } return errlist.Err()