storj/satellite/satellitedb/storjscanpayments.go
Egon Elbre df53914faa satellite/satellitedb: use utilities for conversions
This avoids some potential typos.

Change-Id: Icc5262e1f96fe220dd07212c00acacf6960ee909
2023-06-08 16:13:47 +03:00

197 lines
6.7 KiB
Go

// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"time"
"github.com/zeebo/errs"
"storj.io/common/currency"
"storj.io/private/dbutil/pgutil"
"storj.io/storj/private/blockchain"
"storj.io/storj/satellite/payments"
"storj.io/storj/satellite/payments/storjscan"
"storj.io/storj/satellite/satellitedb/dbx"
)
var _ storjscan.PaymentsDB = (*storjscanPayments)(nil)
// storjscanPayments implements storjscan.DB.
type storjscanPayments struct {
db *satelliteDB
}
// InsertBatch inserts list of payments in a single transaction.
func (storjscanPayments *storjscanPayments) InsertBatch(ctx context.Context, payments []storjscan.CachedPayment) (err error) {
defer mon.Task()(&ctx)(&err)
cmnd := `INSERT INTO storjscan_payments(
block_hash,
block_number,
transaction,
log_index,
from_address,
to_address,
token_value,
usd_value,
status,
timestamp,
created_at
) SELECT
UNNEST($1::BYTEA[]),
UNNEST($2::INT8[]),
UNNEST($3::BYTEA[]),
UNNEST($4::INT4[]),
UNNEST($5::BYTEA[]),
UNNEST($6::BYTEA[]),
UNNEST($7::INT8[]),
UNNEST($8::INT8[]),
UNNEST($9::TEXT[]),
UNNEST($10::TIMESTAMPTZ[]),
$11
`
var (
blockHashes = make([][]byte, 0, len(payments))
blockNumbers = make([]int64, 0, len(payments))
transactions = make([][]byte, 0, len(payments))
logIndexes = make([]int32, 0, len(payments))
fromAddresses = make([][]byte, 0, len(payments))
toAddresses = make([][]byte, 0, len(payments))
tokenValues = make([]int64, 0, len(payments))
usdValues = make([]int64, 0, len(payments))
statuses = make([]string, 0, len(payments))
timestamps = make([]time.Time, 0, len(payments))
createdAt = time.Now()
)
for i := range payments {
payment := payments[i]
blockHashes = append(blockHashes, payment.BlockHash[:])
blockNumbers = append(blockNumbers, payment.BlockNumber)
transactions = append(transactions, payment.Transaction[:])
logIndexes = append(logIndexes, int32(payment.LogIndex))
fromAddresses = append(fromAddresses, payment.From[:])
toAddresses = append(toAddresses, payment.To[:])
tokenValues = append(tokenValues, payment.TokenValue.BaseUnits())
usdValues = append(usdValues, payment.USDValue.BaseUnits())
statuses = append(statuses, string(payment.Status))
timestamps = append(timestamps, payment.Timestamp)
}
_, err = storjscanPayments.db.ExecContext(ctx, cmnd,
pgutil.ByteaArray(blockHashes),
pgutil.Int8Array(blockNumbers),
pgutil.ByteaArray(transactions),
pgutil.Int4Array(logIndexes),
pgutil.ByteaArray(fromAddresses),
pgutil.ByteaArray(toAddresses),
pgutil.Int8Array(tokenValues),
pgutil.Int8Array(usdValues),
pgutil.TextArray(statuses),
pgutil.TimestampTZArray(timestamps),
createdAt)
return err
}
// List returns list of storjscan payments order by block number and log index desc.
func (storjscanPayments *storjscanPayments) List(ctx context.Context) (_ []storjscan.CachedPayment, err error) {
defer mon.Task()(&ctx)(&err)
dbxPmnts, err := storjscanPayments.db.All_StorjscanPayment_OrderBy_Asc_BlockNumber_Asc_LogIndex(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
var payments []storjscan.CachedPayment
for _, dbxPmnt := range dbxPmnts {
payments = append(payments, fromDBXPayment(dbxPmnt))
}
return payments, nil
}
// ListWallet returns list of storjscan payments order by block number and log index desc.
func (storjscanPayments *storjscanPayments) ListWallet(ctx context.Context, wallet blockchain.Address, limit int, offset int64) ([]storjscan.CachedPayment, error) {
dbxPmnts, err := storjscanPayments.db.Limited_StorjscanPayment_By_ToAddress_OrderBy_Desc_BlockNumber_Desc_LogIndex(ctx,
dbx.StorjscanPayment_ToAddress(wallet[:]),
limit, offset)
if err != nil {
if errs.Is(err, sql.ErrNoRows) {
return []storjscan.CachedPayment{}, nil
}
return nil, Error.Wrap(err)
}
return convertSliceNoError(dbxPmnts, fromDBXPayment), nil
}
// LastBlock returns the highest block known to DB.
func (storjscanPayments *storjscanPayments) LastBlock(ctx context.Context, status payments.PaymentStatus) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
blockNumber, err := storjscanPayments.db.First_StorjscanPayment_BlockNumber_By_Status_OrderBy_Desc_BlockNumber_Desc_LogIndex(
ctx, dbx.StorjscanPayment_Status(string(status)))
if err != nil {
return 0, Error.Wrap(err)
}
if blockNumber == nil {
return 0, Error.Wrap(storjscan.ErrNoPayments)
}
return blockNumber.BlockNumber, nil
}
// DeletePending removes all pending transactions from the DB.
func (storjscanPayments storjscanPayments) DeletePending(ctx context.Context) error {
_, err := storjscanPayments.db.Delete_StorjscanPayment_By_Status(ctx,
dbx.StorjscanPayment_Status(payments.PaymentStatusPending))
return err
}
func (storjscanPayments storjscanPayments) ListConfirmed(ctx context.Context, blockNumber int64, logIndex int) (_ []storjscan.CachedPayment, err error) {
defer mon.Task()(&ctx)(&err)
// TODO: use DBX here
query := `SELECT block_hash, block_number, transaction, log_index, from_address, to_address, token_value, usd_value, status, timestamp
FROM storjscan_payments WHERE (storjscan_payments.block_number, storjscan_payments.log_index) > (?, ?) AND storjscan_payments.status = ?
ORDER BY storjscan_payments.block_number, storjscan_payments.log_index`
rows, err := storjscanPayments.db.Query(ctx, storjscanPayments.db.Rebind(query), blockNumber, logIndex, payments.PaymentStatusConfirmed)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var payments []storjscan.CachedPayment
for rows.Next() {
var payment dbx.StorjscanPayment
err = rows.Scan(&payment.BlockHash, &payment.BlockNumber, &payment.Transaction, &payment.LogIndex,
&payment.FromAddress, &payment.ToAddress, &payment.TokenValue, &payment.UsdValue, &payment.Status, &payment.Timestamp)
if err != nil {
return nil, err
}
payments = append(payments, fromDBXPayment(&payment))
}
return payments, rows.Err()
}
// fromDBXPayment converts dbx storjscan payment type to storjscan.CachedPayment.
func fromDBXPayment(dbxPmnt *dbx.StorjscanPayment) storjscan.CachedPayment {
payment := storjscan.CachedPayment{
TokenValue: currency.AmountFromBaseUnits(dbxPmnt.TokenValue, currency.StorjToken),
USDValue: currency.AmountFromBaseUnits(dbxPmnt.UsdValue, currency.USDollarsMicro),
Status: payments.PaymentStatus(dbxPmnt.Status),
BlockNumber: dbxPmnt.BlockNumber,
LogIndex: dbxPmnt.LogIndex,
Timestamp: dbxPmnt.Timestamp.UTC(),
}
copy(payment.From[:], dbxPmnt.FromAddress)
copy(payment.To[:], dbxPmnt.ToAddress)
copy(payment.BlockHash[:], dbxPmnt.BlockHash)
copy(payment.Transaction[:], dbxPmnt.Transaction)
return payment
}