2018-07-19 15:48:08 +01:00
|
|
|
// Copyright (C) 2018 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information
|
|
|
|
|
2018-10-25 17:11:50 +01:00
|
|
|
package node
|
2018-07-19 15:48:08 +01:00
|
|
|
|
|
|
|
import (
|
2018-10-26 17:38:22 +01:00
|
|
|
"context"
|
2018-07-19 15:48:08 +01:00
|
|
|
"sync"
|
2018-12-06 16:05:57 +00:00
|
|
|
"sync/atomic"
|
|
|
|
"unsafe"
|
2018-10-26 15:07:02 +01:00
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
2018-10-26 17:38:22 +01:00
|
|
|
"google.golang.org/grpc"
|
2018-10-26 15:07:02 +01:00
|
|
|
|
2018-10-26 17:38:22 +01:00
|
|
|
"storj.io/storj/pkg/pb"
|
|
|
|
"storj.io/storj/pkg/provider"
|
2018-11-30 13:40:13 +00:00
|
|
|
"storj.io/storj/pkg/storj"
|
2018-10-26 17:38:22 +01:00
|
|
|
"storj.io/storj/pkg/transport"
|
2018-10-26 15:07:02 +01:00
|
|
|
"storj.io/storj/pkg/utils"
|
2018-07-19 15:48:08 +01:00
|
|
|
)
|
|
|
|
|
2018-10-26 15:07:02 +01:00
|
|
|
// Error defines a connection pool error
|
|
|
|
var Error = errs.Class("connection pool error")
|
|
|
|
|
|
|
|
// ConnectionPool is the in memory pool of node connections
|
2018-07-19 15:48:08 +01:00
|
|
|
type ConnectionPool struct {
|
2018-10-26 17:38:22 +01:00
|
|
|
tc transport.Client
|
2018-07-19 15:48:08 +01:00
|
|
|
mu sync.RWMutex
|
2018-11-29 18:39:27 +00:00
|
|
|
items map[storj.NodeID]*Conn
|
2018-07-19 15:48:08 +01:00
|
|
|
}
|
|
|
|
|
2018-10-26 17:38:22 +01:00
|
|
|
// Conn is the connection that is stored in the connection pool
|
|
|
|
type Conn struct {
|
|
|
|
addr string
|
|
|
|
|
|
|
|
dial sync.Once
|
|
|
|
client pb.NodesClient
|
2018-12-06 16:05:57 +00:00
|
|
|
grpc unsafe.Pointer //*grpc.ClientConn
|
2018-10-26 17:38:22 +01:00
|
|
|
err error
|
2018-07-19 15:48:08 +01:00
|
|
|
}
|
|
|
|
|
2018-10-26 17:38:22 +01:00
|
|
|
// NewConn intitalizes a new Conn struct with the provided address, but does not iniate a connection
|
|
|
|
func NewConn(addr string) *Conn { return &Conn{addr: addr} }
|
|
|
|
|
|
|
|
// NewConnectionPool initializes a new in memory pool
|
2018-12-22 04:51:42 +00:00
|
|
|
func NewConnectionPool(identity *provider.FullIdentity, obs ...transport.Observer) *ConnectionPool {
|
2018-10-26 17:38:22 +01:00
|
|
|
return &ConnectionPool{
|
2018-12-22 04:51:42 +00:00
|
|
|
tc: transport.NewClient(identity, obs...),
|
2018-11-29 18:39:27 +00:00
|
|
|
items: make(map[storj.NodeID]*Conn),
|
2018-10-26 17:38:22 +01:00
|
|
|
mu: sync.RWMutex{},
|
|
|
|
}
|
2018-07-19 15:48:08 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Get retrieves a node connection with the provided nodeID
|
|
|
|
// nil is returned if the NodeID is not in the connection pool
|
2018-11-29 18:39:27 +00:00
|
|
|
func (pool *ConnectionPool) Get(id storj.NodeID) (interface{}, error) {
|
2018-10-25 17:11:50 +01:00
|
|
|
pool.mu.Lock()
|
|
|
|
defer pool.mu.Unlock()
|
2018-10-26 17:38:22 +01:00
|
|
|
|
2018-11-29 18:39:27 +00:00
|
|
|
i, ok := pool.items[id]
|
2018-10-26 17:38:22 +01:00
|
|
|
if !ok {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return i, nil
|
2018-07-19 15:48:08 +01:00
|
|
|
}
|
|
|
|
|
2018-10-26 17:38:22 +01:00
|
|
|
// Disconnect deletes a connection associated with the provided NodeID
|
2018-11-29 18:39:27 +00:00
|
|
|
func (pool *ConnectionPool) Disconnect(id storj.NodeID) error {
|
2018-10-25 17:11:50 +01:00
|
|
|
pool.mu.Lock()
|
|
|
|
defer pool.mu.Unlock()
|
2018-10-26 17:38:22 +01:00
|
|
|
|
2018-11-29 18:39:27 +00:00
|
|
|
return pool.disconnect(id)
|
2018-11-20 16:54:52 +00:00
|
|
|
|
|
|
|
}
|
|
|
|
|
2018-11-29 18:39:27 +00:00
|
|
|
func (pool *ConnectionPool) disconnect(id storj.NodeID) error {
|
2018-12-06 16:05:57 +00:00
|
|
|
conn, ok := pool.items[id]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
ptr := atomic.LoadPointer(&conn.grpc)
|
|
|
|
if ptr == nil {
|
2018-10-26 17:38:22 +01:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-11-29 18:39:27 +00:00
|
|
|
delete(pool.items, id)
|
2018-10-26 17:38:22 +01:00
|
|
|
|
2018-12-06 16:05:57 +00:00
|
|
|
return (*grpc.ClientConn)(ptr).Close()
|
2018-07-19 15:48:08 +01:00
|
|
|
}
|
2018-10-26 15:07:02 +01:00
|
|
|
|
2018-10-26 17:38:22 +01:00
|
|
|
// Dial connects to the node with the given ID and Address returning a gRPC Node Client
|
|
|
|
func (pool *ConnectionPool) Dial(ctx context.Context, n *pb.Node) (pb.NodesClient, error) {
|
2018-11-29 18:39:27 +00:00
|
|
|
id := n.Id
|
2018-10-26 17:38:22 +01:00
|
|
|
pool.mu.Lock()
|
|
|
|
conn, ok := pool.items[id]
|
|
|
|
if !ok {
|
|
|
|
conn = NewConn(n.GetAddress().Address)
|
|
|
|
pool.items[id] = conn
|
|
|
|
}
|
|
|
|
pool.mu.Unlock()
|
|
|
|
|
|
|
|
conn.dial.Do(func() {
|
2018-12-06 16:05:57 +00:00
|
|
|
grpc, err := pool.tc.DialNode(ctx, n, grpc.WithBlock())
|
|
|
|
conn.err = err
|
2018-10-26 17:38:22 +01:00
|
|
|
if conn.err != nil {
|
|
|
|
return
|
2018-10-26 15:07:02 +01:00
|
|
|
}
|
2018-10-26 17:38:22 +01:00
|
|
|
|
2018-12-06 16:05:57 +00:00
|
|
|
atomic.StorePointer(&conn.grpc, unsafe.Pointer(grpc))
|
|
|
|
|
|
|
|
conn.client = pb.NewNodesClient(grpc)
|
2018-10-26 17:38:22 +01:00
|
|
|
})
|
|
|
|
|
|
|
|
if conn.err != nil {
|
|
|
|
return nil, conn.err
|
|
|
|
}
|
|
|
|
|
|
|
|
return conn.client, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DisconnectAll closes all connections nodes and removes them from the connection pool
|
|
|
|
func (pool *ConnectionPool) DisconnectAll() error {
|
2018-11-20 16:54:52 +00:00
|
|
|
pool.mu.Lock()
|
|
|
|
defer pool.mu.Unlock()
|
|
|
|
|
2018-10-26 17:38:22 +01:00
|
|
|
errs := []error{}
|
|
|
|
for k := range pool.items {
|
2018-11-20 16:54:52 +00:00
|
|
|
if err := pool.disconnect(k); err != nil {
|
2018-10-26 15:07:02 +01:00
|
|
|
errs = append(errs, Error.Wrap(err))
|
|
|
|
}
|
|
|
|
}
|
2018-10-26 17:38:22 +01:00
|
|
|
|
2018-10-26 15:07:02 +01:00
|
|
|
return utils.CombineErrors(errs...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Init initializes the cache
|
|
|
|
func (pool *ConnectionPool) Init() {
|
2018-11-20 16:54:52 +00:00
|
|
|
pool.mu.Lock()
|
2018-11-29 18:39:27 +00:00
|
|
|
pool.items = make(map[storj.NodeID]*Conn)
|
2018-11-20 16:54:52 +00:00
|
|
|
pool.mu.Unlock()
|
2018-10-26 15:07:02 +01:00
|
|
|
}
|