Verify overlay cache is accurately and fully tested (#540)
* added intro node id, moved port range to 1024, listen in go routines
This commit is contained in:
parent
f28af4cdfb
commit
d3c347a0ac
@ -14,7 +14,6 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/node"
|
||||
"storj.io/storj/pkg/pb"
|
||||
@ -207,8 +206,8 @@ func (k *Kademlia) Ping(ctx context.Context, node pb.Node) (pb.Node, error) {
|
||||
// FindNode looks up the provided NodeID first in the local Node, and if it is not found
|
||||
// begins searching the network for the NodeID. Returns and error if node was not found
|
||||
func (k *Kademlia) FindNode(ctx context.Context, ID dht.NodeID) (pb.Node, error) {
|
||||
// TODO(coyle)
|
||||
return pb.Node{}, NodeErr.New("TODO FindNode")
|
||||
//TODO(coyle)
|
||||
return pb.Node{Id: ID.String()}, NodeErr.New("TODO FindNode")
|
||||
}
|
||||
|
||||
// ListenAndServe connects the kademlia node to the network and listens for incoming requests
|
||||
|
@ -49,16 +49,12 @@ func NewBoltOverlayCache(dbPath string, dht dht.DHT) (*Cache, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewOverlayCache(storelogger.New(zap.L(), db), dht), nil
|
||||
}
|
||||
|
||||
// NewOverlayCache returns a new Cache
|
||||
func NewOverlayCache(db storage.KeyValueStore, dht dht.DHT) *Cache {
|
||||
return &Cache{
|
||||
DB: db,
|
||||
DHT: dht,
|
||||
}
|
||||
return &Cache{DB: db, DHT: dht}
|
||||
}
|
||||
|
||||
// Get looks up the provided nodeID from the overlay cache
|
||||
@ -71,12 +67,10 @@ func (o *Cache) Get(ctx context.Context, key string) (*pb.Node, error) {
|
||||
// TODO: log? return an error?
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
na := &pb.Node{}
|
||||
if err := proto.Unmarshal(b, na); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return na, nil
|
||||
}
|
||||
|
||||
@ -115,7 +109,6 @@ func (o *Cache) Put(nodeID string, value pb.Node) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return o.DB.Put(node.IDFromString(nodeID).Bytes(), data)
|
||||
}
|
||||
|
||||
@ -125,7 +118,6 @@ func (o *Cache) Bootstrap(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return OverlayError.New("Error getting nodes from DHT: %v", err)
|
||||
}
|
||||
|
||||
for _, v := range nodes {
|
||||
found, err := o.DHT.FindNode(ctx, node.IDFromString(v.Id))
|
||||
if err != nil {
|
||||
@ -154,7 +146,6 @@ func (o *Cache) Refresh(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rid := node.ID(r)
|
||||
near, err := o.DHT.GetNodes(ctx, rid.String(), 128)
|
||||
if err != nil {
|
||||
@ -177,7 +168,6 @@ func (o *Cache) Refresh(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Kademlia hooks to do this automatically rather than at interval
|
||||
nodes, err := o.DHT.GetNodes(ctx, "", 128)
|
||||
if err != nil {
|
||||
@ -200,6 +190,7 @@ func (o *Cache) Refresh(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/storj/pkg/dht"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/node"
|
||||
@ -66,6 +65,7 @@ func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, pb.Node) {
|
||||
pm := strconv.Itoa(p)
|
||||
assert.NoError(t, err)
|
||||
intro, err := kademlia.GetIntroNode(net.JoinHostPort(ip, pm))
|
||||
intro.Id = "test"
|
||||
assert.NoError(t, err)
|
||||
|
||||
ca, err := provider.NewTestCA(ctx)
|
||||
@ -80,8 +80,10 @@ func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, pb.Node) {
|
||||
assert.NoError(t, err)
|
||||
bootNode := rt.Local()
|
||||
|
||||
err = boot.ListenAndServe()
|
||||
assert.NoError(t, err)
|
||||
go func() {
|
||||
err = boot.ListenAndServe()
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
p++
|
||||
|
||||
err = boot.Bootstrap(context.Background())
|
||||
@ -97,8 +99,10 @@ func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, pb.Node) {
|
||||
|
||||
p++
|
||||
dhts = append(dhts, dht)
|
||||
err = dht.ListenAndServe()
|
||||
assert.NoError(t, err)
|
||||
go func() {
|
||||
err = dht.ListenAndServe()
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
err = dht.Bootstrap(context.Background())
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
@ -606,10 +610,9 @@ func TestMockPut(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRefresh(t *testing.T) {
|
||||
t.Skip()
|
||||
for _, c := range refreshCases {
|
||||
t.Run(c.testID, func(t *testing.T) {
|
||||
dhts, b := bootstrapTestNetwork(t, "127.0.0.1", "0")
|
||||
dhts, b := bootstrapTestNetwork(t, "127.0.0.1", "1024")
|
||||
ctx := context.Background()
|
||||
|
||||
db := teststore.New()
|
||||
@ -617,14 +620,14 @@ func TestRefresh(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dht := newTestKademlia(t, "127.0.0.1", "0", dhts[rand.Intn(testNetSize)], b)
|
||||
dht := newTestKademlia(t, "127.0.0.1", "1024", dhts[rand.Intn(testNetSize)], b)
|
||||
|
||||
_cache := &Cache{
|
||||
DB: db,
|
||||
DHT: dht,
|
||||
}
|
||||
_cache := &Cache{DB: db, DHT: dht}
|
||||
|
||||
err := _cache.Refresh(ctx)
|
||||
err := _cache.Bootstrap(ctx)
|
||||
assert.Equal(t, err, c.expectedErr)
|
||||
|
||||
err = _cache.Refresh(ctx)
|
||||
assert.Equal(t, err, c.expectedErr)
|
||||
})
|
||||
}
|
||||
|
@ -19,6 +19,9 @@ var ErrKeyNotFound = errs.Class("key not found")
|
||||
// ErrEmptyKey is returned when an empty key is used in Put
|
||||
var ErrEmptyKey = errors.New("empty key")
|
||||
|
||||
// ErrEmptyQueue is returned when attempting to Dequeue from an empty queue
|
||||
var ErrEmptyQueue = errors.New("empty queue")
|
||||
|
||||
// ErrLimitExceeded is returned when request limit is exceeded
|
||||
var ErrLimitExceeded = errors.New("limit exceeded")
|
||||
|
||||
@ -71,7 +74,7 @@ type KeyValueStore interface {
|
||||
type Queue interface {
|
||||
//Enqueue add a FIFO element
|
||||
Enqueue(Value) error
|
||||
//Dequeue removes a FIFO element
|
||||
//Dequeue removes a FIFO element, returning ErrEmptyQueue if empty
|
||||
Dequeue() (Value, error)
|
||||
//Close closes the store
|
||||
Close() error
|
||||
|
@ -5,7 +5,6 @@ package testqueue
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"storj.io/storj/storage"
|
||||
@ -39,7 +38,7 @@ func (q *Queue) Dequeue() (storage.Value, error) {
|
||||
q.s.Remove(e) // Dequeue
|
||||
return e.Value.(storage.Value), nil
|
||||
}
|
||||
return nil, fmt.Errorf("queue empty")
|
||||
return nil, storage.ErrEmptyQueue
|
||||
}
|
||||
|
||||
//Close closes the queue
|
||||
|
Loading…
Reference in New Issue
Block a user