storj/pkg/piecestore/psserver/agreementsender/agreementsender.go

154 lines
4.7 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package agreementsender
import (
"io"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/piecestore/psserver/psdb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
)
// ASError is the error class used to wrap errors originating from the
// agreementsender package.
var ASError = errs.Class("agreement sender error")
// AgreementSender holds the dependencies required for reading bandwidth
// agreements from a DB and sending them to the corresponding satellites (payers).
type AgreementSender struct { // TODO: rename to service
// DB is the store the agreements are read from; settlement results are
// written back to it and old payouts are deleted from it.
DB *psdb.DB
// log receives progress and failure messages; failures are logged rather
// than returned.
log *zap.Logger
// transport is used to dial a satellite once its node has been found.
transport transport.Client
// kad is used to look up a satellite node by its node ID.
kad *kademlia.Kademlia
// checkInterval is the period of the ticker that paces Run.
checkInterval time.Duration
}
// New creates an AgreementSender that reads agreements from DB, looks up
// satellites through kad, dials them through tc, and starts a settlement
// pass every checkInterval.
func New(log *zap.Logger, DB *psdb.DB, tc transport.Client, kad *kademlia.Kademlia, checkInterval time.Duration) *AgreementSender {
	sender := new(AgreementSender)
	sender.DB = DB
	sender.log = log
	sender.transport = tc
	sender.kad = kad
	sender.checkInterval = checkInterval
	return sender
}
// Run periodically settles stored bandwidth agreements until ctx is cancelled.
//
// One settlement pass is executed immediately and then one per checkInterval
// tick. Failures inside a pass are logged rather than returned, so the only
// error Run returns is ctx.Err() once the context is done.
func (as *AgreementSender) Run(ctx context.Context) error {
	//todo: we likely don't want to stop on err, but consider returning errors via a channel
	ticker := time.NewTicker(as.checkInterval)
	defer ticker.Stop()
	for {
		as.log.Debug("is running", zap.Duration("duration", as.checkInterval))
		as.runOnce(ctx)

		// Always wait here, even after a failed pass. Skipping the wait on a
		// database error would turn the loop into a busy spin that neither
		// honours the check interval nor notices cancellation.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// runOnce performs a single settlement pass: it loads the pending agreements
// grouped by satellite, settles every group concurrently, and then deletes old
// payouts. If the agreements cannot be loaded the pass is abandoned, which
// also skips the deletion step.
func (as *AgreementSender) runOnce(ctx context.Context) {
	agreementGroups, err := as.DB.GetBandwidthAllocations()
	if err != nil {
		as.log.Error("could not retrieve bandwidth allocations", zap.Error(err))
		return
	}

	if len(agreementGroups) > 0 {
		var group errgroup.Group
		// send agreement payouts
		for satellite, agreements := range agreementGroups {
			// Copy the loop variables so each goroutine sees its own pair.
			satellite, agreements := satellite, agreements
			group.Go(func() error {
				// Bound each satellite's settlement so one stuck peer
				// cannot hold up the pass indefinitely.
				timedCtx, cancel := context.WithTimeout(ctx, time.Hour)
				defer cancel()
				as.SettleAgreements(timedCtx, satellite, agreements)
				return nil
			})
		}
		_ = group.Wait() // doesn't return errors
	}

	// Delete older payout irrespective of its status
	if err := as.DB.DeleteBandwidthAllocationPayouts(); err != nil {
		as.log.Error("failed to delete bandwidth allocation", zap.Error(err))
	}
}
// SettleAgreements uploads agreements to the satellite identified by
// satelliteID and records the satellite's verdict for each one in the DB.
//
// The satellite is located through kademlia and dialed through the transport
// client, then a settlement stream is opened. Agreements are sent from a
// separate goroutine while this goroutine reads responses, so sending and
// receiving proceed concurrently. Every failure is logged; nothing is
// returned to the caller.
func (as *AgreementSender) SettleAgreements(ctx context.Context, satelliteID storj.NodeID, agreements []*psdb.Agreement) {
	as.log.Info("sending agreements to satellite", zap.Int("number of agreements", len(agreements)), zap.String("satellite id", satelliteID.String()))

	satellite, err := as.kad.FindNode(ctx, satelliteID)
	if err != nil {
		as.log.Warn("could not find satellite", zap.String("satellite id", satelliteID.String()), zap.Error(err))
		return
	}

	conn, err := as.transport.DialNode(ctx, &satellite)
	if err != nil {
		as.log.Warn("could not dial satellite", zap.String("satellite id", satelliteID.String()), zap.Error(err))
		return
	}
	defer func() {
		if err := conn.Close(); err != nil {
			as.log.Warn("failed to close connection", zap.String("satellite id", satelliteID.String()), zap.Error(err))
		}
	}()

	client, err := pb.NewBandwidthClient(conn).Settlement(ctx)
	if err != nil {
		as.log.Error("failed to start settlement", zap.String("satellite id", satelliteID.String()), zap.Error(err))
		return
	}

	// Sender side: push every agreement, then close the send direction.
	var group errgroup.Group
	group.Go(func() error {
		for _, agreement := range agreements {
			err := client.Send(&pb.BandwidthSettlementRequest{
				Allocation: &agreement.Agreement,
			})
			if err != nil {
				return err
			}
		}
		return client.CloseSend()
	})

	// Receiver side: read responses until the stream ends or fails.
	for {
		response, err := client.Recv()
		if err != nil {
			// io.EOF is the normal end of the stream; anything else is logged.
			if err != io.EOF {
				as.log.Error("failed to recv response", zap.String("satellite id", satelliteID.String()), zap.Error(err))
			}
			break
		}

		var status psdb.AgreementStatus
		switch response.Status {
		case pb.AgreementsSummary_REJECTED:
			status = psdb.AgreementStatusReject
		case pb.AgreementsSummary_OK:
			status = psdb.AgreementStatusSent
		default:
			// No error value exists on this path, so report the status that
			// was not understood instead of a nil error.
			as.log.Error("unexpected response", zap.String("satellite id", satelliteID.String()), zap.Stringer("status", response.Status))
			continue
		}

		if err := as.DB.UpdateBandwidthAllocationStatus(response.SerialNumber, status); err != nil {
			as.log.Error("error", zap.String("satellite id", satelliteID.String()), zap.Error(err))
		}
	}

	if err := group.Wait(); err != nil {
		as.log.Error("sending agreements returned an error", zap.String("satellite id", satelliteID.String()), zap.Error(err))
	}
}