Get satellites to send BW agreements to via KAD (#985)
* moved from hardcoded overlay IP to kad
This commit is contained in:
parent
06bdcb1915
commit
ea47d27c1b
@ -144,10 +144,6 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
// TODO: this will eventually go away
|
||||
"pointer-db.auth.api-key": setupCfg.APIKey,
|
||||
|
||||
// TODO: this is a source of bugs. this value should be pulled from
|
||||
// kademlia instead
|
||||
"piecestore.agreementsender.overlay-addr": overlayAddr,
|
||||
|
||||
"log.development": true,
|
||||
"log.level": "debug",
|
||||
}
|
||||
|
@ -168,12 +168,11 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
}
|
||||
|
||||
overrides := map[string]interface{}{
|
||||
"identity.cert-path": setupCfg.Identity.CertPath,
|
||||
"identity.key-path": setupCfg.Identity.KeyPath,
|
||||
"identity.server.address": defaultServerAddr,
|
||||
"storage.path": filepath.Join(setupDir, "storage"),
|
||||
"kademlia.bootstrap-addr": defaultSatteliteAddr,
|
||||
"piecestore.agreementsender.overlay-addr": defaultSatteliteAddr,
|
||||
"identity.cert-path": setupCfg.Identity.CertPath,
|
||||
"identity.key-path": setupCfg.Identity.KeyPath,
|
||||
"identity.server.address": defaultServerAddr,
|
||||
"storage.path": filepath.Join(setupDir, "storage"),
|
||||
"kademlia.bootstrap-addr": defaultSatteliteAddr,
|
||||
}
|
||||
|
||||
return process.SaveConfig(cmd.Flags(), filepath.Join(setupDir, "config.yaml"), overrides)
|
||||
|
@ -194,10 +194,8 @@ func newNetwork(flags *Flags) (*Processes, error) {
|
||||
process.Arguments = withCommon(Arguments{
|
||||
"setup": {
|
||||
"--ca.difficulty", difficulty,
|
||||
"--piecestore.agreementsender.overlay-addr", bootstrapSatellite.Address,
|
||||
},
|
||||
"run": {
|
||||
"--piecestore.agreementsender.overlay-addr", bootstrapSatellite.Address,
|
||||
"--kademlia.bootstrap-addr", bootstrapSatellite.Address,
|
||||
"--kademlia.operator.email", fmt.Sprintf("storage%d@example.com", i),
|
||||
"--kademlia.operator.wallet", "0x0123456789012345678901234567890123456789",
|
||||
|
@ -10,121 +10,96 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||
"storj.io/storj/pkg/provider"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/pkg/utils"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultCheckInterval = flag.Duration("piecestore.agreementsender.check-interval", time.Hour, "number of seconds to sleep between agreement checks")
|
||||
defaultOverlayAddr = flag.String("piecestore.agreementsender.overlay-addr", "127.0.0.1:7777", "Overlay Address")
|
||||
|
||||
//todo: cache kad responses if this interval is very small
|
||||
defaultCheckInterval = flag.Duration("piecestore.agreementsender.check-interval", time.Hour, "duration to sleep between agreement checks")
|
||||
// ASError wraps errors returned from agreementsender package
|
||||
ASError = errs.Class("agreement sender error")
|
||||
)
|
||||
|
||||
// AgreementSender maintains variables required for reading bandwidth agreements from a DB and sending them to a Payers
|
||||
type AgreementSender struct {
|
||||
DB *psdb.DB
|
||||
overlay overlay.Client
|
||||
identity *provider.FullIdentity
|
||||
errs []error
|
||||
DB *psdb.DB
|
||||
log *zap.Logger
|
||||
transport transport.Client
|
||||
kad *kademlia.Kademlia
|
||||
}
|
||||
|
||||
// Initialize the Agreement Sender
|
||||
func Initialize(DB *psdb.DB, identity *provider.FullIdentity) (*AgreementSender, error) {
|
||||
overlay, err := overlay.NewClient(identity, *defaultOverlayAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &AgreementSender{DB: DB, identity: identity, overlay: overlay}, nil
|
||||
// New creates an Agreement Sender
|
||||
func New(log *zap.Logger, DB *psdb.DB, identity *provider.FullIdentity, kad *kademlia.Kademlia) *AgreementSender {
|
||||
return &AgreementSender{DB: DB, log: log, transport: transport.NewClient(identity), kad: kad}
|
||||
}
|
||||
|
||||
// Run the afreement sender with a context to cehck for cancel
|
||||
func (as *AgreementSender) Run(ctx context.Context) error {
|
||||
zap.S().Info("AgreementSender is starting up")
|
||||
|
||||
type agreementGroup struct {
|
||||
satellite storj.NodeID
|
||||
agreements []*psdb.Agreement
|
||||
}
|
||||
|
||||
c := make(chan *agreementGroup, 1)
|
||||
|
||||
// Run the agreement sender with a context to check for cancel
|
||||
func (as *AgreementSender) Run(ctx context.Context) {
|
||||
//todo: we likely don't want to stop on err, but consider returning errors via a channel
|
||||
ticker := time.NewTicker(*defaultCheckInterval)
|
||||
defer ticker.Stop()
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
agreementGroups, err := as.DB.GetBandwidthAllocations()
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
continue
|
||||
}
|
||||
for {
|
||||
as.log.Debug("AgreementSender is running", zap.Duration("duration", *defaultCheckInterval))
|
||||
agreementGroups, err := as.DB.GetBandwidthAllocations()
|
||||
if err != nil {
|
||||
as.log.Error("Agreementsender could not retrieve bandwidth allocations", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
for satellite, agreements := range agreementGroups {
|
||||
as.sendAgreementsToSatellite(ctx, satellite, agreements)
|
||||
}
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-ctx.Done():
|
||||
as.log.Debug("AgreementSender is shutting down", zap.Error(ctx.Err()))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send agreements in groups by satellite id to open less connections
|
||||
for satellite, agreements := range agreementGroups {
|
||||
c <- &agreementGroup{satellite, agreements}
|
||||
}
|
||||
func (as *AgreementSender) sendAgreementsToSatellite(ctx context.Context, satID storj.NodeID, agreements []*psdb.Agreement) {
|
||||
as.log.Info("Sending agreements to satellite", zap.Int("number of agreements", len(agreements)), zap.String("satellite id", satID.String()))
|
||||
// Get satellite ip from kademlia
|
||||
satellite, err := as.kad.FindNode(ctx, satID)
|
||||
if err != nil {
|
||||
as.log.Error("Agreementsender could not find satellite", zap.Error(err))
|
||||
return
|
||||
}
|
||||
// Create client from satellite ip
|
||||
conn, err := as.transport.DialNode(ctx, &satellite)
|
||||
if err != nil {
|
||||
as.log.Error("Agreementsender could not dial satellite", zap.Error(err))
|
||||
return
|
||||
}
|
||||
client := pb.NewBandwidthClient(conn)
|
||||
defer func() {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
as.log.Error("Agreementsender failed to close connection", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return utils.CombineErrors(as.errs...)
|
||||
case agreementGroup := <-c:
|
||||
go func() {
|
||||
zap.S().Infof("Sending %v agreements to satellite %s\n", len(agreementGroup.agreements), agreementGroup.satellite)
|
||||
|
||||
// Get satellite ip from overlay by Lookup agreementGroup.satellite
|
||||
satellite, err := as.overlay.Lookup(ctx, agreementGroup.satellite)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create client from satellite ip
|
||||
identOpt, err := as.identity.DialOption(storj.NodeID{})
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(satellite.GetAddress().Address, identOpt)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
client := pb.NewBandwidthClient(conn)
|
||||
|
||||
for _, agreement := range agreementGroup.agreements {
|
||||
|
||||
msg := &pb.RenterBandwidthAllocation{
|
||||
Data: agreement.Agreement,
|
||||
Signature: agreement.Signature,
|
||||
}
|
||||
|
||||
// Send agreement to satellite
|
||||
r, err := client.BandwidthAgreements(ctx, msg)
|
||||
if err != nil || r.GetStatus() != pb.AgreementsSummary_OK {
|
||||
zap.S().Errorf("Failed to send agreement to satellite: %+v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Delete from PSDB by signature
|
||||
if err = as.DB.DeleteBandwidthAllocationBySignature(agreement.Signature); err != nil {
|
||||
zap.S().Error(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
for _, agreement := range agreements {
|
||||
msg := &pb.RenterBandwidthAllocation{
|
||||
Data: agreement.Agreement,
|
||||
Signature: agreement.Signature,
|
||||
}
|
||||
// Send agreement to satellite
|
||||
r, err := client.BandwidthAgreements(ctx, msg)
|
||||
if err != nil || r.GetStatus() != pb.AgreementsSummary_OK {
|
||||
as.log.Error("Agreementsender failed to send agreement to satellite", zap.Error(err))
|
||||
return
|
||||
}
|
||||
// Delete from PSDB by signature
|
||||
if err = as.DB.DeleteBandwidthAllocationBySignature(agreement.Signature); err != nil {
|
||||
as.log.Error("Agreementsender failed to delete bandwidth allocation", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,60 +35,44 @@ type Config struct {
|
||||
// Run implements provider.Responsibility
|
||||
func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
//piecestore
|
||||
db, err := psdb.Open(ctx, filepath.Join(c.Path, "piece-store-data"), filepath.Join(c.Path, "piecestore.db"))
|
||||
if err != nil {
|
||||
return ServerError.Wrap(err)
|
||||
}
|
||||
|
||||
s, err := NewEndpoint(zap.L(), c, db, server.Identity().Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pb.RegisterPieceStoreRoutesServer(server.GRPC(), s)
|
||||
|
||||
// Run the agreement sender process
|
||||
asProcess, err := agreementsender.Initialize(s.DB, server.Identity())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
if err := asProcess.Run(ctx); err != nil {
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
|
||||
// Run the kademlia bucket refresher process
|
||||
//kademlia
|
||||
k := kademlia.LoadFromContext(ctx)
|
||||
if k == nil {
|
||||
return ServerError.New("Failed to load Kademlia from context")
|
||||
}
|
||||
|
||||
rt, err := k.GetRoutingTable(ctx)
|
||||
if err != nil {
|
||||
return ServerError.Wrap(err)
|
||||
}
|
||||
|
||||
krt, ok := rt.(*kademlia.RoutingTable)
|
||||
if !ok {
|
||||
return ServerError.New("Could not convert dht.RoutingTable to *kademlia.RoutingTable")
|
||||
}
|
||||
|
||||
refreshProcess := newService(zap.L(), c.KBucketRefreshInterval, krt, s)
|
||||
|
||||
go func() {
|
||||
if err := refreshProcess.Run(ctx); err != nil {
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
log.Fatal(s.Stop(ctx))
|
||||
}()
|
||||
//agreementsender
|
||||
agreementsender := agreementsender.New(zap.L(), s.DB, server.Identity(), k)
|
||||
go agreementsender.Run(ctx)
|
||||
|
||||
defer func() { log.Fatal(s.Stop(ctx)) }()
|
||||
s.log.Info("Started Node", zap.String("ID", fmt.Sprint(server.Identity().ID)))
|
||||
return server.Run(ctx)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user