storagenode/heldamount: payment receipt added to monthly paystub, heldamount.service separated for service and endpoint

Change-Id: Id759586c6362edbef34c230d4f0d2585c11c9b47
This commit is contained in:
Qweder93 2020-07-02 16:54:32 +03:00 committed by Nikolai Siedov
parent 5786595d14
commit ac716e1514
10 changed files with 350 additions and 333 deletions

View File

@ -52,7 +52,7 @@ func (heldAmount *HeldAmount) PayStubMonthly(w http.ResponseWriter, r *http.Requ
id := queryParams.Get("id")
if id == "" {
payStubs, err := heldAmount.service.AllPayStubsMonthlyCached(ctx, period)
payStubs, err := heldAmount.service.AllPayStubsMonthly(ctx, period)
if err != nil {
heldAmount.serveJSONError(w, http.StatusInternalServerError, ErrHeldAmountAPI.Wrap(err))
return
@ -69,7 +69,7 @@ func (heldAmount *HeldAmount) PayStubMonthly(w http.ResponseWriter, r *http.Requ
return
}
payStub, err := heldAmount.service.SatellitePayStubMonthlyCached(ctx, satelliteID, period)
payStub, err := heldAmount.service.SatellitePayStubMonthly(ctx, satelliteID, period)
if err != nil {
if heldamount.ErrNoPayStubForPeriod.Has(err) {
heldAmount.serveJSONError(w, http.StatusNotFound, ErrHeldAmountAPI.Wrap(err))
@ -112,7 +112,7 @@ func (heldAmount *HeldAmount) PayStubPeriod(w http.ResponseWriter, r *http.Reque
id := queryParams.Get("id")
if id == "" {
payStubs, err := heldAmount.service.AllPayStubsPeriodCached(ctx, start, end)
payStubs, err := heldAmount.service.AllPayStubsPeriod(ctx, start, end)
if err != nil {
if heldamount.ErrBadPeriod.Has(err) {
heldAmount.serveJSONError(w, http.StatusBadRequest, ErrHeldAmountAPI.Wrap(err))
@ -134,7 +134,7 @@ func (heldAmount *HeldAmount) PayStubPeriod(w http.ResponseWriter, r *http.Reque
return
}
payStubs, err := heldAmount.service.SatellitePayStubPeriodCached(ctx, satelliteID, start, end)
payStubs, err := heldAmount.service.SatellitePayStubPeriod(ctx, satelliteID, start, end)
if err != nil {
if heldamount.ErrBadPeriod.Has(err) {
heldAmount.serveJSONError(w, http.StatusBadRequest, ErrHeldAmountAPI.Wrap(err))

View File

@ -150,13 +150,13 @@ func TestStorageNodeApi(t *testing.T) {
CurrentMonthEstimatedAmount: estimated1 + estimated2,
CurrentMonthHeld: int64(sum1 + sum2 - sum1AfterHeld - sum2AfterHeld),
PreviousMonthPayout: heldamount.PayoutMonthly{
EgressBandwidth: 0,
EgressPayout: 0,
EgressRepairAudit: 0,
RepairAuditPayout: 0,
DiskSpace: 0,
DiskSpaceAmount: 0,
HeldPercentRate: 0,
EgressBandwidth: 0,
EgressBandwidthPayout: 0,
EgressRepairAudit: 0,
EgressRepairAuditPayout: 0,
DiskSpace: 0,
DiskSpacePayout: 0,
HeldRate: 0,
},
})
require.NoError(t, err)
@ -180,13 +180,13 @@ func TestStorageNodeApi(t *testing.T) {
CurrentMonthEstimatedAmount: estimated1,
CurrentMonthHeld: int64(sum1 - sum1AfterHeld),
PreviousMonthPayout: heldamount.PayoutMonthly{
EgressBandwidth: 0,
EgressPayout: 0,
EgressRepairAudit: 0,
RepairAuditPayout: 0,
DiskSpace: 0,
DiskSpaceAmount: 0,
HeldPercentRate: 75,
EgressBandwidth: 0,
EgressBandwidthPayout: 0,
EgressRepairAudit: 0,
EgressRepairAuditPayout: 0,
DiskSpace: 0,
DiskSpacePayout: 0,
HeldRate: 75,
},
})
require.NoError(t, err)

View File

@ -407,11 +407,11 @@ func (s *Service) GetAllSatellitesEstimatedPayout(ctx context.Context) (payout h
payout.CurrentMonthEstimatedAmount += current
payout.CurrentMonthHeld += held
payout.PreviousMonthPayout.DiskSpaceAmount += previous.DiskSpaceAmount
payout.PreviousMonthPayout.DiskSpacePayout += previous.DiskSpacePayout
payout.PreviousMonthPayout.DiskSpace += previous.DiskSpace
payout.PreviousMonthPayout.EgressBandwidth += previous.EgressBandwidth
payout.PreviousMonthPayout.EgressPayout += previous.EgressPayout
payout.PreviousMonthPayout.RepairAuditPayout += previous.RepairAuditPayout
payout.PreviousMonthPayout.EgressBandwidthPayout += previous.EgressBandwidthPayout
payout.PreviousMonthPayout.EgressRepairAuditPayout += previous.EgressRepairAuditPayout
payout.PreviousMonthPayout.EgressRepairAudit += previous.EgressRepairAudit
}
@ -487,7 +487,7 @@ func (s *Service) estimatedPayoutPreviousMonth(ctx context.Context, satelliteID
}
heldRate := s.getHeldRate(stats.JoinedAt)
payoutData.HeldPercentRate = heldRate
payoutData.HeldRate = heldRate
bandwidthDaily, err := s.bandwidthDB.GetDailySatelliteRollups(ctx, satelliteID, from, to)
if err != nil {
@ -497,10 +497,10 @@ func (s *Service) estimatedPayoutPreviousMonth(ctx context.Context, satelliteID
for i := 0; i < len(bandwidthDaily); i++ {
payoutData.EgressBandwidth += bandwidthDaily[i].Egress.Usage
usagePayout := float64(bandwidthDaily[i].Egress.Usage*priceModel.EgressBandwidth*heldRate/100) / math.Pow10(12)
payoutData.EgressPayout += int64(usagePayout)
payoutData.EgressBandwidthPayout += int64(usagePayout)
payoutData.EgressRepairAudit += bandwidthDaily[i].Egress.Audit + bandwidthDaily[i].Egress.Repair
repairAuditPayout := float64((bandwidthDaily[i].Egress.Audit*priceModel.AuditBandwidth+bandwidthDaily[i].Egress.Repair*priceModel.RepairBandwidth)*heldRate/100) / math.Pow10(12)
payoutData.RepairAuditPayout += int64(repairAuditPayout)
payoutData.EgressRepairAuditPayout += int64(repairAuditPayout)
}
storageDaily, err := s.storageUsageDB.GetDaily(ctx, satelliteID, from, to)
@ -510,7 +510,7 @@ func (s *Service) estimatedPayoutPreviousMonth(ctx context.Context, satelliteID
for j := 0; j < len(storageDaily); j++ {
payoutData.DiskSpace += storageDaily[j].AtRestTotal
payoutData.DiskSpaceAmount += int64(storageDaily[j].AtRestTotal / 730 / math.Pow10(12) * float64(priceModel.DiskSpace*heldRate/100))
payoutData.DiskSpacePayout += int64(storageDaily[j].AtRestTotal / 730 / math.Pow10(12) * float64(priceModel.DiskSpace*heldRate/100))
}
return payoutData, nil

View File

@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/storj/storagenode"
@ -138,45 +137,6 @@ func TestHeldAmountDB(t *testing.T) {
assert.NoError(t, err)
})
t.Run("Test GetPayment", func(t *testing.T) {
paym, err := heldAmount.GetPayment(ctx, satelliteID, period)
assert.NoError(t, err)
assert.Equal(t, paym.Created, payment.Created)
assert.Equal(t, paym.SatelliteID, payment.SatelliteID)
assert.Equal(t, paym.Period, payment.Period)
assert.Equal(t, paym.ID, payment.ID)
assert.Equal(t, paym.Amount, payment.Amount)
assert.Equal(t, paym.Notes, payment.Notes)
assert.Equal(t, paym.Receipt, payment.Receipt)
paym, err = heldAmount.GetPayment(ctx, satelliteID, "")
assert.Error(t, err)
assert.Equal(t, true, heldamount.ErrNoPayStubForPeriod.Has(err))
assert.Nil(t, paym)
paym, err = heldAmount.GetPayment(ctx, storj.NodeID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, period)
assert.Error(t, err)
assert.Equal(t, true, heldamount.ErrNoPayStubForPeriod.Has(err))
assert.Nil(t, paym)
})
t.Run("Test StorePayment", func(t *testing.T) {
payments, err := heldAmount.AllPayments(ctx, period)
assert.NoError(t, err)
assert.Equal(t, 1, len(payments))
assert.Equal(t, payments[0].Created, payment.Created)
assert.Equal(t, payments[0].SatelliteID, payment.SatelliteID)
assert.Equal(t, payments[0].Period, payment.Period)
assert.Equal(t, payments[0].ID, payment.ID)
assert.Equal(t, payments[0].Amount, payment.Amount)
assert.Equal(t, payments[0].Notes, payment.Notes)
assert.Equal(t, payments[0].Receipt, payment.Receipt)
payments, err = heldAmount.AllPayments(ctx, "")
assert.NoError(t, err)
assert.Equal(t, len(payments), 0)
})
t.Run("Test SatellitesHeldbackHistory", func(t *testing.T) {
heldback, err := heldAmount.SatellitesHeldbackHistory(ctx, satelliteID)
assert.NoError(t, err)
@ -233,7 +193,7 @@ func TestSatellitePayStubPeriodCached(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
heldAmountDB := db.HeldAmount()
reputationDB := db.Reputation()
service := heldamount.NewService(nil, heldAmountDB, reputationDB, rpc.Dialer{}, nil)
service := heldamount.NewService(nil, heldAmountDB, reputationDB, nil)
payStub := heldamount.PayStub{
SatelliteID: storj.NodeID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
@ -264,15 +224,15 @@ func TestSatellitePayStubPeriodCached(t *testing.T) {
require.NoError(t, err)
}
payStubs, err := service.SatellitePayStubPeriodCached(ctx, payStub.SatelliteID, "2020-01", "2020-03")
payStubs, err := service.SatellitePayStubPeriod(ctx, payStub.SatelliteID, "2020-01", "2020-03")
require.NoError(t, err)
require.Equal(t, 3, len(payStubs))
payStubs, err = service.SatellitePayStubPeriodCached(ctx, payStub.SatelliteID, "2019-01", "2021-03")
payStubs, err = service.SatellitePayStubPeriod(ctx, payStub.SatelliteID, "2019-01", "2021-03")
require.NoError(t, err)
require.Equal(t, 3, len(payStubs))
payStubs, err = service.SatellitePayStubPeriodCached(ctx, payStub.SatelliteID, "2019-01", "2020-01")
payStubs, err = service.SatellitePayStubPeriod(ctx, payStub.SatelliteID, "2019-01", "2020-01")
require.NoError(t, err)
require.Equal(t, 1, len(payStubs))
})
@ -282,7 +242,7 @@ func TestAllPayStubPeriodCached(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
heldAmountDB := db.HeldAmount()
reputationDB := db.Reputation()
service := heldamount.NewService(nil, heldAmountDB, reputationDB, rpc.Dialer{}, nil)
service := heldamount.NewService(nil, heldAmountDB, reputationDB, nil)
payStub := heldamount.PayStub{
SatelliteID: storj.NodeID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
@ -316,19 +276,19 @@ func TestAllPayStubPeriodCached(t *testing.T) {
}
}
payStubs, err := service.AllPayStubsPeriodCached(ctx, "2020-01", "2020-03")
payStubs, err := service.AllPayStubsPeriod(ctx, "2020-01", "2020-03")
require.NoError(t, err)
require.Equal(t, 9, len(payStubs))
payStubs, err = service.AllPayStubsPeriodCached(ctx, "2019-01", "2021-03")
payStubs, err = service.AllPayStubsPeriod(ctx, "2019-01", "2021-03")
require.NoError(t, err)
require.Equal(t, 9, len(payStubs))
payStubs, err = service.AllPayStubsPeriodCached(ctx, "2019-01", "2020-01")
payStubs, err = service.AllPayStubsPeriod(ctx, "2019-01", "2020-01")
require.NoError(t, err)
require.Equal(t, 3, len(payStubs))
payStubs, err = service.AllPayStubsPeriodCached(ctx, "2019-01", "2019-01")
payStubs, err = service.AllPayStubsPeriod(ctx, "2019-01", "2019-01")
require.NoError(t, err)
require.Equal(t, 0, len(payStubs))
})

View File

@ -0,0 +1,236 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package heldamount
import (
"context"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/storj/private/date"
"storj.io/storj/storagenode/trust"
)
// Client encapsulates HeldAmountClient with underlying connection.
//
// architecture: Client
type Client struct {
conn *rpc.Conn
pb.DRPCHeldAmountClient
}
// Close closes underlying client connection.
func (c *Client) Close() error {
return c.conn.Close()
}
// Endpoint retrieves info from satellites using an rpc client.
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
dialer rpc.Dialer
trust *trust.Pool
}
// NewEndpoint creates new instance of endpoint.
func NewEndpoint(log *zap.Logger, dialer rpc.Dialer, trust *trust.Pool) *Endpoint {
return &Endpoint{
log: log,
dialer: dialer,
trust: trust,
}
}
// GetPaystub retrieves held amount for particular satellite from satellite using RPC.
func (endpoint *Endpoint) GetPaystub(ctx context.Context, satelliteID storj.NodeID, period string) (_ *PayStub, err error) {
defer mon.Task()(&ctx)(&err)
client, err := endpoint.dial(ctx, satelliteID)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
defer func() { err = errs.Combine(err, client.Close()) }()
requestedPeriod, err := date.PeriodToTime(period)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
resp, err := client.GetPayStub(ctx, &pb.GetHeldAmountRequest{Period: requestedPeriod})
if err != nil {
if rpcstatus.Code(err) == rpcstatus.OutOfRange {
return nil, ErrNoPayStubForPeriod.Wrap(err)
}
return nil, ErrHeldAmountService.Wrap(err)
}
return &PayStub{
Period: period[0:7],
SatelliteID: satelliteID,
Created: resp.CreatedAt,
Codes: resp.Codes,
UsageAtRest: resp.UsageAtRest,
UsageGet: resp.UsageGet,
UsagePut: resp.UsagePut,
UsageGetRepair: resp.UsageGetRepair,
UsagePutRepair: resp.UsagePutRepair,
UsageGetAudit: resp.UsageGetAudit,
CompAtRest: resp.CompAtRest,
CompGet: resp.CompGet,
CompPut: resp.CompPut,
CompGetRepair: resp.CompGetRepair,
CompPutRepair: resp.CompPutRepair,
CompGetAudit: resp.CompGetAudit,
SurgePercent: resp.SurgePercent,
Held: resp.Held,
Owed: resp.Owed,
Disposed: resp.Disposed,
Paid: resp.Paid,
}, nil
}
// GetAllPaystubs retrieves all paystubs for particular satellite.
func (endpoint *Endpoint) GetAllPaystubs(ctx context.Context, satelliteID storj.NodeID) (_ []PayStub, err error) {
defer mon.Task()(&ctx)(&err)
client, err := endpoint.dial(ctx, satelliteID)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
defer func() { err = errs.Combine(err, client.Close()) }()
resp, err := client.GetAllPaystubs(ctx, &pb.GetAllPaystubsRequest{})
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
var payStubs []PayStub
for i := 0; i < len(resp.Paystub); i++ {
paystub := PayStub{
Period: resp.Paystub[i].Period.String()[0:7],
SatelliteID: satelliteID,
Created: resp.Paystub[i].CreatedAt,
Codes: resp.Paystub[i].Codes,
UsageAtRest: resp.Paystub[i].UsageAtRest,
UsageGet: resp.Paystub[i].UsageGet,
UsagePut: resp.Paystub[i].UsagePut,
UsageGetRepair: resp.Paystub[i].UsageGetRepair,
UsagePutRepair: resp.Paystub[i].UsagePutRepair,
UsageGetAudit: resp.Paystub[i].UsageGetAudit,
CompAtRest: resp.Paystub[i].CompAtRest,
CompGet: resp.Paystub[i].CompGet,
CompPut: resp.Paystub[i].CompPut,
CompGetRepair: resp.Paystub[i].CompGetRepair,
CompPutRepair: resp.Paystub[i].CompPutRepair,
CompGetAudit: resp.Paystub[i].CompGetAudit,
SurgePercent: resp.Paystub[i].SurgePercent,
Held: resp.Paystub[i].Held,
Owed: resp.Paystub[i].Owed,
Disposed: resp.Paystub[i].Disposed,
Paid: resp.Paystub[i].Paid,
}
payStubs = append(payStubs, paystub)
}
return payStubs, nil
}
// GetPayment retrieves payment data from particular satellite using grpc.
func (endpoint *Endpoint) GetPayment(ctx context.Context, satelliteID storj.NodeID, period string) (_ *Payment, err error) {
defer mon.Task()(&ctx)(&err)
client, err := endpoint.dial(ctx, satelliteID)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
defer func() { err = errs.Combine(err, client.Close()) }()
requestedPeriod, err := date.PeriodToTime(period)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
resp, err := client.GetPayment(ctx, &pb.GetPaymentRequest{Period: requestedPeriod})
if err != nil {
if rpcstatus.Code(err) == rpcstatus.OutOfRange {
return nil, nil
}
return nil, ErrHeldAmountService.Wrap(err)
}
return &Payment{
ID: resp.Id,
Created: resp.CreatedAt,
SatelliteID: satelliteID,
Period: period[0:7],
Amount: resp.Amount,
Receipt: resp.Receipt,
Notes: resp.Notes,
}, nil
}
// GetAllPayments retrieves all payments for particular satellite.
func (endpoint *Endpoint) GetAllPayments(ctx context.Context, satelliteID storj.NodeID) (_ []Payment, err error) {
defer mon.Task()(&ctx)(&err)
client, err := endpoint.dial(ctx, satelliteID)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
defer func() { err = errs.Combine(err, client.Close()) }()
resp, err := client.GetAllPayments(ctx, &pb.GetAllPaymentsRequest{})
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
var payments []Payment
for i := 0; i < len(resp.Payment); i++ {
payment := Payment{
ID: resp.Payment[i].Id,
Created: resp.Payment[i].CreatedAt,
SatelliteID: satelliteID,
Period: resp.Payment[i].Period.String()[0:7],
Amount: resp.Payment[i].Amount,
Receipt: resp.Payment[i].Receipt,
Notes: resp.Payment[i].Notes,
}
payments = append(payments, payment)
}
return payments, nil
}
// dial dials the HeldAmount client for the satellite by id
func (endpoint *Endpoint) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) {
defer mon.Task()(&ctx)(&err)
nodeurl, err := endpoint.trust.GetNodeURL(ctx, satelliteID)
if err != nil {
return nil, errs.New("unable to find satellite %s: %w", satelliteID, err)
}
conn, err := endpoint.dialer.DialNodeURL(ctx, nodeurl)
if err != nil {
return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err)
}
return &Client{
conn: conn,
DRPCHeldAmountClient: pb.NewDRPCHeldAmountClient(conn),
}, nil
}

View File

@ -32,10 +32,6 @@ type DB interface {
AllPeriods(ctx context.Context) ([]string, error)
// StorePayment inserts or updates payment into the DB
StorePayment(ctx context.Context, payment Payment) error
// GetPayment retrieves payment stats for specific satellite in specific period.
GetPayment(ctx context.Context, satelliteID storj.NodeID, period string) (*Payment, error)
// AllPayments retrieves payment stats from all satellites in specific period from DB.
AllPayments(ctx context.Context, period string) ([]Payment, error)
}
// ErrNoPayStubForPeriod represents errors from the heldamount database.
@ -64,6 +60,7 @@ type PayStub struct {
Owed int64 `json:"owed"`
Disposed int64 `json:"disposed"`
Paid int64 `json:"paid"`
Receipt string `json:"receipt"`
}
// AmountPeriod is node's held amount for period.
@ -81,13 +78,13 @@ type EstimatedPayout struct {
// PayoutMonthly contains bandwidth and payout amount for month.
type PayoutMonthly struct {
EgressBandwidth int64 `json:"egressBandwidth"`
EgressPayout int64 `json:"egressPayout"`
EgressRepairAudit int64 `json:"egressRepairAudit"`
RepairAuditPayout int64 `json:"repairAuditPayout"`
DiskSpace float64 `json:"diskSpace"`
DiskSpaceAmount int64 `json:"diskSpaceAmount"`
HeldPercentRate int64 `json:"heldRate"`
EgressBandwidth int64 `json:"egressBandwidth"`
EgressBandwidthPayout int64 `json:"egressBandwidthPayout"`
EgressRepairAudit int64 `json:"egressRepairAudit"`
EgressRepairAuditPayout int64 `json:"egressRepairAuditPayout"`
DiskSpace float64 `json:"diskSpace"`
DiskSpacePayout int64 `json:"diskSpacePayout"`
HeldRate int64 `json:"heldRate"`
}
// Payment is node payment data for specific period.

View File

@ -14,9 +14,6 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/storj/private/date"
"storj.io/storj/storagenode/reputation"
@ -33,21 +30,6 @@ var (
mon = monkit.Package()
)
// Client encapsulates HeldAmountClient with underlying connection
//
// architecture: Client
type Client struct {
conn *rpc.Conn
pb.DRPCHeldAmountClient
}
// Close closes underlying client connection
func (c *Client) Close() error {
return c.conn.Close()
}
// TODO: separate service on service and endpoint.
// Service retrieves info from satellites using an rpc client
//
// architecture: Service
@ -56,121 +38,21 @@ type Service struct {
db DB
reputationDB reputation.DB
dialer rpc.Dialer
trust *trust.Pool
trust *trust.Pool
}
// NewService creates new instance of service.
func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, dialer rpc.Dialer, trust *trust.Pool) *Service {
func NewService(log *zap.Logger, db DB, reputationDB reputation.DB, trust *trust.Pool) *Service {
return &Service{
log: log,
db: db,
reputationDB: reputationDB,
dialer: dialer,
trust: trust,
}
}
// GetPaystubStats retrieves held amount for particular satellite from satellite using RPC.
func (service *Service) GetPaystubStats(ctx context.Context, satelliteID storj.NodeID, period string) (_ *PayStub, err error) {
defer mon.Task()(&ctx)(&err)
client, err := service.dial(ctx, satelliteID)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
defer func() { err = errs.Combine(err, client.Close()) }()
requestedPeriod, err := date.PeriodToTime(period)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
resp, err := client.GetPayStub(ctx, &pb.GetHeldAmountRequest{Period: requestedPeriod})
if err != nil {
if rpcstatus.Code(err) == rpcstatus.OutOfRange {
return nil, ErrNoPayStubForPeriod.Wrap(err)
}
return nil, ErrHeldAmountService.Wrap(err)
}
return &PayStub{
Period: period[0:7],
SatelliteID: satelliteID,
Created: resp.CreatedAt,
Codes: resp.Codes,
UsageAtRest: resp.UsageAtRest,
UsageGet: resp.UsageGet,
UsagePut: resp.UsagePut,
UsageGetRepair: resp.UsageGetRepair,
UsagePutRepair: resp.UsagePutRepair,
UsageGetAudit: resp.UsageGetAudit,
CompAtRest: resp.CompAtRest,
CompGet: resp.CompGet,
CompPut: resp.CompPut,
CompGetRepair: resp.CompGetRepair,
CompPutRepair: resp.CompPutRepair,
CompGetAudit: resp.CompGetAudit,
SurgePercent: resp.SurgePercent,
Held: resp.Held,
Owed: resp.Owed,
Disposed: resp.Disposed,
Paid: resp.Paid,
}, nil
}
// GetAllPaystubs retrieves all paystubs for particular satellite.
func (service *Service) GetAllPaystubs(ctx context.Context, satelliteID storj.NodeID) (_ []PayStub, err error) {
defer mon.Task()(&ctx)(&err)
client, err := service.dial(ctx, satelliteID)
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
defer func() { err = errs.Combine(err, client.Close()) }()
resp, err := client.GetAllPaystubs(ctx, &pb.GetAllPaystubsRequest{})
if err != nil {
return nil, ErrHeldAmountService.Wrap(err)
}
var payStubs []PayStub
for i := 0; i < len(resp.Paystub); i++ {
paystub := PayStub{
Period: resp.Paystub[i].Period.String()[0:7],
SatelliteID: satelliteID,
Created: resp.Paystub[i].CreatedAt,
Codes: resp.Paystub[i].Codes,
UsageAtRest: resp.Paystub[i].UsageAtRest,
UsageGet: resp.Paystub[i].UsageGet,
UsagePut: resp.Paystub[i].UsagePut,
UsageGetRepair: resp.Paystub[i].UsageGetRepair,
UsagePutRepair: resp.Paystub[i].UsagePutRepair,
UsageGetAudit: resp.Paystub[i].UsageGetAudit,
CompAtRest: resp.Paystub[i].CompAtRest,
CompGet: resp.Paystub[i].CompGet,
CompPut: resp.Paystub[i].CompPut,
CompGetRepair: resp.Paystub[i].CompGetRepair,
CompPutRepair: resp.Paystub[i].CompPutRepair,
CompGetAudit: resp.Paystub[i].CompGetAudit,
SurgePercent: resp.Paystub[i].SurgePercent,
Held: resp.Paystub[i].Held,
Owed: resp.Paystub[i].Owed,
Disposed: resp.Paystub[i].Disposed,
Paid: resp.Paystub[i].Paid,
}
payStubs = append(payStubs, paystub)
}
return payStubs, nil
}
// SatellitePayStubMonthlyCached retrieves held amount for particular satellite for selected month from storagenode database.
func (service *Service) SatellitePayStubMonthlyCached(ctx context.Context, satelliteID storj.NodeID, period string) (payStub *PayStub, err error) {
// SatellitePayStubMonthly retrieves held amount for particular satellite for selected month from storagenode database.
func (service *Service) SatellitePayStubMonthly(ctx context.Context, satelliteID storj.NodeID, period string) (payStub *PayStub, err error) {
defer mon.Task()(&ctx, &satelliteID, &period)(&err)
payStub, err = service.db.GetPayStub(ctx, satelliteID, period)
@ -181,8 +63,8 @@ func (service *Service) SatellitePayStubMonthlyCached(ctx context.Context, satel
return payStub, nil
}
// AllPayStubsMonthlyCached retrieves held amount for all satellites per selected period from storagenode database.
func (service *Service) AllPayStubsMonthlyCached(ctx context.Context, period string) (payStubs []PayStub, err error) {
// AllPayStubsMonthly retrieves held amount for all satellites per selected period from storagenode database.
func (service *Service) AllPayStubsMonthly(ctx context.Context, period string) (payStubs []PayStub, err error) {
defer mon.Task()(&ctx, &period)(&err)
payStubs, err = service.db.AllPayStubs(ctx, period)
@ -193,8 +75,8 @@ func (service *Service) AllPayStubsMonthlyCached(ctx context.Context, period str
return payStubs, nil
}
// SatellitePayStubPeriodCached retrieves held amount for all satellites for selected months from storagenode database.
func (service *Service) SatellitePayStubPeriodCached(ctx context.Context, satelliteID storj.NodeID, periodStart, periodEnd string) (payStubs []PayStub, err error) {
// SatellitePayStubPeriod retrieves held amount for all satellites for selected months from storagenode database.
func (service *Service) SatellitePayStubPeriod(ctx context.Context, satelliteID storj.NodeID, periodStart, periodEnd string) (payStubs []PayStub, err error) {
defer mon.Task()(&ctx, &satelliteID, &periodStart, &periodEnd)(&err)
periods, err := parsePeriodRange(periodStart, periodEnd)
@ -218,8 +100,8 @@ func (service *Service) SatellitePayStubPeriodCached(ctx context.Context, satell
return payStubs, nil
}
// AllPayStubsPeriodCached retrieves held amount for all satellites for selected range of months from storagenode database.
func (service *Service) AllPayStubsPeriodCached(ctx context.Context, periodStart, periodEnd string) (payStubs []PayStub, err error) {
// AllPayStubsPeriod retrieves held amount for all satellites for selected range of months from storagenode database.
func (service *Service) AllPayStubsPeriod(ctx context.Context, periodStart, periodEnd string) (payStubs []PayStub, err error) {
defer mon.Task()(&ctx, &periodStart, &periodEnd)(&err)
periods, err := parsePeriodRange(periodStart, periodEnd)
@ -325,26 +207,6 @@ func (service *Service) AllHeldbackHistory(ctx context.Context) (result []HeldHi
return result, nil
}
// dial dials the HeldAmount client for the satellite by id
func (service *Service) dial(ctx context.Context, satelliteID storj.NodeID) (_ *Client, err error) {
defer mon.Task()(&ctx)(&err)
nodeurl, err := service.trust.GetNodeURL(ctx, satelliteID)
if err != nil {
return nil, errs.New("unable to find satellite %s: %w", satelliteID, err)
}
conn, err := service.dialer.DialNodeURL(ctx, nodeurl)
if err != nil {
return nil, errs.New("unable to connect to the satellite %s: %w", satelliteID, err)
}
return &Client{
conn: conn,
DRPCHeldAmountClient: pb.NewDRPCHeldAmountClient(conn),
}, nil
}
// TODO: move to separate struct.
func parsePeriodRange(periodStart, periodEnd string) (periods []string, err error) {
var yearStart, yearEnd, monthStart, monthEnd int

View File

@ -46,10 +46,11 @@ type CacheStorage struct {
type Cache struct {
log *zap.Logger
db CacheStorage
service *Service
heldamountService *heldamount.Service
trust *trust.Pool
db CacheStorage
service *Service
heldamountEndpoint *heldamount.Endpoint
heldamountService *heldamount.Service
trust *trust.Pool
maxSleep time.Duration
Reputation *sync2.Cycle
@ -57,16 +58,17 @@ type Cache struct {
}
// NewCache creates new caching service instance
func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, heldamountService *heldamount.Service, trust *trust.Pool) *Cache {
func NewCache(log *zap.Logger, config Config, db CacheStorage, service *Service, heldamountEndpoint *heldamount.Endpoint, heldamountService *heldamount.Service, trust *trust.Pool) *Cache {
return &Cache{
log: log,
db: db,
service: service,
heldamountService: heldamountService,
trust: trust,
maxSleep: config.MaxSleep,
Reputation: sync2.NewCycle(config.ReputationSync),
Storage: sync2.NewCycle(config.StorageSync),
log: log,
db: db,
service: service,
heldamountEndpoint: heldamountEndpoint,
heldamountService: heldamountService,
trust: trust,
maxSleep: config.MaxSleep,
Reputation: sync2.NewCycle(config.ReputationSync),
Storage: sync2.NewCycle(config.StorageSync),
}
}
@ -75,7 +77,7 @@ func (cache *Cache) Run(ctx context.Context) error {
var group errgroup.Group
err := cache.satelliteLoop(ctx, func(satelliteID storj.NodeID) error {
stubHistory, err := cache.heldamountService.GetAllPaystubs(ctx, satelliteID)
stubHistory, err := cache.heldamountEndpoint.GetAllPaystubs(ctx, satelliteID)
if err != nil {
return err
}
@ -87,6 +89,18 @@ func (cache *Cache) Run(ctx context.Context) error {
}
}
paymentHistory, err := cache.heldamountEndpoint.GetAllPayments(ctx, satelliteID)
if err != nil {
return err
}
for j := 0; j < len(paymentHistory); j++ {
err := cache.db.HeldAmount.StorePayment(ctx, paymentHistory[j])
if err != nil {
return err
}
}
pricingModel, err := cache.service.GetPricingModel(ctx, satelliteID)
if err != nil {
return err
@ -185,7 +199,7 @@ func (cache *Cache) CacheHeldAmount(ctx context.Context) (err error) {
}
previousMonth := yearAndMonth.AddDate(0, -1, 0).String()
payStub, err := cache.heldamountService.GetPaystubStats(ctx, satellite, previousMonth)
payStub, err := cache.heldamountEndpoint.GetPaystub(ctx, satellite, previousMonth)
if err != nil {
if heldamount.ErrNoPayStubForPeriod.Has(err) {
return nil
@ -200,6 +214,17 @@ func (cache *Cache) CacheHeldAmount(ctx context.Context) (err error) {
}
}
payment, err := cache.heldamountEndpoint.GetPayment(ctx, satellite, previousMonth)
if err != nil {
return err
}
if payment != nil {
if err = cache.db.HeldAmount.StorePayment(ctx, *payment); err != nil {
return err
}
}
return nil
})
}

View File

@ -256,7 +256,8 @@ type Peer struct {
}
Heldamount struct {
Service *heldamount.Service
Service *heldamount.Service
Endpoint *heldamount.Endpoint
}
Bandwidth *bandwidth.Service
@ -530,6 +531,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Log.Named("heldamount:service"),
peer.DB.HeldAmount(),
peer.DB.Reputation(),
peer.Storage2.Trust,
)
peer.Heldamount.Endpoint = heldamount.NewEndpoint(
peer.Log.Named("heldamount:endpoint"),
peer.Dialer,
peer.Storage2.Trust,
)
@ -553,6 +558,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
Satellites: peer.DB.Satellites(),
},
peer.NodeStats.Service,
peer.Heldamount.Endpoint,
peer.Heldamount.Service,
peer.Storage2.Trust,
)

View File

@ -91,7 +91,7 @@ func (db *heldamountDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID
Period: period,
}
row := db.QueryRowContext(ctx,
rowStub := db.QueryRowContext(ctx,
`SELECT created_at,
codes,
usage_at_rest,
@ -115,7 +115,7 @@ func (db *heldamountDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID
satelliteID, period,
)
err = row.Scan(
err = rowStub.Scan(
&result.Created,
&result.Codes,
&result.UsageAtRest,
@ -143,6 +143,19 @@ func (db *heldamountDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID
return nil, ErrHeldAmount.Wrap(err)
}
rowPayment := db.QueryRowContext(ctx,
`SELECT receipt FROM payments WHERE satellite_id = ? AND period = ?`,
satelliteID, period,
)
err = rowPayment.Scan(&result.Receipt)
if err != nil {
if sql.ErrNoRows == err {
return &result, nil
}
return nil, ErrHeldAmount.Wrap(err)
}
return &result, nil
}
@ -340,88 +353,6 @@ func (db *heldamountDB) StorePayment(ctx context.Context, payment heldamount.Pay
return ErrHeldAmount.Wrap(err)
}
// GetPayment retrieves payment data for a specific satellite.
func (db *heldamountDB) GetPayment(ctx context.Context, satelliteID storj.NodeID, period string) (_ *heldamount.Payment, err error) {
defer mon.Task()(&ctx)(&err)
result := heldamount.Payment{
SatelliteID: satelliteID,
Period: period,
}
row := db.QueryRowContext(ctx,
`SELECT id,
created_at,
amount,
receipt,
notes
FROM payments WHERE satellite_id = ? AND period = ?`,
satelliteID, period,
)
err = row.Scan(
&result.ID,
&result.Created,
&result.Amount,
&result.Receipt,
&result.Notes,
)
if err != nil {
if sql.ErrNoRows == err {
return nil, heldamount.ErrNoPayStubForPeriod.Wrap(err)
}
return nil, ErrHeldAmount.Wrap(err)
}
return &result, nil
}
// AllPayments retrieves all payment stats from DB for specific period.
func (db *heldamountDB) AllPayments(ctx context.Context, period string) (_ []heldamount.Payment, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT
satellite_id,
id,
created_at,
amount,
receipt,
notes
FROM payments WHERE period = ?`
rows, err := db.QueryContext(ctx, query, period)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var paymentList []heldamount.Payment
for rows.Next() {
var payment heldamount.Payment
payment.Period = period
err := rows.Scan(&payment.SatelliteID,
&payment.ID,
&payment.Created,
&payment.Amount,
&payment.Receipt,
&payment.Notes,
)
if err != nil {
return nil, ErrHeldAmount.Wrap(err)
}
paymentList = append(paymentList, payment)
}
if err = rows.Err(); err != nil {
return nil, ErrHeldAmount.Wrap(err)
}
return paymentList, nil
}
// SatellitesDisposedHistory returns all disposed amount for specific satellite from DB.
func (db *heldamountDB) SatellitesDisposedHistory(ctx context.Context, satelliteID storj.NodeID) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)