storj/pkg/piecestore/psserver/agreementsender/agreementsender.go

131 lines
3.4 KiB
Go
Raw Normal View History

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package agreementsender
import (
"flag"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/net/context"
"google.golang.org/grpc"
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/piecestore/psserver/psdb"
"storj.io/storj/pkg/provider"
2018-11-30 13:40:13 +00:00
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/utils"
)
var (
defaultCheckInterval = flag.Duration("piecestore.agreementsender.check-interval", time.Hour, "number of seconds to sleep between agreement checks")
defaultOverlayAddr = flag.String("piecestore.agreementsender.overlay-addr", "127.0.0.1:7777", "Overlay Address")
// ASError wraps errors returned from agreementsender package
ASError = errs.Class("agreement sender error")
)
// AgreementSender maintains variables required for reading bandwidth agreements from a DB and sending them to a Payers
type AgreementSender struct {
DB *psdb.DB
overlay overlay.Client
identity *provider.FullIdentity
errs []error
}
// Initialize the Agreement Sender
func Initialize(DB *psdb.DB, identity *provider.FullIdentity) (*AgreementSender, error) {
overlay, err := overlay.NewOverlayClient(identity, *defaultOverlayAddr)
if err != nil {
return nil, err
}
return &AgreementSender{DB: DB, identity: identity, overlay: overlay}, nil
}
// Run the afreement sender with a context to cehck for cancel
func (as *AgreementSender) Run(ctx context.Context) error {
zap.S().Info("AgreementSender is starting up")
type agreementGroup struct {
satellite storj.NodeID
agreements []*psdb.Agreement
}
c := make(chan *agreementGroup, 1)
ticker := time.NewTicker(*defaultCheckInterval)
defer ticker.Stop()
go func() {
for range ticker.C {
agreementGroups, err := as.DB.GetBandwidthAllocations()
if err != nil {
zap.S().Error(err)
continue
}
// Send agreements in groups by satellite id to open less connections
for satellite, agreements := range agreementGroups {
c <- &agreementGroup{satellite, agreements}
}
}
}()
for {
select {
case <-ctx.Done():
return utils.CombineErrors(as.errs...)
case agreementGroup := <-c:
go func() {
zap.S().Infof("Sending %v agreements to satellite %s\n", len(agreementGroup.agreements), agreementGroup.satellite)
// Get satellite ip from overlay by Lookup agreementGroup.satellite
satellite, err := as.overlay.Lookup(ctx, agreementGroup.satellite)
if err != nil {
zap.S().Error(err)
return
}
// Create client from satellite ip
identOpt, err := as.identity.DialOption(storj.NodeID{})
if err != nil {
zap.S().Error(err)
return
}
conn, err := grpc.Dial(satellite.GetAddress().Address, identOpt)
if err != nil {
zap.S().Error(err)
return
}
client := pb.NewBandwidthClient(conn)
for _, agreement := range agreementGroup.agreements {
msg := &pb.RenterBandwidthAllocation{
Data: agreement.Agreement,
Signature: agreement.Signature,
}
// Send agreement to satellite
r, err := client.BandwidthAgreements(ctx, msg)
if err != nil || r.GetStatus() != pb.AgreementsSummary_OK {
zap.S().Errorf("Failed to send agreement to satellite: %+v", err)
return
}
// Delete from PSDB by signature
if err = as.DB.DeleteBandwidthAllocationBySignature(agreement.Signature); err != nil {
zap.S().Error(err)
return
}
}
}()
}
}
}