framework for deleting expired Orders from Satellite (#1436)

framework for deleting old bandwidth agreements
This commit is contained in:
Bill Thorp 2019-03-12 16:57:21 -04:00 committed by GitHub
parent 59f1e267c9
commit 52e829c6de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 131 additions and 396 deletions

View File

@ -85,6 +85,21 @@ func (t *Tally) Tally(ctx context.Context) error {
err = t.SaveBWRaw(ctx, tallyEnd, time.Now().UTC(), bwTotals)
if err != nil {
errBWA = errs.New("Saving for bandwidth failed : %v", err)
} else {
//remove expired records
now := time.Now()
_, err = t.bwAgreementDB.GetExpired(tallyEnd, now)
if err != nil {
return err
}
var expiredOrdersHaveBeenSaved bool
//todo: write files to disk or whatever we decide to do here
if expiredOrdersHaveBeenSaved {
err = t.bwAgreementDB.DeleteExpired(tallyEnd, now)
if err != nil {
return err
}
}
}
}
return errs.Combine(errAtRest, errBWA)

View File

@ -29,15 +29,15 @@ func TestBandwidthDBAgreement(t *testing.T) {
snID, err := testidentity.NewTestIdentity(ctx)
require.NoError(t, err)
require.NoError(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_PUT, "1", upID, snID))
require.Error(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "1", upID, snID))
require.NoError(t, testCreateAgreement(ctx, t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "2", upID, snID))
require.NoError(t, testSaveOrder(t, db.BandwidthAgreement(), pb.BandwidthAction_PUT, "1", upID, snID))
require.Error(t, testSaveOrder(t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "1", upID, snID))
require.NoError(t, testSaveOrder(t, db.BandwidthAgreement(), pb.BandwidthAction_GET, "2", upID, snID))
testGetTotals(ctx, t, db.BandwidthAgreement(), snID)
testGetUplinkStats(ctx, t, db.BandwidthAgreement(), upID)
})
}
func testCreateAgreement(ctx context.Context, t *testing.T, b bwagreement.DB, action pb.BandwidthAction,
func testSaveOrder(t *testing.T, b bwagreement.DB, action pb.BandwidthAction,
serialNum string, upID, snID *identity.FullIdentity) error {
rba := &pb.Order{
PayerAllocation: pb.OrderLimit{
@ -48,7 +48,7 @@ func testCreateAgreement(ctx context.Context, t *testing.T, b bwagreement.DB, ac
Total: 1000,
StorageNodeId: snID.ID,
}
return b.CreateAgreement(ctx, rba)
return b.SaveOrder(rba)
}
func testGetUplinkStats(ctx context.Context, t *testing.T, b bwagreement.DB, upID *identity.FullIdentity) {

View File

@ -33,7 +33,7 @@ var (
type Config struct {
}
//UplinkStat contains information about an uplink's returned bandwidth agreement
//UplinkStat contains information about an uplink's returned Orders
type UplinkStat struct {
NodeID storj.NodeID
TotalBytes int64
@ -42,14 +42,29 @@ type UplinkStat struct {
TotalTransactions int
}
// DB stores bandwidth agreements.
//SavedOrder is information from an Order pertaining to accounting
type SavedOrder struct {
Serialnum string
StorageNodeID storj.NodeID
UplinkID storj.NodeID
Action int64
Total int64
CreatedAt time.Time
ExpiresAt time.Time
}
// DB stores orders for accounting purposes
type DB interface {
// CreateAgreement adds a new bandwidth agreement.
CreateAgreement(context.Context, *pb.Order) error
// SaveOrder saves an order for accounting
SaveOrder(*pb.Order) error
// GetTotalsSince returns the sum of each bandwidth type after (exluding) a given date range
GetTotals(context.Context, time.Time, time.Time) (map[storj.NodeID][]int64, error)
//GetTotals returns stats about an uplink
GetUplinkStats(context.Context, time.Time, time.Time) ([]UplinkStat, error)
//GetExpired gets orders that are expired and were created before some time
GetExpired(time.Time, time.Time) ([]SavedOrder, error)
//DeleteExpired deletes orders that are expired and were created before some time
DeleteExpired(time.Time, time.Time) error
}
// Server is an implementation of the pb.BandwidthServer interface
@ -97,7 +112,7 @@ func (s *Server) BandwidthAgreements(ctx context.Context, rba *pb.Order) (reply
}
//save and return rersults
if err = s.bwdb.CreateAgreement(ctx, rba); err != nil {
if err = s.bwdb.SaveOrder(rba); err != nil {
if strings.Contains(err.Error(), "UNIQUE constraint failed") ||
strings.Contains(err.Error(), "violates unique constraint") {
return reply, pb.ErrPayer.Wrap(auth.ErrSerial.Wrap(err))

View File

@ -5,6 +5,7 @@ package storj // import "storj.io/storj/pkg/storj"
import (
"crypto/sha256"
"database/sql/driver"
"math/bits"
"github.com/btcsuite/btcutil/base58"
@ -143,6 +144,22 @@ func (id NodeID) MarshalJSON() ([]byte, error) {
return []byte(`"` + id.String() + `"`), nil
}
// Value set a NodeID to a database field
func (id NodeID) Value() (driver.Value, error) {
return id.Bytes(), nil
}
// Scan extracts a NodeID from a database field
func (id *NodeID) Scan(src interface{}) (err error) {
b, ok := src.([]byte)
if !ok {
return ErrNodeID.New("NodeID Scan expects []byte")
}
n, err := NodeIDFromBytes(b)
*id = n
return err
}
// UnmarshalJSON deserializes a json string (as bytes) to a node ID
func (id *NodeID) UnmarshalJSON(data []byte) error {
var err error

View File

@ -8,6 +8,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/pkg/storj"
)
@ -61,3 +62,21 @@ func TestNodeID_Difficulty(t *testing.T) {
assert.Equal(t, testcase.difficulty, difficulty)
}
}
// TestNodeScan tests (*NodeID).Scan()
func TestNodeScan(t *testing.T) {
tmpID := &storj.NodeID{}
require.Error(t, tmpID.Scan(32))
require.Error(t, tmpID.Scan(false))
require.Error(t, tmpID.Scan([]byte{}))
require.NoError(t, tmpID.Scan(tmpID.Bytes()))
}
// TestNodeValue tests NodeID.Value()
func TestNodeValue(t *testing.T) {
tmpID := storj.NodeID{}
v, err := tmpID.Value()
require.NoError(t, err)
require.IsType(t, v, []byte{})
require.Len(t, v, storj.NodeIDSize)
}

View File

@ -20,16 +20,16 @@ type bandwidthagreement struct {
db *dbx.DB
}
func (b *bandwidthagreement) CreateAgreement(ctx context.Context, rba *pb.Order) (err error) {
expiration := time.Unix(rba.PayerAllocation.ExpirationUnixSec, 0)
_, err = b.db.Create_Bwagreement(
ctx,
dbx.Bwagreement_Serialnum(rba.PayerAllocation.SerialNumber+rba.StorageNodeId.String()),
dbx.Bwagreement_StorageNodeId(rba.StorageNodeId.Bytes()),
dbx.Bwagreement_UplinkId(rba.PayerAllocation.UplinkId.Bytes()),
dbx.Bwagreement_Action(int64(rba.PayerAllocation.Action)),
dbx.Bwagreement_Total(rba.Total),
dbx.Bwagreement_ExpiresAt(expiration),
func (b *bandwidthagreement) SaveOrder(rba *pb.Order) (err error) {
var saveOrderSQL = `INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? )`
_, err = b.db.DB.Exec(b.db.Rebind(saveOrderSQL),
rba.PayerAllocation.SerialNumber+rba.StorageNodeId.String(),
rba.StorageNodeId,
rba.PayerAllocation.UplinkId,
int64(rba.PayerAllocation.Action),
rba.Total,
time.Now().UTC(),
time.Unix(rba.PayerAllocation.ExpirationUnixSec, 0),
)
return err
}
@ -49,17 +49,11 @@ func (b *bandwidthagreement) GetUplinkStats(ctx context.Context, from, to time.T
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var nodeID []byte
stat := bwagreement.UplinkStat{}
err := rows.Scan(&nodeID, &stat.TotalBytes, &stat.PutActionCount, &stat.GetActionCount, &stat.TotalTransactions)
err := rows.Scan(&stat.NodeID, &stat.TotalBytes, &stat.PutActionCount, &stat.GetActionCount, &stat.TotalTransactions)
if err != nil {
return stats, err
}
id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return stats, err
}
stat.NodeID = id
stats = append(stats, stat)
}
return stats, nil
@ -85,23 +79,41 @@ func (b *bandwidthagreement) GetTotals(ctx context.Context, from, to time.Time)
totals := make(map[storj.NodeID][]int64)
for i := 0; rows.Next(); i++ {
var nodeID []byte
var nodeID storj.NodeID
data := make([]int64, len(pb.BandwidthAction_value))
err := rows.Scan(&nodeID, &data[pb.BandwidthAction_PUT], &data[pb.BandwidthAction_GET],
&data[pb.BandwidthAction_GET_AUDIT], &data[pb.BandwidthAction_GET_REPAIR], &data[pb.BandwidthAction_PUT_REPAIR])
if err != nil {
return totals, err
}
id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return totals, err
}
totals[id] = data
totals[nodeID] = data
}
return totals, nil
}
func (b *bandwidthagreement) DeletePaidAndExpired(ctx context.Context) error {
// TODO: implement deletion of paid and expired BWAs
return Error.New("DeletePaidAndExpired not implemented")
//GetExpired gets orders that are expired and were created before some time
func (b *bandwidthagreement) GetExpired(before time.Time, expiredAt time.Time) (orders []bwagreement.SavedOrder, err error) {
var getExpiredSQL = `SELECT serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at
FROM bwagreements WHERE created_at < ? AND expires_at < ?`
rows, err := b.db.DB.Query(b.db.Rebind(getExpiredSQL), before, expiredAt)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for i := 0; rows.Next(); i++ {
o := bwagreement.SavedOrder{}
err = rows.Scan(&o.Serialnum, &o.StorageNodeID, &o.UplinkID, &o.Action, &o.Total, &o.CreatedAt, &o.ExpiresAt)
if err != nil {
break
}
orders = append(orders, o)
}
return orders, err
}
//DeleteExpired deletes orders that are expired and were created before some time
func (b *bandwidthagreement) DeleteExpired(before time.Time, expiredAt time.Time) error {
var deleteExpiredSQL = `DELETE FROM bwagreements WHERE created_at < ? AND expires_at < ?`
_, err := b.db.DB.Exec(b.db.Rebind(deleteExpiredSQL), before, expiredAt)
return err
}

View File

@ -14,13 +14,6 @@ model bwagreement (
field expires_at timestamp
)
create bwagreement ( )
read limitoffset ( select bwagreement)
read all ( select bwagreement)
read all (
select bwagreement
where bwagreement.created_at > ?
)
//--- datarepair.irreparableDB ---//

View File

@ -2858,38 +2858,6 @@ type Value_Row struct {
Value time.Time
}
func (obj *postgresImpl) Create_Bwagreement(ctx context.Context,
bwagreement_serialnum Bwagreement_Serialnum_Field,
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
bwagreement_uplink_id Bwagreement_UplinkId_Field,
bwagreement_action Bwagreement_Action_Field,
bwagreement_total Bwagreement_Total_Field,
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
bwagreement *Bwagreement, err error) {
__now := obj.db.Hooks.Now().UTC()
__serialnum_val := bwagreement_serialnum.value()
__storage_node_id_val := bwagreement_storage_node_id.value()
__uplink_id_val := bwagreement_uplink_id.value()
__action_val := bwagreement_action.value()
__total_val := bwagreement_total.value()
__created_at_val := __now
__expires_at_val := bwagreement_expires_at.value()
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? ) RETURNING bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val)
bwagreement = &Bwagreement{}
err = obj.driver.QueryRow(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val).Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
return bwagreement, nil
}
func (obj *postgresImpl) Create_Irreparabledb(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field,
irreparabledb_segmentdetail Irreparabledb_Segmentdetail_Field,
@ -3277,106 +3245,6 @@ func (obj *postgresImpl) Create_CertRecord(ctx context.Context,
}
func (obj *postgresImpl) Limited_Bwagreement(ctx context.Context,
limit int, offset int64) (
rows []*Bwagreement, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values)
__values = append(__values, limit, offset)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
bwagreement := &Bwagreement{}
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, bwagreement)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *postgresImpl) All_Bwagreement(ctx context.Context) (
rows []*Bwagreement, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements")
var __values []interface{}
__values = append(__values)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
bwagreement := &Bwagreement{}
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, bwagreement)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *postgresImpl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Context,
bwagreement_created_at_greater Bwagreement_CreatedAt_Field) (
rows []*Bwagreement, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?")
var __values []interface{}
__values = append(__values, bwagreement_created_at_greater.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
bwagreement := &Bwagreement{}
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, bwagreement)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *postgresImpl) Get_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
irreparabledb *Irreparabledb, err error) {
@ -5071,41 +4939,6 @@ func (obj *postgresImpl) deleteAll(ctx context.Context) (count int64, err error)
}
func (obj *sqlite3Impl) Create_Bwagreement(ctx context.Context,
bwagreement_serialnum Bwagreement_Serialnum_Field,
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
bwagreement_uplink_id Bwagreement_UplinkId_Field,
bwagreement_action Bwagreement_Action_Field,
bwagreement_total Bwagreement_Total_Field,
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
bwagreement *Bwagreement, err error) {
__now := obj.db.Hooks.Now().UTC()
__serialnum_val := bwagreement_serialnum.value()
__storage_node_id_val := bwagreement_storage_node_id.value()
__uplink_id_val := bwagreement_uplink_id.value()
__action_val := bwagreement_action.value()
__total_val := bwagreement_total.value()
__created_at_val := __now
__expires_at_val := bwagreement_expires_at.value()
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bwagreements ( serialnum, storage_node_id, uplink_id, action, total, created_at, expires_at ) VALUES ( ?, ?, ?, ?, ?, ?, ? )")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val)
__res, err := obj.driver.Exec(__stmt, __serialnum_val, __storage_node_id_val, __uplink_id_val, __action_val, __total_val, __created_at_val, __expires_at_val)
if err != nil {
return nil, obj.makeErr(err)
}
__pk, err := __res.LastInsertId()
if err != nil {
return nil, obj.makeErr(err)
}
return obj.getLastBwagreement(ctx, __pk)
}
func (obj *sqlite3Impl) Create_Irreparabledb(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field,
irreparabledb_segmentdetail Irreparabledb_Segmentdetail_Field,
@ -5532,106 +5365,6 @@ func (obj *sqlite3Impl) Create_CertRecord(ctx context.Context,
}
func (obj *sqlite3Impl) Limited_Bwagreement(ctx context.Context,
limit int, offset int64) (
rows []*Bwagreement, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values)
__values = append(__values, limit, offset)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
bwagreement := &Bwagreement{}
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, bwagreement)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *sqlite3Impl) All_Bwagreement(ctx context.Context) (
rows []*Bwagreement, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements")
var __values []interface{}
__values = append(__values)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
bwagreement := &Bwagreement{}
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, bwagreement)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *sqlite3Impl) All_Bwagreement_By_CreatedAt_Greater(ctx context.Context,
bwagreement_created_at_greater Bwagreement_CreatedAt_Field) (
rows []*Bwagreement, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE bwagreements.created_at > ?")
var __values []interface{}
__values = append(__values, bwagreement_created_at_greater.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.Query(__stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
bwagreement := &Bwagreement{}
err = __rows.Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, bwagreement)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *sqlite3Impl) Get_Irreparabledb_By_Segmentpath(ctx context.Context,
irreparabledb_segmentpath Irreparabledb_Segmentpath_Field) (
irreparabledb *Irreparabledb, err error) {
@ -7248,24 +6981,6 @@ func (obj *sqlite3Impl) Delete_CertRecord_By_Id(ctx context.Context,
}
func (obj *sqlite3Impl) getLastBwagreement(ctx context.Context,
pk int64) (
bwagreement *Bwagreement, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bwagreements.serialnum, bwagreements.storage_node_id, bwagreements.uplink_id, bwagreements.action, bwagreements.total, bwagreements.created_at, bwagreements.expires_at FROM bwagreements WHERE _rowid_ = ?")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, pk)
bwagreement = &Bwagreement{}
err = obj.driver.QueryRow(__stmt, pk).Scan(&bwagreement.Serialnum, &bwagreement.StorageNodeId, &bwagreement.UplinkId, &bwagreement.Action, &bwagreement.Total, &bwagreement.CreatedAt, &bwagreement.ExpiresAt)
if err != nil {
return nil, obj.makeErr(err)
}
return bwagreement, nil
}
func (obj *sqlite3Impl) getLastIrreparabledb(ctx context.Context,
pk int64) (
irreparabledb *Irreparabledb, err error) {
@ -7744,25 +7459,6 @@ func (rx *Rx) All_ApiKey_By_ProjectId_OrderBy_Asc_Name(ctx context.Context,
return tx.All_ApiKey_By_ProjectId_OrderBy_Asc_Name(ctx, api_key_project_id)
}
func (rx *Rx) All_Bwagreement(ctx context.Context) (
rows []*Bwagreement, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_Bwagreement(ctx)
}
func (rx *Rx) All_Bwagreement_By_CreatedAt_Greater(ctx context.Context,
bwagreement_created_at_greater Bwagreement_CreatedAt_Field) (
rows []*Bwagreement, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.All_Bwagreement_By_CreatedAt_Greater(ctx, bwagreement_created_at_greater)
}
func (rx *Rx) All_Node_Id(ctx context.Context) (
rows []*Id_Row, err error) {
var tx *Tx
@ -7882,22 +7578,6 @@ func (rx *Rx) Create_BucketUsage(ctx context.Context,
}
func (rx *Rx) Create_Bwagreement(ctx context.Context,
bwagreement_serialnum Bwagreement_Serialnum_Field,
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
bwagreement_uplink_id Bwagreement_UplinkId_Field,
bwagreement_action Bwagreement_Action_Field,
bwagreement_total Bwagreement_Total_Field,
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
bwagreement *Bwagreement, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Create_Bwagreement(ctx, bwagreement_serialnum, bwagreement_storage_node_id, bwagreement_uplink_id, bwagreement_action, bwagreement_total, bwagreement_expires_at)
}
func (rx *Rx) Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
@ -8306,16 +7986,6 @@ func (rx *Rx) Limited_BucketUsage_By_BucketId_And_RollupEndTime_Greater_And_Roll
return tx.Limited_BucketUsage_By_BucketId_And_RollupEndTime_Greater_And_RollupEndTime_LessOrEqual_OrderBy_Desc_RollupEndTime(ctx, bucket_usage_bucket_id, bucket_usage_rollup_end_time_greater, bucket_usage_rollup_end_time_less_or_equal, limit, offset)
}
func (rx *Rx) Limited_Bwagreement(ctx context.Context,
limit int, offset int64) (
rows []*Bwagreement, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Limited_Bwagreement(ctx, limit, offset)
}
func (rx *Rx) Limited_Injuredsegment(ctx context.Context,
limit int, offset int64) (
rows []*Injuredsegment, err error) {
@ -8452,13 +8122,6 @@ type Methods interface {
api_key_project_id ApiKey_ProjectId_Field) (
rows []*ApiKey, err error)
All_Bwagreement(ctx context.Context) (
rows []*Bwagreement, err error)
All_Bwagreement_By_CreatedAt_Greater(ctx context.Context,
bwagreement_created_at_greater Bwagreement_CreatedAt_Field) (
rows []*Bwagreement, err error)
All_Node_Id(ctx context.Context) (
rows []*Id_Row, err error)
@ -8519,15 +8182,6 @@ type Methods interface {
bucket_usage_audit_egress BucketUsage_AuditEgress_Field) (
bucket_usage *BucketUsage, err error)
Create_Bwagreement(ctx context.Context,
bwagreement_serialnum Bwagreement_Serialnum_Field,
bwagreement_storage_node_id Bwagreement_StorageNodeId_Field,
bwagreement_uplink_id Bwagreement_UplinkId_Field,
bwagreement_action Bwagreement_Action_Field,
bwagreement_total Bwagreement_Total_Field,
bwagreement_expires_at Bwagreement_ExpiresAt_Field) (
bwagreement *Bwagreement, err error)
Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
@ -8712,10 +8366,6 @@ type Methods interface {
limit int, offset int64) (
rows []*BucketUsage, err error)
Limited_Bwagreement(ctx context.Context,
limit int, offset int64) (
rows []*Bwagreement, err error)
Limited_Injuredsegment(ctx context.Context,
limit int, offset int64) (
rows []*Injuredsegment, err error)

View File

@ -119,11 +119,18 @@ type lockedBandwidthAgreement struct {
db bwagreement.DB
}
// CreateAgreement adds a new bandwidth agreement.
func (m *lockedBandwidthAgreement) CreateAgreement(ctx context.Context, a1 *pb.RenterBandwidthAllocation) error {
// DeleteExpired deletes orders that are expired and were created before some time
func (m *lockedBandwidthAgreement) DeleteExpired(a0 time.Time, a1 time.Time) error {
m.Lock()
defer m.Unlock()
return m.db.CreateAgreement(ctx, a1)
return m.db.DeleteExpired(a0, a1)
}
// GetExpired gets orders that are expired and were created before some time
func (m *lockedBandwidthAgreement) GetExpired(a0 time.Time, a1 time.Time) ([]bwagreement.SavedOrder, error) {
m.Lock()
defer m.Unlock()
return m.db.GetExpired(a0, a1)
}
// GetTotalsSince returns the sum of each bandwidth type after (exluding) a given date range
@ -140,6 +147,13 @@ func (m *lockedBandwidthAgreement) GetUplinkStats(ctx context.Context, a1 time.T
return m.db.GetUplinkStats(ctx, a1, a2)
}
// SaveOrder saves an order for accounting
func (m *lockedBandwidthAgreement) SaveOrder(a0 *pb.RenterBandwidthAllocation) error {
m.Lock()
defer m.Unlock()
return m.db.SaveOrder(a0)
}
// CertDB returns database for storing uplink's public key & ID
func (m *locked) CertDB() certdb.DB {
m.Lock()