satellite/payments/storjscan: add chore to periodically ask for new payments
Change-Id: Ia6ad182184596fe1fe1f9a40f909a3af4be8feef
This commit is contained in:
parent
bd0d717f4c
commit
13be4ad0dd
112
satellite/payments/storjscan/chore.go
Normal file
112
satellite/payments/storjscan/chore.go
Normal file
@ -0,0 +1,112 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package storjscan
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/storj/satellite/payments/monetary"
|
||||
)
|
||||
|
||||
// ChoreErr is storjscan chore err class.
|
||||
var ChoreErr = errs.Class("storjscan chore")
|
||||
|
||||
// Chore periodically queries for new payments from storjscan.
|
||||
//
|
||||
// architecture: Chore
|
||||
type Chore struct {
|
||||
log *zap.Logger
|
||||
client *Client
|
||||
paymentsDB PaymentsDB
|
||||
TransactionCycle *sync2.Cycle
|
||||
confirmations int
|
||||
}
|
||||
|
||||
// NewChore creates new chore.
|
||||
func NewChore(log *zap.Logger, client *Client, paymentsDB PaymentsDB, confirmations int, interval time.Duration) *Chore {
|
||||
return &Chore{
|
||||
log: log,
|
||||
client: client,
|
||||
paymentsDB: paymentsDB,
|
||||
TransactionCycle: sync2.NewCycle(interval),
|
||||
confirmations: confirmations,
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs storjscan payment loop.
|
||||
func (chore *Chore) Run(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return chore.TransactionCycle.Run(ctx, func(ctx context.Context) error {
|
||||
var from int64
|
||||
|
||||
blockNumber, err := chore.paymentsDB.LastBlock(ctx, PaymentStatusConfirmed)
|
||||
switch {
|
||||
case err == nil:
|
||||
from = blockNumber + 1
|
||||
case errs.Is(err, ErrNoPayments):
|
||||
from = 0
|
||||
default:
|
||||
chore.log.Error("error retrieving last payment", zap.Error(ChoreErr.Wrap(err)))
|
||||
return nil
|
||||
}
|
||||
|
||||
payments, err := chore.client.Payments(ctx, from)
|
||||
if err != nil {
|
||||
chore.log.Error("error retrieving payments", zap.Error(ChoreErr.Wrap(err)))
|
||||
return nil
|
||||
}
|
||||
if len(payments.Payments) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var cachedPayments []CachedPayment
|
||||
for _, payment := range payments.Payments {
|
||||
var status PaymentStatus
|
||||
if payments.LatestBlock.Number-payment.BlockNumber >= int64(chore.confirmations) {
|
||||
status = PaymentStatusConfirmed
|
||||
} else {
|
||||
status = PaymentStatusPending
|
||||
}
|
||||
|
||||
cachedPayments = append(cachedPayments, CachedPayment{
|
||||
From: payment.From,
|
||||
To: payment.To,
|
||||
TokenValue: monetary.AmountFromBaseUnits(payment.TokenValue.Int64(), monetary.StorjToken),
|
||||
Status: status,
|
||||
BlockHash: payment.BlockHash,
|
||||
BlockNumber: payment.BlockNumber,
|
||||
Transaction: payment.Transaction,
|
||||
LogIndex: payment.LogIndex,
|
||||
Timestamp: payment.Timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
err = chore.paymentsDB.DeletePending(ctx)
|
||||
if err != nil {
|
||||
chore.log.Error("error removing pending payments from the DB", zap.Error(ChoreErr.Wrap(err)))
|
||||
return nil
|
||||
}
|
||||
|
||||
err = chore.paymentsDB.InsertBatch(ctx, cachedPayments)
|
||||
if err != nil {
|
||||
chore.log.Error("error storing payments to db", zap.Error(ChoreErr.Wrap(err)))
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Close closes all underlying resources.
|
||||
func (chore *Chore) Close() (err error) {
|
||||
defer mon.Task()(nil)(&err)
|
||||
chore.TransactionCycle.Close()
|
||||
return nil
|
||||
}
|
148
satellite/payments/storjscan/chore_test.go
Normal file
148
satellite/payments/storjscan/chore_test.go
Normal file
@ -0,0 +1,148 @@
|
||||
// Copyright (C) 2022 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package storjscan_test
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/payments/monetary"
|
||||
"storj.io/storj/satellite/payments/storjscan"
|
||||
"storj.io/storj/satellite/payments/storjscan/blockchaintest"
|
||||
"storj.io/storj/satellite/payments/storjscan/storjscantest"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func TestChore(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
logger := zaptest.NewLogger(t)
|
||||
now := time.Now().Round(time.Second)
|
||||
|
||||
const confirmations = 12
|
||||
|
||||
var payments []storjscan.Payment
|
||||
var cachedPayments []storjscan.CachedPayment
|
||||
|
||||
latestBlock := storjscan.Header{
|
||||
Hash: blockchaintest.NewHash(),
|
||||
Number: 0,
|
||||
Timestamp: now,
|
||||
}
|
||||
|
||||
addPayments := func(count int) {
|
||||
l := len(payments)
|
||||
for i := l; i < l+count; i++ {
|
||||
payment := storjscan.Payment{
|
||||
From: blockchaintest.NewAddress(),
|
||||
To: blockchaintest.NewAddress(),
|
||||
TokenValue: new(big.Int).SetInt64(int64(i)),
|
||||
BlockHash: blockchaintest.NewHash(),
|
||||
BlockNumber: int64(i),
|
||||
Transaction: blockchaintest.NewHash(),
|
||||
LogIndex: i,
|
||||
Timestamp: now.Add(time.Duration(i) * time.Second),
|
||||
}
|
||||
payments = append(payments, payment)
|
||||
|
||||
cachedPayments = append(cachedPayments, storjscan.CachedPayment{
|
||||
From: payment.From,
|
||||
To: payment.To,
|
||||
TokenValue: monetary.AmountFromBaseUnits(payment.TokenValue.Int64(), monetary.StorjToken),
|
||||
Status: storjscan.PaymentStatusPending,
|
||||
BlockHash: payment.BlockHash,
|
||||
BlockNumber: payment.BlockNumber,
|
||||
Transaction: payment.Transaction,
|
||||
LogIndex: payment.LogIndex,
|
||||
Timestamp: payment.Timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
latestBlock = storjscan.Header{
|
||||
Hash: payments[len(payments)-1].BlockHash,
|
||||
Number: payments[len(payments)-1].BlockNumber,
|
||||
Timestamp: payments[len(payments)-1].Timestamp,
|
||||
}
|
||||
for i := 0; i < len(cachedPayments); i++ {
|
||||
if latestBlock.Number-cachedPayments[i].BlockNumber >= confirmations {
|
||||
cachedPayments[i].Status = storjscan.PaymentStatusConfirmed
|
||||
} else {
|
||||
cachedPayments[i].Status = storjscan.PaymentStatusPending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var reqCounter int
|
||||
|
||||
const (
|
||||
identifier = "eu"
|
||||
secret = "secret"
|
||||
)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var err error
|
||||
reqCounter++
|
||||
|
||||
if err = storjscantest.CheckAuth(r, identifier, secret); err != nil {
|
||||
storjscantest.ServeJSONError(t, w, http.StatusUnauthorized, err)
|
||||
return
|
||||
}
|
||||
|
||||
var from int64
|
||||
if s := r.URL.Query().Get("from"); s != "" {
|
||||
from, err = strconv.ParseInt(s, 10, 64)
|
||||
if err != nil {
|
||||
storjscantest.ServeJSONError(t, w, http.StatusBadRequest, errs.New("from parameter is missing"))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
storjscantest.ServePayments(t, w, from, latestBlock, payments)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
paymentsDB := db.StorjscanPayments()
|
||||
|
||||
client := storjscan.NewClient(server.URL, "eu", "secret")
|
||||
chore := storjscan.NewChore(logger, client, paymentsDB, confirmations, time.Second)
|
||||
ctx.Go(func() error {
|
||||
return chore.Run(ctx)
|
||||
})
|
||||
defer ctx.Check(chore.Close)
|
||||
|
||||
chore.TransactionCycle.Pause()
|
||||
chore.TransactionCycle.TriggerWait()
|
||||
cachedReqCounter := reqCounter
|
||||
|
||||
addPayments(100)
|
||||
chore.TransactionCycle.TriggerWait()
|
||||
|
||||
last, err := paymentsDB.LastBlock(ctx, storjscan.PaymentStatusPending)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 99, last)
|
||||
actual, err := paymentsDB.List(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, cachedPayments, actual)
|
||||
|
||||
addPayments(100)
|
||||
chore.TransactionCycle.TriggerWait()
|
||||
|
||||
last, err = paymentsDB.LastBlock(ctx, storjscan.PaymentStatusPending)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 199, last)
|
||||
actual, err = paymentsDB.List(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, cachedPayments, actual)
|
||||
|
||||
require.Equal(t, reqCounter, cachedReqCounter+2)
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue
Block a user