satellite/satellitedb: use utilities for conversions
This avoids some potential typos. Change-Id: Icc5262e1f96fe220dd07212c00acacf6960ee909
This commit is contained in:
parent
974b4f938a
commit
df53914faa
@ -224,34 +224,21 @@ func (db billingDB) List(ctx context.Context, userID uuid.UUID) (txs []billing.T
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
for _, dbxTX := range dbxTXs {
|
||||
tx, err := fromDBXBillingTransaction(dbxTX)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
txs = append(txs, *tx)
|
||||
}
|
||||
|
||||
return txs, nil
|
||||
txs, err = convertSlice(dbxTXs, fromDBXBillingTransaction)
|
||||
return txs, Error.Wrap(err)
|
||||
}
|
||||
|
||||
func (db billingDB) ListSource(ctx context.Context, userID uuid.UUID, txSource string) (txs []billing.Transaction, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
dbxTXs, err := db.db.All_BillingTransaction_By_UserId_And_Source_OrderBy_Desc_Timestamp(ctx,
|
||||
dbx.BillingTransaction_UserId(userID[:]), dbx.BillingTransaction_Source(txSource))
|
||||
dbx.BillingTransaction_UserId(userID[:]),
|
||||
dbx.BillingTransaction_Source(txSource))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
for _, dbxTX := range dbxTXs {
|
||||
tx, err := fromDBXBillingTransaction(dbxTX)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
txs = append(txs, *tx)
|
||||
}
|
||||
|
||||
return txs, nil
|
||||
txs, err = convertSlice(dbxTXs, fromDBXBillingTransaction)
|
||||
return txs, Error.Wrap(err)
|
||||
}
|
||||
|
||||
func (db billingDB) GetBalance(ctx context.Context, userID uuid.UUID) (_ currency.Amount, err error) {
|
||||
@ -269,12 +256,12 @@ func (db billingDB) GetBalance(ctx context.Context, userID uuid.UUID) (_ currenc
|
||||
}
|
||||
|
||||
// fromDBXBillingTransaction converts *dbx.BillingTransaction to *billing.Transaction.
|
||||
func fromDBXBillingTransaction(dbxTX *dbx.BillingTransaction) (*billing.Transaction, error) {
|
||||
func fromDBXBillingTransaction(dbxTX *dbx.BillingTransaction) (billing.Transaction, error) {
|
||||
userID, err := uuid.FromBytes(dbxTX.UserId)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
return billing.Transaction{}, errs.Wrap(err)
|
||||
}
|
||||
return &billing.Transaction{
|
||||
return billing.Transaction{
|
||||
ID: dbxTX.Id,
|
||||
UserID: userID,
|
||||
Amount: currency.AmountFromBaseUnits(dbxTX.Amount, currency.USDollarsMicro),
|
||||
|
@ -53,17 +53,8 @@ func (db *coinPaymentsTransactions) ListAccount(ctx context.Context, userID uuid
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var txs []stripe.Transaction
|
||||
for _, dbxTX := range dbxTXs {
|
||||
tx, err := fromDBXCoinpaymentsTransaction(dbxTX)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
|
||||
txs = append(txs, *tx)
|
||||
}
|
||||
|
||||
return txs, nil
|
||||
txs, err := convertSlice(dbxTXs, fromDBXCoinpaymentsTransaction)
|
||||
return txs, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// TestInsert inserts new coinpayments transaction into DB.
|
||||
@ -126,17 +117,17 @@ func (db *coinPaymentsTransactions) TestLockRate(ctx context.Context, id coinpay
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to *stripecoinpayments.Transaction.
|
||||
func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (*stripe.Transaction, error) {
|
||||
// fromDBXCoinpaymentsTransaction converts *dbx.CoinpaymentsTransaction to stripecoinpayments.Transaction.
|
||||
func fromDBXCoinpaymentsTransaction(dbxCPTX *dbx.CoinpaymentsTransaction) (stripe.Transaction, error) {
|
||||
userID, err := uuid.FromBytes(dbxCPTX.UserId)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
return stripe.Transaction{}, errs.Wrap(err)
|
||||
}
|
||||
|
||||
// TODO: the currency here should be passed in to this function or stored
|
||||
// in the database.
|
||||
|
||||
return &stripe.Transaction{
|
||||
return stripe.Transaction{
|
||||
ID: coinpayments.TransactionID(dbxCPTX.Id),
|
||||
AccountID: userID,
|
||||
Address: dbxCPTX.Address,
|
||||
|
@ -204,12 +204,9 @@ func projectInvitationFromDBX(dbxInvite *dbx.ProjectInvitation) (_ *console.Proj
|
||||
// projectInvitationSliceFromDBX converts a project member invitation slice from the database to a
|
||||
// slice of console.ProjectInvitation.
|
||||
func projectInvitationSliceFromDBX(dbxInvites []*dbx.ProjectInvitation) (invites []console.ProjectInvitation, err error) {
|
||||
for _, dbxInvite := range dbxInvites {
|
||||
invite, err := projectInvitationFromDBX(dbxInvite)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
invites = append(invites, *invite)
|
||||
}
|
||||
return invites, nil
|
||||
return convertSlice(dbxInvites,
|
||||
func(i *dbx.ProjectInvitation) (console.ProjectInvitation, error) {
|
||||
r, err := projectInvitationFromDBX(i)
|
||||
return *r, err
|
||||
})
|
||||
}
|
||||
|
@ -213,19 +213,13 @@ func sanitizeOrderDirectionName(pmo console.OrderDirection) string {
|
||||
// projectMembersFromDbxSlice is used for creating []ProjectMember entities from autogenerated []*dbx.ProjectMember struct.
|
||||
func projectMembersFromDbxSlice(ctx context.Context, projectMembersDbx []*dbx.ProjectMember) (_ []console.ProjectMember, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
var projectMembers []console.ProjectMember
|
||||
var errors []error
|
||||
|
||||
// Generating []dbo from []dbx and collecting all errors
|
||||
for _, projectMemberDbx := range projectMembersDbx {
|
||||
projectMember, err := projectMemberFromDBX(ctx, projectMemberDbx)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
continue
|
||||
}
|
||||
|
||||
projectMembers = append(projectMembers, *projectMember)
|
||||
}
|
||||
|
||||
return projectMembers, errs.Combine(errors...)
|
||||
rs, errors := convertSliceWithErrors(projectMembersDbx,
|
||||
func(v *dbx.ProjectMember) (r console.ProjectMember, _ error) {
|
||||
p, err := projectMemberFromDBX(ctx, v)
|
||||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
return *p, err
|
||||
})
|
||||
return rs, errs.Combine(errors...)
|
||||
}
|
||||
|
@ -427,20 +427,14 @@ func projectFromDBX(ctx context.Context, project *dbx.Project) (_ *console.Proje
|
||||
func projectsFromDbxSlice(ctx context.Context, projectsDbx []*dbx.Project) (_ []console.Project, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var projects []console.Project
|
||||
var errors []error
|
||||
|
||||
// Generating []dbo from []dbx and collecting all errors
|
||||
for _, projectDbx := range projectsDbx {
|
||||
project, err := projectFromDBX(ctx, projectDbx)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
continue
|
||||
}
|
||||
|
||||
projects = append(projects, *project)
|
||||
}
|
||||
|
||||
projects, errors := convertSliceWithErrors(projectsDbx,
|
||||
func(v *dbx.Project) (r console.Project, _ error) {
|
||||
p, err := projectFromDBX(ctx, v)
|
||||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
return *p, nil
|
||||
})
|
||||
return projects, errs.Combine(errors...)
|
||||
}
|
||||
|
||||
|
@ -53,18 +53,10 @@ func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally ti
|
||||
func (db *StoragenodeAccounting) GetTallies(ctx context.Context) (_ []*accounting.StoragenodeStorageTally, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
raws, err := db.db.All_StoragenodeStorageTally(ctx)
|
||||
out := make([]*accounting.StoragenodeStorageTally, len(raws))
|
||||
for i, r := range raws {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
out[i] = &accounting.StoragenodeStorageTally{
|
||||
NodeID: nodeID,
|
||||
IntervalEndTime: r.IntervalEndTime,
|
||||
DataTotal: r.DataTotal,
|
||||
}
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
out, err := convertSlice(raws, fromDBXStoragenodeStorageTally)
|
||||
return out, Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -72,18 +64,10 @@ func (db *StoragenodeAccounting) GetTallies(ctx context.Context) (_ []*accountin
|
||||
func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) (_ []*accounting.StoragenodeStorageTally, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
raws, err := db.db.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.StoragenodeStorageTally_IntervalEndTime(latestRollup))
|
||||
out := make([]*accounting.StoragenodeStorageTally, len(raws))
|
||||
for i, r := range raws {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
out[i] = &accounting.StoragenodeStorageTally{
|
||||
NodeID: nodeID,
|
||||
IntervalEndTime: r.IntervalEndTime,
|
||||
DataTotal: r.DataTotal,
|
||||
}
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
out, err := convertSlice(raws, fromDBXStoragenodeStorageTally)
|
||||
return out, Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -132,16 +116,11 @@ func (db *StoragenodeAccounting) getBandwidthByNodeSince(ctx context.Context, la
|
||||
}
|
||||
cursor = next
|
||||
for _, r := range rollups {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
||||
v, err := fromDBXStoragenodeBandwidthRollup(r)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
return err
|
||||
}
|
||||
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: nodeID,
|
||||
IntervalStart: r.IntervalStart,
|
||||
Action: r.Action,
|
||||
Settled: r.Settled,
|
||||
})
|
||||
err = cb(ctx, &v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -171,16 +150,11 @@ func (db *StoragenodeAccounting) getBandwidthPhase2ByNodeSince(ctx context.Conte
|
||||
}
|
||||
cursor = next
|
||||
for _, r := range rollups {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
|
||||
v, err := fromDBXStoragenodeBandwidthRollupPhase2(r)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
err = cb(ctx, &accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: nodeID,
|
||||
IntervalStart: r.IntervalStart,
|
||||
Action: r.Action,
|
||||
Settled: r.Settled,
|
||||
})
|
||||
err = cb(ctx, &v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -642,18 +616,13 @@ func (db *StoragenodeAccounting) GetRollupsSince(ctx context.Context, since time
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
cursor = next
|
||||
for _, dbxRollup := range dbxRollups {
|
||||
id, err := storj.NodeIDFromBytes(dbxRollup.StoragenodeId)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
bwRollups = append(bwRollups, accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: id,
|
||||
IntervalStart: dbxRollup.IntervalStart,
|
||||
Action: dbxRollup.Action,
|
||||
Settled: dbxRollup.Settled,
|
||||
})
|
||||
|
||||
rollups, err := convertSlice(dbxRollups, fromDBXStoragenodeBandwidthRollup)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
bwRollups = append(bwRollups, rollups...)
|
||||
|
||||
if cursor == nil {
|
||||
return bwRollups, nil
|
||||
}
|
||||
@ -678,20 +647,66 @@ func (db *StoragenodeAccounting) GetArchivedRollupsSince(ctx context.Context, si
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
cursor = next
|
||||
for _, dbxRollup := range dbxRollups {
|
||||
id, err := storj.NodeIDFromBytes(dbxRollup.StoragenodeId)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
bwRollups = append(bwRollups, accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: id,
|
||||
IntervalStart: dbxRollup.IntervalStart,
|
||||
Action: dbxRollup.Action,
|
||||
Settled: dbxRollup.Settled,
|
||||
})
|
||||
|
||||
rollups, err := convertSlice(dbxRollups, fromDBXStoragenodeBandwidthRollupArchive)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
bwRollups = append(bwRollups, rollups...)
|
||||
|
||||
if cursor == nil {
|
||||
return bwRollups, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func fromDBXStoragenodeStorageTally(r *dbx.StoragenodeStorageTally) (*accounting.StoragenodeStorageTally, error) {
|
||||
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
return &accounting.StoragenodeStorageTally{
|
||||
NodeID: nodeID,
|
||||
IntervalEndTime: r.IntervalEndTime,
|
||||
DataTotal: r.DataTotal,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func fromDBXStoragenodeBandwidthRollup(v *dbx.StoragenodeBandwidthRollup) (r accounting.StoragenodeBandwidthRollup, _ error) {
|
||||
id, err := storj.NodeIDFromBytes(v.StoragenodeId)
|
||||
if err != nil {
|
||||
return r, Error.Wrap(err)
|
||||
}
|
||||
return accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: id,
|
||||
IntervalStart: v.IntervalStart,
|
||||
Action: v.Action,
|
||||
Settled: v.Settled,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func fromDBXStoragenodeBandwidthRollupPhase2(v *dbx.StoragenodeBandwidthRollupPhase2) (r accounting.StoragenodeBandwidthRollup, _ error) {
|
||||
id, err := storj.NodeIDFromBytes(v.StoragenodeId)
|
||||
if err != nil {
|
||||
return r, Error.Wrap(err)
|
||||
}
|
||||
return accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: id,
|
||||
IntervalStart: v.IntervalStart,
|
||||
Action: v.Action,
|
||||
Settled: v.Settled,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func fromDBXStoragenodeBandwidthRollupArchive(v *dbx.StoragenodeBandwidthRollupArchive) (r accounting.StoragenodeBandwidthRollup, _ error) {
|
||||
id, err := storj.NodeIDFromBytes(v.StoragenodeId)
|
||||
if err != nil {
|
||||
return r, Error.Wrap(err)
|
||||
}
|
||||
return accounting.StoragenodeBandwidthRollup{
|
||||
NodeID: id,
|
||||
IntervalStart: v.IntervalStart,
|
||||
Action: v.Action,
|
||||
Settled: v.Settled,
|
||||
}, nil
|
||||
}
|
||||
|
@ -126,12 +126,7 @@ func (storjscanPayments *storjscanPayments) ListWallet(ctx context.Context, wall
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
var payments []storjscan.CachedPayment
|
||||
for _, dbxPmnt := range dbxPmnts {
|
||||
payments = append(payments, fromDBXPayment(dbxPmnt))
|
||||
}
|
||||
|
||||
return payments, nil
|
||||
return convertSliceNoError(dbxPmnts, fromDBXPayment), nil
|
||||
}
|
||||
|
||||
// LastBlock returns the highest block known to DB.
|
||||
@ -160,6 +155,7 @@ func (storjscanPayments storjscanPayments) DeletePending(ctx context.Context) er
|
||||
func (storjscanPayments storjscanPayments) ListConfirmed(ctx context.Context, blockNumber int64, logIndex int) (_ []storjscan.CachedPayment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
// TODO: use DBX here
|
||||
query := `SELECT block_hash, block_number, transaction, log_index, from_address, to_address, token_value, usd_value, status, timestamp
|
||||
FROM storjscan_payments WHERE (storjscan_payments.block_number, storjscan_payments.log_index) > (?, ?) AND storjscan_payments.status = ?
|
||||
ORDER BY storjscan_payments.block_number, storjscan_payments.log_index`
|
||||
|
@ -34,3 +34,29 @@ func convertSlice[In, Out any](xs []In, fn func(In) (Out, error)) ([]Out, error)
|
||||
}
|
||||
return rs, nil
|
||||
}
|
||||
|
||||
// convertSliceNoError converts xs by applying fn to each element.
|
||||
func convertSliceNoError[In, Out any](xs []In, fn func(In) Out) []Out {
|
||||
rs := make([]Out, len(xs))
|
||||
for i := range xs {
|
||||
rs[i] = fn(xs[i])
|
||||
}
|
||||
return rs
|
||||
}
|
||||
|
||||
// convertSliceWithErrors converts xs by applying fn to each element.
|
||||
// It returns all the successfully converted values and returns the list of
|
||||
// errors separately.
|
||||
func convertSliceWithErrors[In, Out any](xs []In, fn func(In) (Out, error)) ([]Out, []error) {
|
||||
var errs []error
|
||||
rs := make([]Out, 0, len(xs))
|
||||
for i := range xs {
|
||||
r, err := fn(xs[i])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
rs = append(rs, r)
|
||||
}
|
||||
return rs, errs
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user