Enhance the Storage Node to keep all BWA's for 90 days (#1374)
* Enhance the Storage Node to keep all BWA's for 90 days * warn -> error * rename enum
This commit is contained in:
parent
3c9d83dbfe
commit
15034526cd
@ -50,9 +50,15 @@ func (as *AgreementSender) Run(ctx context.Context) error {
|
||||
as.log.Error("Agreementsender could not retrieve bandwidth allocations", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
// send agreement payouts
|
||||
for satellite, agreements := range agreementGroups {
|
||||
as.SendAgreementsToSatellite(ctx, satellite, agreements)
|
||||
}
|
||||
|
||||
// Delete older payout irrespective of its status
|
||||
if err = as.DB.DeleteBandwidthAllocationPayouts(); err != nil {
|
||||
as.log.Error("Agreementsender failed to delete bandwidth allocation", zap.Error(err))
|
||||
}
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-ctx.Done():
|
||||
@ -79,8 +85,7 @@ func (as *AgreementSender) SendAgreementsToSatellite(ctx context.Context, satID
|
||||
}
|
||||
client := pb.NewBandwidthClient(conn)
|
||||
defer func() {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
if err := conn.Close(); err != nil {
|
||||
as.log.Warn("Agreementsender failed to close connection", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
@ -88,22 +93,29 @@ func (as *AgreementSender) SendAgreementsToSatellite(ctx context.Context, satID
|
||||
//todo: stop sending these one-by-one, send all at once
|
||||
for _, agreement := range agreements {
|
||||
rba := agreement.Agreement
|
||||
// Send agreement to satellite
|
||||
sat, err := client.BandwidthAgreements(ctx, &rba)
|
||||
if err != nil {
|
||||
as.log.Warn("Agreementsender failed to deserialize agreement : will delete", zap.Error(err))
|
||||
} else {
|
||||
// Send agreement to satellite
|
||||
r, err := client.BandwidthAgreements(ctx, &rba)
|
||||
if err != nil || r.GetStatus() == pb.AgreementsSummary_FAIL {
|
||||
as.log.Warn("Agreementsender failed to send agreement to satellite : will retry", zap.Error(err))
|
||||
continue
|
||||
} else if r.GetStatus() == pb.AgreementsSummary_REJECTED {
|
||||
//todo: something better than a delete here?
|
||||
as.log.Error("Agreementsender had agreement explicitly rejected by satellite : will delete", zap.Error(err))
|
||||
switch sat.GetStatus() {
|
||||
case pb.AgreementsSummary_FAIL:
|
||||
// CASE FAILED: connection with sat couldnt be established or connection
|
||||
// is established but lost before receiving response from satellite
|
||||
// no updates to the bwa status is done, so it remains "UNSENT"
|
||||
as.log.Warn("Agreementsender lost connection", zap.Error(err))
|
||||
default:
|
||||
// CASE REJECTED: successful connection with sat established but either failed or rejected received
|
||||
as.log.Warn("Agreementsender had agreement explicitly rejected/failed by satellite")
|
||||
err = as.DB.UpdateBandwidthAllocationStatus(rba.PayerAllocation.SerialNumber, psdb.AgreementStatusReject)
|
||||
if err != nil {
|
||||
as.log.Error("Agreementsender error", zap.Error(err))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// updates the status to "SENT"
|
||||
err = as.DB.UpdateBandwidthAllocationStatus(rba.PayerAllocation.SerialNumber, psdb.AgreementStatusSent)
|
||||
if err != nil {
|
||||
as.log.Error("Agreementsender error", zap.Error(err))
|
||||
}
|
||||
}
|
||||
// Delete from PSDB by signature
|
||||
if err = as.DB.DeleteBandwidthAllocationBySerialnum(rba.PayerAllocation.SerialNumber); err != nil {
|
||||
as.log.Error("Agreementsender failed to delete bandwidth allocation", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,19 @@ var (
|
||||
Error = errs.Class("psdb")
|
||||
)
|
||||
|
||||
// AgreementStatus keep tracks of the agreement payout status
|
||||
type AgreementStatus int32
|
||||
|
||||
const (
|
||||
// AgreementStatusUnsent sets the agreement status to UNSENT
|
||||
AgreementStatusUnsent = iota
|
||||
// AgreementStatusSent sets the agreement status to SENT
|
||||
AgreementStatusSent
|
||||
// AgreementStatusReject sets the agreement status to REJECT
|
||||
AgreementStatusReject
|
||||
// add new status here ...
|
||||
)
|
||||
|
||||
// DB is a piece store database
|
||||
type DB struct {
|
||||
mu sync.Mutex
|
||||
@ -178,6 +191,13 @@ func (db *DB) Migration() *migrate.Migration {
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
Description: "Add status column for bandwidth_agreements",
|
||||
Version: 2,
|
||||
Action: migrate.SQL{
|
||||
`ALTER TABLE bandwidth_agreements ADD COLUMN status INT(10) DEFAULT 0`,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return migration
|
||||
@ -245,15 +265,35 @@ func (db *DB) WriteBandwidthAllocToDB(rba *pb.Order) error {
|
||||
// If the agreements are sorted we can send them in bulk streams to the satellite
|
||||
t := time.Now()
|
||||
startofthedayunixsec := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
|
||||
_, err = db.db.Exec(`INSERT INTO bandwidth_agreements (satellite, agreement, signature, uplink, serial_num, total, max_size, created_utc_sec, expiration_utc_sec, action, daystart_utc_sec) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
_, err = db.db.Exec(`INSERT INTO bandwidth_agreements (satellite, agreement, signature, uplink, serial_num, total, max_size, created_utc_sec, status, expiration_utc_sec, action, daystart_utc_sec) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
rba.PayerAllocation.SatelliteId.Bytes(), rbaBytes, rba.GetSignature(),
|
||||
rba.PayerAllocation.UplinkId.Bytes(), rba.PayerAllocation.SerialNumber,
|
||||
rba.Total, rba.PayerAllocation.MaxSize, rba.PayerAllocation.CreatedUnixSec,
|
||||
rba.Total, rba.PayerAllocation.MaxSize, rba.PayerAllocation.CreatedUnixSec, AgreementStatusUnsent,
|
||||
rba.PayerAllocation.ExpirationUnixSec, rba.PayerAllocation.GetAction().String(),
|
||||
startofthedayunixsec)
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteBandwidthAllocationPayouts delete paid and/or old payout enteries based on days old
|
||||
func (db *DB) DeleteBandwidthAllocationPayouts() error {
|
||||
defer db.locked()()
|
||||
|
||||
//@TODO make a config value for older days
|
||||
t := time.Now().Add(time.Hour * 24 * -90).Unix()
|
||||
_, err := db.db.Exec(`DELETE FROM bandwidth_agreements WHERE created_utc_sec < ?`, t)
|
||||
if err == sql.ErrNoRows {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateBandwidthAllocationStatus update the bwa payout status
|
||||
func (db *DB) UpdateBandwidthAllocationStatus(serialnum string, status AgreementStatus) (err error) {
|
||||
defer db.locked()()
|
||||
_, err = db.db.Exec(`UPDATE bandwidth_agreements SET status = ? WHERE serial_num = ?`, status, serialnum)
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteBandwidthAllocationBySerialnum finds an allocation by signature and deletes it
|
||||
func (db *DB) DeleteBandwidthAllocationBySerialnum(serialnum string) error {
|
||||
defer db.locked()()
|
||||
@ -295,11 +335,11 @@ func (db *DB) GetBandwidthAllocationBySignature(signature []byte) ([]*pb.Order,
|
||||
return agreements, nil
|
||||
}
|
||||
|
||||
// GetBandwidthAllocations all bandwidth agreements and sorts by satellite
|
||||
// GetBandwidthAllocations all bandwidth agreements
|
||||
func (db *DB) GetBandwidthAllocations() (map[storj.NodeID][]*Agreement, error) {
|
||||
defer db.locked()()
|
||||
|
||||
rows, err := db.db.Query(`SELECT satellite, agreement FROM bandwidth_agreements`)
|
||||
rows, err := db.db.Query(`SELECT satellite, agreement FROM bandwidth_agreements WHERE status = ?`, AgreementStatusUnsent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -331,6 +371,13 @@ func (db *DB) GetBandwidthAllocations() (map[storj.NodeID][]*Agreement, error) {
|
||||
return agreements, nil
|
||||
}
|
||||
|
||||
// GetBwaStatusBySerialNum get BWA status by serial num
|
||||
func (db *DB) GetBwaStatusBySerialNum(serialnum string) (status AgreementStatus, err error) {
|
||||
defer db.locked()()
|
||||
err = db.db.QueryRow(`SELECT status FROM bandwidth_agreements WHERE serial_num=?`, serialnum).Scan(&status)
|
||||
return status, err
|
||||
}
|
||||
|
||||
// AddTTL adds TTL into database by id
|
||||
func (db *DB) AddTTL(id string, expiration, size int64) error {
|
||||
defer db.locked()()
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
@ -67,9 +68,9 @@ func TestHappyPath(t *testing.T) {
|
||||
{ID: "test", Expiration: 666},
|
||||
}
|
||||
|
||||
bandwidthAllocation := func(signature string, satelliteID storj.NodeID, total int64) *pb.Order {
|
||||
bandwidthAllocation := func(serialnum, signature string, satelliteID storj.NodeID, total int64) *pb.Order {
|
||||
return &pb.Order{
|
||||
PayerAllocation: pb.OrderLimit{SatelliteId: satelliteID},
|
||||
PayerAllocation: pb.OrderLimit{SatelliteId: satelliteID, SerialNumber: serialnum},
|
||||
Total: total,
|
||||
Signature: []byte(signature),
|
||||
}
|
||||
@ -78,10 +79,10 @@ func TestHappyPath(t *testing.T) {
|
||||
//TODO: use better data
|
||||
nodeIDAB := teststorj.NodeIDFromString("AB")
|
||||
allocationTests := []*pb.Order{
|
||||
bandwidthAllocation("signed by test", nodeIDAB, 0),
|
||||
bandwidthAllocation("signed by sigma", nodeIDAB, 10),
|
||||
bandwidthAllocation("signed by sigma", nodeIDAB, 98),
|
||||
bandwidthAllocation("signed by test", nodeIDAB, 3),
|
||||
bandwidthAllocation("serialnum_1", "signed by test", nodeIDAB, 0),
|
||||
bandwidthAllocation("serialnum_2", "signed by sigma", nodeIDAB, 10),
|
||||
bandwidthAllocation("serialnum_3", "signed by sigma", nodeIDAB, 98),
|
||||
bandwidthAllocation("serialnum_4", "signed by test", nodeIDAB, 3),
|
||||
}
|
||||
|
||||
type bwUsage struct {
|
||||
@ -283,6 +284,32 @@ func TestHappyPath(t *testing.T) {
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
type bwaUsage struct {
|
||||
serialnum string
|
||||
status psdb.AgreementStatus
|
||||
}
|
||||
|
||||
bwatests := []bwaUsage{
|
||||
{serialnum: "serialnum_1", status: psdb.AgreementStatusUnsent},
|
||||
{serialnum: "serialnum_2", status: psdb.AgreementStatusReject},
|
||||
{serialnum: "serialnum_3", status: psdb.AgreementStatusSent},
|
||||
}
|
||||
t.Run("UpdateBandwidthAllocationStatus", func(t *testing.T) {
|
||||
for P := 0; P < concurrency; P++ {
|
||||
t.Run("#"+strconv.Itoa(P), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
for _, bw := range bwatests {
|
||||
err := db.UpdateBandwidthAllocationStatus(bw.serialnum, bw.status)
|
||||
require.NoError(t, err)
|
||||
status, err := db.GetBwaStatusBySerialNum(bw.serialnum)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, bw.status, status)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func BenchmarkWriteBandwidthAllocation(b *testing.B) {
|
||||
|
18
pkg/piecestore/psserver/psdb/testdata/sqlite.v2.sql
vendored
Normal file
18
pkg/piecestore/psserver/psdb/testdata/sqlite.v2.sql
vendored
Normal file
@ -0,0 +1,18 @@
|
||||
CREATE TABLE `ttl` (`id` BLOB UNIQUE, `created` INT(10), `expires` INT(10), `size` INT(10));
|
||||
CREATE TABLE `bandwidth_agreements` (`satellite` BLOB, `agreement` BLOB, `signature` BLOB, `uplink` BLOB, `serial_num` BLOB, `total` INT(10), `max_size` INT(10), `created_utc_sec` INT(10), `expiration_utc_sec` INT(10), `action` INT(10), `daystart_utc_sec` INT(10), `status` INT(10) DEFAULT 0);
|
||||
CREATE INDEX idx_ttl_expires ON ttl (expires);
|
||||
INSERT INTO ttl VALUES(1,2,3,4);
|
||||
INSERT INTO bandwidth_agreements VALUES(
|
||||
X'0fac57151affd454b6884e2ee085097ef9581edea7ccfe6b6ba6401beac06500',
|
||||
X'0a8a070a200fac57151affd454b6884e2ee085097ef9581edea7ccfe6b6ba6401beac06500122018964640433f24595097466bd08d35bdd1ddf5082f75a821613a6c8b445e0000208fbda2e5052a2430343039363865662d653435342d343432342d616131662d336437333635666365393031388f96b5e30542e6023082016230820108a003020102021100b8be6e42d5b98e0757cda2fa5088e3dd300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004e94b91d488fd492e0097199632ce29dc1e0ada0edd03387d63b5f64e3108519eaaa391cafe0e74a5ae0c39f3056a630a57d55e05c2b160461541aa2bdf1f5978a33f303d300e0603551d0f0101ff0404030205a0301d0603551d250416301406082b0601050507030106082b06010505070302300c0603551d130101ff04023000300a06082a8648ce3d040302034800304502202ed2c4bdfab3a56cd7694e0e67cadaa48d2e6ae3a87dbd4e310395190374e3a0022100eba5c24ed07e4e539948bbd6bdcdc6000633a95373380e6bddc23b2e43f4288142e0023082015c30820101a003020102021100c387ca34d032179981de0f1c642ab820300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004d5c4e88ea884c28d98ed162f9c43d6e056e83a6d9315dc08b2a63e5416867a1e3a2c2f670e4a47e5b6cef6b9df9c0eb46debbf7a70557065479da617ce50b1aaa3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d0403020349003046022100a078a46470e233cd315329725435357fc93f778bb85cdab00a2178660fabf48f022100ed4d90f661ccb7abb7c996626179707c00497957c75dfb8db1ed17d2d80b4cf04a46304402203288c3a91734196d64dbb44390204899e7fb46fb569910665ce7ef924aadf2b502203565082f9b370c1cf89e854fece05033f9ab6e0d3d6dc08b6defb729ea6574301080f8a2011a20555937661ba23a14e4f32e69ba8b8d503f046bc94f2622b50ceae5327bf1f90022e6023082016230820107a00302010202104edd0ae5652847fb1293a10892c25f24300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d030107034200041ed2af8a599e9ed6ea5c85c92a09a42b4530fd1fda0ef481d891ae08c9cea684046073f57f50ee23c1def0ca9158ba470a0f445ffe99d977a60f6e6fc6f6a57aa33f303d300e0603551d0f0101ff0404030205a0301d0603551d250416301406082b0601050507030106082b06010505070302300c0603551d130101ff04023000300a06082a8648ce3d04030203490030460221009585d52e9e12223b11153d8d187349f1f591f6acad41594635e51d359f00d39e0221009a11bd9e81d94a4ee2c3c7adbe362df1c47d3ef4f8ba11d94971f9d4eb59eb1b22df023082015b30820100a00302010202106861fa849f76dabd1a66a3d2a5fb4eb7300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004ea0ea4bda75425e4277a3a8f0b2fb7adc718c4bad684b082f798b1a2fc6847949719f2230a8b0ef70ea03d18acba93e4b6a3b9b664abc0383a57ec6b8c442905a3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d04030203490030460221009795c5e7a3cc6ff459cb5816cc5f96cce1894c6ab60c8df3bfe5066cd8aede22022100f61dc807be119c4248de952f5e5ab2e0cf447992749ed3cbe75f351819433e2e2a46304402207eb5bf07d082b299351e89633e82f2e9514f2982d3fddcacf2e331e28c61c95202200c1790520d2983bb768b97f0ed772262ceed5230ecd42a7ba06b7f4a32a1fdcd',
|
||||
X'7369676e6174757265',
|
||||
X'18964640433F24595097466BD08D35BDD1DDF5082F75A821613A6C8B445E0000',
|
||||
'040968ef-e454-4424-aa1f-3d7365fce901',
|
||||
2669568,
|
||||
0,
|
||||
1550666511,
|
||||
1554554511,
|
||||
0,
|
||||
1550620800,
|
||||
0
|
||||
);
|
Loading…
Reference in New Issue
Block a user