2018-04-23 16:54:22 +01:00
|
|
|
// Copyright (C) 2018 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
2018-04-12 14:50:22 +01:00
|
|
|
package overlay
|
|
|
|
|
|
|
|
import (
|
2018-05-16 19:47:59 +01:00
|
|
|
"context"
|
|
|
|
"flag"
|
|
|
|
"fmt"
|
|
|
|
"net"
|
2018-06-05 22:06:37 +01:00
|
|
|
"net/http"
|
2018-05-16 19:47:59 +01:00
|
|
|
|
|
|
|
"go.uber.org/zap"
|
2018-04-12 14:50:22 +01:00
|
|
|
"google.golang.org/grpc"
|
2018-06-13 19:22:32 +01:00
|
|
|
"gopkg.in/spacemonkeygo/monkit.v2"
|
2018-05-16 19:47:59 +01:00
|
|
|
|
|
|
|
"storj.io/storj/pkg/kademlia"
|
|
|
|
proto "storj.io/storj/protos/overlay"
|
|
|
|
)
|
2018-04-23 16:54:22 +01:00
|
|
|
|
2018-05-16 19:47:59 +01:00
|
|
|
var (
|
2018-06-05 22:06:37 +01:00
|
|
|
redisAddress, redisPassword, httpPort, bootstrapIP, bootstrapPort, localPort string
|
|
|
|
db int
|
|
|
|
srvPort uint
|
2018-04-12 14:50:22 +01:00
|
|
|
)
|
|
|
|
|
2018-05-16 19:47:59 +01:00
|
|
|
func init() {
|
2018-06-05 22:06:37 +01:00
|
|
|
flag.StringVar(&httpPort, "httpPort", "", "The port for the health endpoint")
|
|
|
|
flag.StringVar(&redisAddress, "redisAddress", "", "The <IP:PORT> string to use for connection to a redis cache")
|
|
|
|
flag.StringVar(&redisPassword, "redisPassword", "", "The password used for authentication to a secured redis instance")
|
2018-05-16 19:47:59 +01:00
|
|
|
flag.IntVar(&db, "db", 0, "The network cache database")
|
2018-06-05 22:06:37 +01:00
|
|
|
flag.UintVar(&srvPort, "srvPort", 8080, "Port to listen on")
|
|
|
|
flag.StringVar(&bootstrapIP, "bootstrapIP", "", "Optional IP to bootstrap node against")
|
|
|
|
flag.StringVar(&bootstrapPort, "bootstrapPort", "", "Optional port of node to bootstrap against")
|
|
|
|
flag.StringVar(&localPort, "localPort", "8080", "Specify a different port to listen on locally")
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
|
|
|
|
2018-04-23 16:54:22 +01:00
|
|
|
// NewServer creates a new Overlay Service Server
|
2018-06-13 19:22:32 +01:00
|
|
|
func NewServer(k *kademlia.Kademlia, cache *Cache, l *zap.Logger, m *monkit.Registry) *grpc.Server {
|
2018-04-12 14:50:22 +01:00
|
|
|
grpcServer := grpc.NewServer()
|
2018-06-19 15:00:15 +01:00
|
|
|
proto.RegisterOverlayServer(grpcServer, &Server{
|
2018-06-05 22:06:37 +01:00
|
|
|
kad: k,
|
2018-06-13 19:22:32 +01:00
|
|
|
cache: cache,
|
2018-06-05 22:06:37 +01:00
|
|
|
logger: l,
|
|
|
|
metrics: m,
|
|
|
|
})
|
2018-04-12 14:50:22 +01:00
|
|
|
|
2018-04-23 16:54:22 +01:00
|
|
|
return grpcServer
|
2018-04-12 14:50:22 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewClient connects to grpc server at the provided address with the provided options
|
|
|
|
// returns a new instance of an overlay Client
|
2018-05-16 19:47:59 +01:00
|
|
|
func NewClient(serverAddr *string, opts ...grpc.DialOption) (proto.OverlayClient, error) {
|
2018-04-12 14:50:22 +01:00
|
|
|
conn, err := grpc.Dial(*serverAddr, opts...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-05-16 19:47:59 +01:00
|
|
|
return proto.NewOverlayClient(conn), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Service contains all methods needed to implement the process.Service interface
|
|
|
|
type Service struct {
|
|
|
|
logger *zap.Logger
|
|
|
|
metrics *monkit.Registry
|
|
|
|
}
|
|
|
|
|
|
|
|
// Process is the main function that executes the service
|
|
|
|
func (s *Service) Process(ctx context.Context) error {
|
2018-06-05 22:06:37 +01:00
|
|
|
// TODO
|
|
|
|
// 1. Boostrap a node on the network
|
|
|
|
// 2. Start up the overlay gRPC service
|
|
|
|
// 3. Connect to Redis
|
|
|
|
// 4. Boostrap Redis Cache
|
|
|
|
|
|
|
|
// TODO(coyle): Should add the ability to pass a configuration to change the bootstrap node
|
|
|
|
in := kademlia.GetIntroNode(bootstrapIP, bootstrapPort)
|
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
kad, err := kademlia.NewKademlia([]proto.Node{in}, "0.0.0.0", localPort)
|
2018-06-05 22:06:37 +01:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Failed to instantiate new Kademlia", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := kad.ListenAndServe(); err != nil {
|
|
|
|
s.logger.Error("Failed to ListenAndServe on new Kademlia", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := kad.Bootstrap(ctx); err != nil {
|
|
|
|
s.logger.Error("Failed to Bootstrap on new Kademlia", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2018-05-16 19:47:59 +01:00
|
|
|
|
|
|
|
// bootstrap cache
|
2018-06-13 19:22:32 +01:00
|
|
|
var cache *Cache
|
|
|
|
if redisAddress != "" {
|
|
|
|
cache, err = NewRedisOverlayCache(redisAddress, redisPassword, db, kad)
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Failed to create a new redis overlay client", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2018-05-16 19:47:59 +01:00
|
|
|
}
|
2018-06-05 22:06:37 +01:00
|
|
|
|
2018-05-16 19:47:59 +01:00
|
|
|
if err := cache.Bootstrap(ctx); err != nil {
|
|
|
|
s.logger.Error("Failed to boostrap cache", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// send off cache refreshes concurrently
|
|
|
|
go cache.Refresh(ctx)
|
|
|
|
|
2018-06-05 22:06:37 +01:00
|
|
|
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", srvPort))
|
2018-05-16 19:47:59 +01:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Failed to initialize TCP connection", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
grpcServer := NewServer(kad, cache, s.logger, s.metrics)
|
2018-06-05 22:06:37 +01:00
|
|
|
|
|
|
|
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "OK") })
|
|
|
|
go func() { http.ListenAndServe(fmt.Sprintf(":%s", httpPort), nil) }()
|
|
|
|
go cache.Walk(ctx)
|
2018-05-16 19:47:59 +01:00
|
|
|
|
2018-06-13 19:22:32 +01:00
|
|
|
// If the passed context times out or is cancelled, shutdown the gRPC server
|
|
|
|
go func() {
|
|
|
|
if _, ok := <-ctx.Done(); !ok {
|
|
|
|
grpcServer.GracefulStop()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// If `grpcServer.Serve(...)` returns an error, shutdown/cleanup the gRPC server
|
2018-05-16 19:47:59 +01:00
|
|
|
defer grpcServer.GracefulStop()
|
|
|
|
return grpcServer.Serve(lis)
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetLogger adds the initialized logger to the Service
|
|
|
|
func (s *Service) SetLogger(l *zap.Logger) error {
|
|
|
|
s.logger = l
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetMetricHandler adds the initialized metric handler to the Service
|
|
|
|
func (s *Service) SetMetricHandler(m *monkit.Registry) error {
|
|
|
|
s.metrics = m
|
|
|
|
return nil
|
2018-04-12 14:50:22 +01:00
|
|
|
}
|
2018-05-30 15:03:44 +01:00
|
|
|
|
|
|
|
// InstanceID implements Service.InstanceID
|
|
|
|
func (s *Service) InstanceID() string { return "" }
|