Overlay worker start (#190)
This is a naive implementation of the overlay worker. Future improvements / to dos:
- Walk through the cache and remove nodes that don't respond
- Better look ups for new nodes
- Better random ID generation
- Kademlia hooks for automatically adding new nodes to the cache

* adding refresh cache functionality, added schedule function
* update put in db
* Tests passing
* wip overlay tests for cache refresh
* update scheduler code
* update refresh function
* WIP adding random lookups to refresh worker
* remove quit channel
* updates fire on schedule and the refresh function finds near nodes
* updates to refresh function, getting more buckets and nodes
* updates to refresh function and cache operations
* add cancellation to context, fix k number of nodes in lookups
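The squashed commits above describe the new scheduling behavior: a cache refresh that fires on an interval and stops when the surrounding context is cancelled. As a quick orientation before the diff, here is a minimal, self-contained sketch of that ticker-plus-cancellation pattern; the names refresher, runRefreshLoop, and noopCache are illustrative only and are not part of this change.

package main

import (
	"context"
	"log"
	"time"
)

// refresher stands in for the overlay cache; only the Refresh signature matters here.
type refresher interface {
	Refresh(ctx context.Context) error
}

// runRefreshLoop calls Refresh on every tick until the context is cancelled,
// mirroring the ticker-plus-cancellation pattern described in the commit message.
func runRefreshLoop(ctx context.Context, r refresher, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := r.Refresh(ctx); err != nil {
				// Log and keep going; one failed refresh should not stop the worker.
				log.Printf("cache refresh failed: %v", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

// noopCache is a stand-in implementation used only to exercise the loop.
type noopCache struct{}

func (noopCache) Refresh(ctx context.Context) error {
	log.Println("refreshing cache")
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runRefreshLoop(ctx, noopCache{}, time.Second)
}

The actual worker wires the real overlay Cache and its configured RefreshInterval into a loop of this shape, as the config.go hunk toward the end of the diff shows.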
parent 2c500e7ba2
commit 4c6d359473

Gopkg.lock (generated): 2 lines changed
@@ -1011,6 +1011,6 @@
[solve-meta]
  analyzer-name = "dep"
  analyzer-version = 1
  inputs-digest = "9c05cf2ea65617806454427ce9caa5a36376912fd04575f7e0dcb554f257f6f1"
  inputs-digest = "36e51a4a608646321bf8a68a1d5ec527da814b5caaacddf3f9868d8676034080"
  solver-name = "gps-cdcl"
  solver-version = 1
@@ -5,6 +5,8 @@ package overlay

import (
	"context"
	"log"
	"crypto/rand"

	"github.com/gogo/protobuf/proto"
	"github.com/zeebo/errs"
@@ -108,56 +110,67 @@ func (o *Cache) Bootstrap(ctx context.Context) error {
			return err
		}
	}
	// called after kademlia is bootstrapped
	// needs to take RoutingTable and start to persist it into the cache
	// take bootstrap node
	// get their route table
	// loop through nodes in RT and get THEIR route table
	// keep going forever and ever

	// Other Possibilities: Randomly generate node ID's to ask for?

	_, err = o.DHT.GetRoutingTable(ctx)

	return err
}

// Refresh walks the network looking for new nodes and pings existing nodes to eliminate stale addresses
// Refresh updates the cache db with the current DHT.
// We currently do not penalize nodes that are unresponsive,
// but should in the future.
func (o *Cache) Refresh(ctx context.Context) error {
	// iterate over all nodes
	// compare responses to find new nodes
	// listen for responses from existing nodes
	// if no response from existing, then mark it as offline for time period
	// if responds, it refreshes in DHT
	_, rtErr := o.DHT.GetRoutingTable(ctx)

	if rtErr != nil {
		return rtErr
	}

	_, err := o.DHT.GetNodes(ctx, "0", 128)

	log.Print("starting cache refresh")
	r, err := randomID()
	if err != nil {
		return err
	}

	return nil
}

// Walk iterates over buckets to traverse the network
func (o *Cache) Walk(ctx context.Context) error {
	nodes, err := o.DHT.GetNodes(ctx, "0", 128)
	rid := kademlia.NodeID(r)
	near, err := o.DHT.GetNodes(ctx, rid.String(), 128)
	if err != nil {
		return err
	}

	for _, v := range nodes {
		_, err := o.DHT.FindNode(ctx, kademlia.StringToNodeID(v.Id))
	for _, node := range near {
		pinged, err := o.DHT.Ping(ctx, *node)
		if err != nil {
			return err
		}
		err = o.DB.Put([]byte(pinged.Id), []byte(pinged.Address.Address))
		if err != nil {
			zap.Error(ErrNodeNotFound)
			return err
		}
	}

	// TODO: Kademlia hooks to do this automatically rather than at interval
	nodes, err := o.DHT.GetNodes(ctx, "", 128)
	if err != nil {
		return err
	}

	for _, node := range nodes {
		pinged, err := o.DHT.Ping(ctx, *node)
		if err != nil {
			zap.Error(ErrNodeNotFound)
			return err
		} else {
			err := o.DB.Put([]byte(pinged.Id), []byte(pinged.Address.Address))
			if err != nil {
				return err
			}
		}
	}

	return err
}

// Walk iterates over each node in each bucket to traverse the network
func (o *Cache) Walk(ctx context.Context) error {
	// TODO: This should walk the cache, rather than be a duplicate of refresh
	return nil
}

func randomID() ([]byte, error) {
	result := make([]byte, 64)
	_, err := rand.Read(result)
	return result, err
}
@@ -6,9 +6,11 @@ package overlay
import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"testing"
	"time"

@@ -17,6 +19,8 @@ import (
	"github.com/zeebo/errs"

	"storj.io/storj/internal/test"
	"storj.io/storj/pkg/dht"
	"storj.io/storj/pkg/kademlia"
	"storj.io/storj/pkg/utils"
	"storj.io/storj/protos/overlay"
	"storj.io/storj/storage"
@@ -36,8 +40,66 @@ const (
	mock dbClient = iota
	bolt
	_redis
	testNetSize = 30
)

func newTestKademlia(t *testing.T, ip, port string, d dht.DHT, b overlay.Node) *kademlia.Kademlia {
	i, err := kademlia.NewID()
	assert.NoError(t, err)
	id := kademlia.NodeID(*i)
	n := []overlay.Node{b}
	kad, err := kademlia.NewKademlia(&id, n, ip, port)
	assert.NoError(t, err)

	return kad
}

func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, overlay.Node) {
	bid, err := kademlia.NewID()
	assert.NoError(t, err)

	bnid := kademlia.NodeID(*bid)
	dhts := []dht.DHT{}

	p, err := strconv.Atoi(port)
	pm := strconv.Itoa(p)
	assert.NoError(t, err)
	intro, err := kademlia.GetIntroNode(bnid.String(), ip, pm)
	assert.NoError(t, err)

	boot, err := kademlia.NewKademlia(&bnid, []overlay.Node{*intro}, ip, pm)

	assert.NoError(t, err)
	rt, err := boot.GetRoutingTable(context.Background())
	bootNode := rt.Local()

	err = boot.ListenAndServe()
	assert.NoError(t, err)
	p++

	err = boot.Bootstrap(context.Background())
	assert.NoError(t, err)
	for i := 0; i < testNetSize; i++ {
		gg := strconv.Itoa(p)

		nid, err := kademlia.NewID()
		assert.NoError(t, err)
		id := kademlia.NodeID(*nid)

		dht, err := kademlia.NewKademlia(&id, []overlay.Node{bootNode}, ip, gg)
		assert.NoError(t, err)

		p++
		dhts = append(dhts, dht)
		err = dht.ListenAndServe()
		assert.NoError(t, err)
		err = dht.Bootstrap(context.Background())
		assert.NoError(t, err)
	}

	return dhts, bootNode
}

var (
	getCases = []struct {
		testID string
@@ -146,6 +208,20 @@ var (
			data: test.KvStore{},
		},
	}

	refreshCases = []struct {
		testID string
		expectedTimesCalled int
		expectedErr error
		data test.KvStore
	}{
		{
			testID: "valid update",
			expectedTimesCalled: 1,
			expectedErr: nil,
			data: test.KvStore{},
		},
	}
)

func redisTestClient(t *testing.T, data test.KvStore) storage.KeyValueStore {
@@ -313,6 +389,25 @@ func TestMockPut(t *testing.T) {
	}
}

func TestRefresh(t *testing.T) {
	for _, c := range refreshCases {
		t.Run(c.testID, func(t *testing.T) {
			dhts, b := bootstrapTestNetwork(t, "127.0.0.1", "3000")
			ctx := context.Background()
			db := test.NewMockKeyValueStore(c.data)
			dht := newTestKademlia(t, "127.0.0.1", "2999", dhts[rand.Intn(testNetSize)], b)

			_cache := &Cache{
				DB: db,
				DHT: dht,
			}

			err := _cache.Refresh(ctx)
			assert.Equal(t, err, c.expectedErr)
		})
	}
}

func TestNewRedisOverlayCache(t *testing.T) {
	cases := []struct {
		testName, address string
@@ -7,6 +7,7 @@ import (
	"context"
	"net/url"
	"strconv"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

@@ -26,7 +27,8 @@ var (
// Config is a configuration struct for everything you need to start the
// Overlay cache responsibility.
type Config struct {
	DatabaseURL string `help:"the database connection string to use" default:"bolt://$CONFDIR/overlay.db"`
	RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"30s"`
}

// Run implements the provider.Responsibility interface. Run assumes a
@@ -72,11 +74,23 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
		return err
	}

	ticker := time.NewTicker(time.Duration(c.RefreshInterval))
	defer ticker.Stop()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	go func() {
		// TODO(jt): should there be a for loop here?
		err := cache.Refresh(ctx)
		if err != nil {
			zap.S().Fatal("cache refreshes stopped", zap.Error(err))
		for {
			select {
			case <-ticker.C:
				err := cache.Refresh(ctx)
				if err != nil {
					zap.S().Error("Error with cache refresh: ", err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
@@ -89,15 +103,6 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
		metrics: monkit.Default,
	})

	go func() {
		// TODO(jt): should there be a for loop here?
		// TODO(jt): how is this different from Refresh?
		err := cache.Walk(ctx)
		if err != nil {
			zap.S().Fatal("cache walking stopped", zap.Error(err))
		}
	}()

	return server.Run(ctx)
}