Remove unused workers (#640)

Egon Elbre 2018-11-19 17:07:24 +02:00 committed by GitHub
parent d07433c150
commit 832317b0ee
3 changed files with 40 additions and 374 deletions


@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strconv"
"sync/atomic"
"testing"
"github.com/golang/protobuf/proto"
@@ -393,7 +394,7 @@ func mktempdir(t *testing.T, dir string) (string, func()) {
return rootdir, cleanup
}

-func startTestNodeServer() (*grpc.Server, *mockNodeServer, *provider.FullIdentity, string) {
+func startTestNodeServer() (*grpc.Server, *mockNodesServer, *provider.FullIdentity, string) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, nil, nil, ""
@@ -412,7 +413,7 @@ func startTestNodeServer() (*grpc.Server, *mockNodeServer, *provider.FullIdentit
return nil, nil, nil, ""
}
grpcServer := grpc.NewServer(identOpt)
mn := &mockNodeServer{queryCalled: 0}
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
go func() {
@@ -423,3 +424,40 @@ func startTestNodeServer() (*grpc.Server, *mockNodeServer, *provider.FullIdentit
return grpcServer, mn, identity, lis.Addr().String()
}
func newTestServer(nn []*pb.Node) (*grpc.Server, *mockNodesServer) {
ca, err := provider.NewTestCA(context.Background())
if err != nil {
return nil, nil
}
identity, err := ca.NewIdentity()
if err != nil {
return nil, nil
}
identOpt, err := identity.ServerOption()
if err != nil {
return nil, nil
}
grpcServer := grpc.NewServer(identOpt)
mn := &mockNodesServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
return grpcServer, mn
}
type mockNodesServer struct {
queryCalled int32
pingCalled int32
returnValue []*pb.Node
}
func (mn *mockNodesServer) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
atomic.AddInt32(&mn.queryCalled, 1)
return &pb.QueryResponse{Response: mn.returnValue}, nil
}
func (mn *mockNodesServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
atomic.AddInt32(&mn.pingCalled, 1)
return &pb.PingResponse{}, nil
}
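
Since the mock's counters are incremented with sync/atomic, tests that assert on them are safest reading them the same way. Below is a minimal sketch of how the new helper might be exercised; the test name, the dialing step left as a comment, and the expected count are illustrative assumptions, not part of this commit:

// Illustrative only: exercising newTestServer and reading its counters
// atomically. Assumes the surrounding test file's imports (net, testing,
// sync/atomic).
func TestMockQueryCount(t *testing.T) {
	srv, mock := newTestServer(nil)
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	go func() { _ = srv.Serve(lis) }()
	defer srv.Stop()
	// ... dial lis.Addr().String() with a node client and issue one Query ...
	if got := atomic.LoadInt32(&mock.queryCalled); got != 1 { // hypothetical expected count
		t.Fatalf("queryCalled = %d, want 1", got)
	}
}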


@@ -1,159 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package kademlia
import (
"context"
"log"
"sync"
"time"
"github.com/zeebo/errs"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
)
var (
// WorkerError is the class of errors for the worker struct
WorkerError = errs.Class("worker error")
// default timeout is the minimum timeout for worker cancellation
// 250ms was the minimum value allowing current workers to finish work
// before returning
defaultTimeout = 250 * time.Millisecond
)
// worker pops work off a priority queue and does lookups on the work received
type worker struct {
contacted map[string]bool
pq *XorQueue
mu *sync.Mutex
maxResponse time.Duration
cancel context.CancelFunc
nodeClient node.Client
find dht.NodeID
workInProgress int
k int
}
func newWorker(ctx context.Context, rt *RoutingTable, nodes []*pb.Node, nc node.Client, target dht.NodeID, k int) *worker {
pq := NewXorQueue(k)
pq.Insert(target, nodes)
return &worker{
contacted: map[string]bool{},
pq: pq,
mu: &sync.Mutex{},
maxResponse: 0 * time.Millisecond,
nodeClient: nc,
find: target,
workInProgress: 0,
k: k,
}
}
// create x workers
// have a worker that gets work off the queue
// send that work on a channel
// have workers get work available off channel
// after queue is empty and no work is in progress, close channel.
func (w *worker) work(ctx context.Context, ch chan *pb.Node) {
// grab uncontacted node from working set
// change status to inprogress
// ask node for target
// if node has target cancel ctx and send node
for {
select {
case <-ctx.Done():
return
case n := <-ch:
// network lookup for nodes
nodes := w.lookup(ctx, n)
// update our priority queue
w.update(nodes)
}
}
}
func (w *worker) getWork(ctx context.Context, ch chan *pb.Node) {
for {
if ctx.Err() != nil {
return
}
w.mu.Lock()
if w.pq.Len() <= 0 && w.workInProgress <= 0 {
w.mu.Unlock()
timeout := defaultTimeout
if timeout < (2 * w.maxResponse) {
timeout = 2 * w.maxResponse
}
time.AfterFunc(timeout, w.cancel)
return
}
if w.pq.Len() <= 0 {
w.mu.Unlock()
// if there is nothing left to get off the queue
// and the work-in-progress is not empty
// let's wait a bit for the workers to populate the queue
time.Sleep(50 * time.Millisecond)
continue
}
w.workInProgress++
node, _ := w.pq.Closest()
ch <- node
w.mu.Unlock()
}
}
func (w *worker) lookup(ctx context.Context, node *pb.Node) []*pb.Node {
start := time.Now()
if node.GetAddress() == nil {
return nil
}
nodes, err := w.nodeClient.Lookup(ctx, *node, pb.Node{Id: w.find.String()})
if err != nil {
// TODO(coyle): I think we might want to do another look up on this node or update something
// but for now let's just log and ignore.
log.Printf("Error occurred during lookup for %s on %s :: error = %s", w.find.String(), node.GetId(), err.Error())
return []*pb.Node{}
}
// add node to the previously contacted list so we don't duplicate lookups
w.mu.Lock()
w.contacted[node.GetId()] = true
w.mu.Unlock()
latency := time.Since(start)
if latency > w.maxResponse {
w.maxResponse = latency
}
return nodes
}
func (w *worker) update(nodes []*pb.Node) {
w.mu.Lock()
defer w.mu.Unlock()
uncontactedNodes := []*pb.Node{}
for _, v := range nodes {
// if we have already done a lookup on this node we don't want to do it again for this lookup loop
if !w.contacted[v.GetId()] {
uncontactedNodes = append(uncontactedNodes, v)
}
}
w.pq.Insert(w.find, uncontactedNodes)
w.workInProgress--
}
// SetCancellation adds the cancel function to the worker
func (w *worker) SetCancellation(cf context.CancelFunc) {
w.cancel = cf
}
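
For context on what is being deleted: getWork is the producer half (it pops the closest uncontacted node off the XorQueue and sends it down the channel) and work is the consumer half (it looks the node up over the network and feeds the results back through update). A rough sketch of how the two halves were wired together follows; runLookup and its exact wiring are assumptions for illustration, not code from this repository:

// Illustrative driver, not part of the repository. k consumer goroutines
// run work(); the caller acts as the producer via getWork(). Once the
// queue and the in-flight counter both drain, getWork schedules w.cancel,
// which unblocks the consumers' <-ctx.Done() case.
func runLookup(ctx context.Context, rt *RoutingTable, start []*pb.Node, nc node.Client, target dht.NodeID, k int) {
	ctx, cancel := context.WithCancel(ctx)
	w := newWorker(ctx, rt, start, nc, target, k)
	w.SetCancellation(cancel)
	ch := make(chan *pb.Node)
	for i := 0; i < k; i++ {
		go w.work(ctx, ch) // consumers exit when cancel fires
	}
	w.getWork(ctx, ch) // returns once the queue drains
}

A real caller would likely also wait for the consumer goroutines, for example with a sync.WaitGroup, before treating the lookup as finished.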


@@ -1,213 +0,0 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information
package kademlia
import (
"context"
"net"
"sync/atomic"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"storj.io/storj/pkg/dht/mocks"
"storj.io/storj/pkg/node"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/provider"
)
func TestGetWork(t *testing.T) {
cases := []struct {
name string
worker *worker
expected *pb.Node
ch chan *pb.Node
}{
{
name: "test valid chore returned",
worker: newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: "1001"}}, nil, node.IDFromString("1000"), 5),
expected: &pb.Node{Id: "1001"},
ch: make(chan *pb.Node, 2),
},
{
name: "test no chore left",
worker: func() *worker {
w := newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: "foo"}}, nil, node.IDFromString("foo"), 5)
w.maxResponse = 0
w.pq.Closest()
assert.Equal(t, w.pq.Len(), 0)
return w
}(),
expected: nil,
ch: make(chan *pb.Node, 2),
},
}
for _, v := range cases {
ctx, cf := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cf()
v.worker.cancel = cf
v.worker.getWork(ctx, v.ch)
if v.expected != nil {
actual := <-v.ch
assert.Equal(t, v.expected, actual)
} else {
assert.Len(t, v.ch, 0)
}
}
}
func TestWorkCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
worker := newWorker(ctx, nil, []*pb.Node{&pb.Node{Id: "1001"}}, nil, node.IDFromString("1000"), 5)
// TODO: ensure this also works when running
cancel()
worker.work(ctx, make(chan *pb.Node))
}
func TestWorkerLookup(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockDHT := mock_dht.NewMockDHT(ctrl)
mockRT := mock_dht.NewMockRoutingTable(ctrl)
srv, mock, identity, addr := startTestNodeServer()
defer srv.Stop()
id := identity.ID.String()
cases := []struct {
name string
worker *worker
work *pb.Node
expected []*pb.Node
}{
{
name: "test valid chore returned",
worker: func() *worker {
nc, err := node.NewNodeClient(identity, pb.Node{Id: id, Address: &pb.NodeAddress{Address: "127.0.0.1:0"}}, mockDHT)
assert.NoError(t, err)
mock.returnValue = []*pb.Node{&pb.Node{Id: id}}
return newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: id}}, nc, node.IDFromString(id), 5)
}(),
work: &pb.Node{Id: id, Address: &pb.NodeAddress{Address: addr}},
expected: []*pb.Node{&pb.Node{Id: id}},
},
}
for _, v := range cases {
mockDHT.EXPECT().GetRoutingTable(gomock.Any()).Return(mockRT, nil)
mockRT.EXPECT().ConnectionSuccess(gomock.Any()).Return(nil)
actual := v.worker.lookup(context.Background(), v.work)
assert.Equal(t, v.expected, actual)
assert.Equal(t, int32(1), mock.queryCalled)
}
}
func TestUpdate(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockDHT := mock_dht.NewMockDHT(ctrl)
lis, err := net.Listen("tcp", "127.0.0.1:0")
assert.NoError(t, err)
srv, _ := newTestServer(nil)
go func() { _ = srv.Serve(lis) }()
defer srv.Stop()
cases := []struct {
name string
worker *worker
input []*pb.Node
expectedQueueLength int
expected []*pb.Node
expectedErr error
}{
{
name: "test nil nodes",
worker: func() *worker {
ca, err := provider.NewTestCA(context.Background())
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
nc, err := node.NewNodeClient(identity, pb.Node{Id: "foo", Address: &pb.NodeAddress{Address: ":7070"}}, mockDHT)
assert.NoError(t, err)
return newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: "0000"}}, nc, node.IDFromString("foo"), 2)
}(),
expectedQueueLength: 1,
input: nil,
expectedErr: WorkerError.New("nodes must not be empty"),
expected: []*pb.Node{&pb.Node{Id: "0000"}},
},
{
name: "test combined less than k",
worker: func() *worker {
ca, err := provider.NewTestCA(context.Background())
assert.NoError(t, err)
identity, err := ca.NewIdentity()
assert.NoError(t, err)
nc, err := node.NewNodeClient(identity, pb.Node{Id: "a", Address: &pb.NodeAddress{Address: ":7070"}}, mockDHT)
assert.NoError(t, err)
return newWorker(context.Background(), nil, []*pb.Node{&pb.Node{Id: "h"}}, nc, node.IDFromString("a"), 2)
}(),
expectedQueueLength: 2,
expected: []*pb.Node{&pb.Node{Id: "g"}, &pb.Node{Id: "f"}},
input: []*pb.Node{&pb.Node{Id: "f"}, &pb.Node{Id: "g"}},
expectedErr: nil,
},
}
for _, v := range cases {
v.worker.update(v.input)
assert.Equal(t, v.expectedQueueLength, v.worker.pq.Len())
i := 0
for v.worker.pq.Len() > 0 {
node, _ := v.worker.pq.Closest()
assert.Equal(t, v.expected[i], node)
i++
}
}
}
func newTestServer(nn []*pb.Node) (*grpc.Server, *mockNodeServer) {
ca, err := provider.NewTestCA(context.Background())
if err != nil {
return nil, nil
}
identity, err := ca.NewIdentity()
if err != nil {
return nil, nil
}
identOpt, err := identity.ServerOption()
if err != nil {
return nil, nil
}
grpcServer := grpc.NewServer(identOpt)
mn := &mockNodeServer{queryCalled: 0}
pb.RegisterNodesServer(grpcServer, mn)
return grpcServer, mn
}
type mockNodeServer struct {
queryCalled int32
pingCalled int32
returnValue []*pb.Node
}
func (mn *mockNodeServer) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
atomic.AddInt32(&mn.queryCalled, 1)
return &pb.QueryResponse{Response: mn.returnValue}, nil
}
func (mn *mockNodeServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
atomic.AddInt32(&mn.pingCalled, 1)
return &pb.PingResponse{}, nil
}