diff --git a/pkg/kademlia/kademlia.go b/pkg/kademlia/kademlia.go index b1ee960c2..26afa3f83 100644 --- a/pkg/kademlia/kademlia.go +++ b/pkg/kademlia/kademlia.go @@ -14,7 +14,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" "google.golang.org/grpc" - "storj.io/storj/pkg/dht" "storj.io/storj/pkg/node" "storj.io/storj/pkg/pb" @@ -207,8 +206,8 @@ func (k *Kademlia) Ping(ctx context.Context, node pb.Node) (pb.Node, error) { // FindNode looks up the provided NodeID first in the local Node, and if it is not found // begins searching the network for the NodeID. Returns and error if node was not found func (k *Kademlia) FindNode(ctx context.Context, ID dht.NodeID) (pb.Node, error) { - // TODO(coyle) - return pb.Node{}, NodeErr.New("TODO FindNode") + //TODO(coyle) + return pb.Node{Id: ID.String()}, NodeErr.New("TODO FindNode") } // ListenAndServe connects the kademlia node to the network and listens for incoming requests diff --git a/pkg/overlay/cache.go b/pkg/overlay/cache.go index 16f01926f..70154b43e 100644 --- a/pkg/overlay/cache.go +++ b/pkg/overlay/cache.go @@ -49,16 +49,12 @@ func NewBoltOverlayCache(dbPath string, dht dht.DHT) (*Cache, error) { if err != nil { return nil, err } - return NewOverlayCache(storelogger.New(zap.L(), db), dht), nil } // NewOverlayCache returns a new Cache func NewOverlayCache(db storage.KeyValueStore, dht dht.DHT) *Cache { - return &Cache{ - DB: db, - DHT: dht, - } + return &Cache{DB: db, DHT: dht} } // Get looks up the provided nodeID from the overlay cache @@ -71,12 +67,10 @@ func (o *Cache) Get(ctx context.Context, key string) (*pb.Node, error) { // TODO: log? return an error? return nil, nil } - na := &pb.Node{} if err := proto.Unmarshal(b, na); err != nil { return nil, err } - return na, nil } @@ -115,7 +109,6 @@ func (o *Cache) Put(nodeID string, value pb.Node) error { if err != nil { return err } - return o.DB.Put(node.IDFromString(nodeID).Bytes(), data) } @@ -125,7 +118,6 @@ func (o *Cache) Bootstrap(ctx context.Context) error { if err != nil { return OverlayError.New("Error getting nodes from DHT: %v", err) } - for _, v := range nodes { found, err := o.DHT.FindNode(ctx, node.IDFromString(v.Id)) if err != nil { @@ -154,7 +146,6 @@ func (o *Cache) Refresh(ctx context.Context) error { if err != nil { return err } - rid := node.ID(r) near, err := o.DHT.GetNodes(ctx, rid.String(), 128) if err != nil { @@ -177,7 +168,6 @@ func (o *Cache) Refresh(ctx context.Context) error { continue } } - // TODO: Kademlia hooks to do this automatically rather than at interval nodes, err := o.DHT.GetNodes(ctx, "", 128) if err != nil { @@ -200,6 +190,7 @@ func (o *Cache) Refresh(ctx context.Context) error { continue } } + return nil } diff --git a/pkg/overlay/cache_test.go b/pkg/overlay/cache_test.go index 44f1dbe76..b27b9bd4f 100644 --- a/pkg/overlay/cache_test.go +++ b/pkg/overlay/cache_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/zeebo/errs" "go.uber.org/zap/zaptest" - "storj.io/storj/pkg/dht" "storj.io/storj/pkg/kademlia" "storj.io/storj/pkg/node" @@ -66,6 +65,7 @@ func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, pb.Node) { pm := strconv.Itoa(p) assert.NoError(t, err) intro, err := kademlia.GetIntroNode(net.JoinHostPort(ip, pm)) + intro.Id = "test" assert.NoError(t, err) ca, err := provider.NewTestCA(ctx) @@ -80,8 +80,10 @@ func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, pb.Node) { assert.NoError(t, err) bootNode := rt.Local() - err = boot.ListenAndServe() - assert.NoError(t, err) + go func() { + err = boot.ListenAndServe() + assert.NoError(t, err) + }() p++ err = boot.Bootstrap(context.Background()) @@ -97,8 +99,10 @@ func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, pb.Node) { p++ dhts = append(dhts, dht) - err = dht.ListenAndServe() - assert.NoError(t, err) + go func() { + err = dht.ListenAndServe() + assert.NoError(t, err) + }() err = dht.Bootstrap(context.Background()) assert.NoError(t, err) } @@ -606,10 +610,9 @@ func TestMockPut(t *testing.T) { } func TestRefresh(t *testing.T) { - t.Skip() for _, c := range refreshCases { t.Run(c.testID, func(t *testing.T) { - dhts, b := bootstrapTestNetwork(t, "127.0.0.1", "0") + dhts, b := bootstrapTestNetwork(t, "127.0.0.1", "1024") ctx := context.Background() db := teststore.New() @@ -617,14 +620,14 @@ func TestRefresh(t *testing.T) { t.Fatal(err) } - dht := newTestKademlia(t, "127.0.0.1", "0", dhts[rand.Intn(testNetSize)], b) + dht := newTestKademlia(t, "127.0.0.1", "1024", dhts[rand.Intn(testNetSize)], b) - _cache := &Cache{ - DB: db, - DHT: dht, - } + _cache := &Cache{DB: db, DHT: dht} - err := _cache.Refresh(ctx) + err := _cache.Bootstrap(ctx) + assert.Equal(t, err, c.expectedErr) + + err = _cache.Refresh(ctx) assert.Equal(t, err, c.expectedErr) }) } diff --git a/storage/common.go b/storage/common.go index e6e7bc53b..e6edccb18 100644 --- a/storage/common.go +++ b/storage/common.go @@ -19,6 +19,9 @@ var ErrKeyNotFound = errs.Class("key not found") // ErrEmptyKey is returned when an empty key is used in Put var ErrEmptyKey = errors.New("empty key") +// ErrEmptyQueue is returned when attempting to Dequeue from an empty queue +var ErrEmptyQueue = errors.New("empty queue") + // ErrLimitExceeded is returned when request limit is exceeded var ErrLimitExceeded = errors.New("limit exceeded") @@ -71,7 +74,7 @@ type KeyValueStore interface { type Queue interface { //Enqueue add a FIFO element Enqueue(Value) error - //Dequeue removes a FIFO element + //Dequeue removes a FIFO element, returning ErrEmptyQueue if empty Dequeue() (Value, error) //Close closes the store Close() error diff --git a/storage/testqueue/queue.go b/storage/testqueue/queue.go index 9e2eab9c2..8b813197d 100644 --- a/storage/testqueue/queue.go +++ b/storage/testqueue/queue.go @@ -5,7 +5,6 @@ package testqueue import ( "container/list" - "fmt" "sync" "storj.io/storj/storage" @@ -39,7 +38,7 @@ func (q *Queue) Dequeue() (storage.Value, error) { q.s.Remove(e) // Dequeue return e.Value.(storage.Value), nil } - return nil, fmt.Errorf("queue empty") + return nil, storage.ErrEmptyQueue } //Close closes the queue