diff --git a/pkg/piecestore/psserver/agreementsender/agreementsender.go b/pkg/piecestore/psserver/agreementsender/agreementsender.go index a6c760016..ef9d1c521 100644 --- a/pkg/piecestore/psserver/agreementsender/agreementsender.go +++ b/pkg/piecestore/psserver/agreementsender/agreementsender.go @@ -50,9 +50,15 @@ func (as *AgreementSender) Run(ctx context.Context) error { as.log.Error("Agreementsender could not retrieve bandwidth allocations", zap.Error(err)) continue } + // send agreement payouts for satellite, agreements := range agreementGroups { as.SendAgreementsToSatellite(ctx, satellite, agreements) } + + // Delete older payout irrespective of its status + if err = as.DB.DeleteBandwidthAllocationPayouts(); err != nil { + as.log.Error("Agreementsender failed to delete bandwidth allocation", zap.Error(err)) + } select { case <-ticker.C: case <-ctx.Done(): @@ -79,8 +85,7 @@ func (as *AgreementSender) SendAgreementsToSatellite(ctx context.Context, satID } client := pb.NewBandwidthClient(conn) defer func() { - err := conn.Close() - if err != nil { + if err := conn.Close(); err != nil { as.log.Warn("Agreementsender failed to close connection", zap.Error(err)) } }() @@ -88,22 +93,29 @@ func (as *AgreementSender) SendAgreementsToSatellite(ctx context.Context, satID //todo: stop sending these one-by-one, send all at once for _, agreement := range agreements { rba := agreement.Agreement + // Send agreement to satellite + sat, err := client.BandwidthAgreements(ctx, &rba) if err != nil { - as.log.Warn("Agreementsender failed to deserialize agreement : will delete", zap.Error(err)) - } else { - // Send agreement to satellite - r, err := client.BandwidthAgreements(ctx, &rba) - if err != nil || r.GetStatus() == pb.AgreementsSummary_FAIL { - as.log.Warn("Agreementsender failed to send agreement to satellite : will retry", zap.Error(err)) - continue - } else if r.GetStatus() == pb.AgreementsSummary_REJECTED { - //todo: something better than a delete here? - as.log.Error("Agreementsender had agreement explicitly rejected by satellite : will delete", zap.Error(err)) + switch sat.GetStatus() { + case pb.AgreementsSummary_FAIL: + // CASE FAILED: connection with sat couldnt be established or connection + // is established but lost before receiving response from satellite + // no updates to the bwa status is done, so it remains "UNSENT" + as.log.Warn("Agreementsender lost connection", zap.Error(err)) + default: + // CASE REJECTED: successful connection with sat established but either failed or rejected received + as.log.Warn("Agreementsender had agreement explicitly rejected/failed by satellite") + err = as.DB.UpdateBandwidthAllocationStatus(rba.PayerAllocation.SerialNumber, psdb.AgreementStatusReject) + if err != nil { + as.log.Error("Agreementsender error", zap.Error(err)) + } + } + } else { + // updates the status to "SENT" + err = as.DB.UpdateBandwidthAllocationStatus(rba.PayerAllocation.SerialNumber, psdb.AgreementStatusSent) + if err != nil { + as.log.Error("Agreementsender error", zap.Error(err)) } - } - // Delete from PSDB by signature - if err = as.DB.DeleteBandwidthAllocationBySerialnum(rba.PayerAllocation.SerialNumber); err != nil { - as.log.Error("Agreementsender failed to delete bandwidth allocation", zap.Error(err)) } } } diff --git a/pkg/piecestore/psserver/psdb/psdb.go b/pkg/piecestore/psserver/psdb/psdb.go index cd585b99d..c4ead30f7 100644 --- a/pkg/piecestore/psserver/psdb/psdb.go +++ b/pkg/piecestore/psserver/psdb/psdb.go @@ -30,6 +30,19 @@ var ( Error = errs.Class("psdb") ) +// AgreementStatus keep tracks of the agreement payout status +type AgreementStatus int32 + +const ( + // AgreementStatusUnsent sets the agreement status to UNSENT + AgreementStatusUnsent = iota + // AgreementStatusSent sets the agreement status to SENT + AgreementStatusSent + // AgreementStatusReject sets the agreement status to REJECT + AgreementStatusReject + // add new status here ... +) + // DB is a piece store database type DB struct { mu sync.Mutex @@ -178,6 +191,13 @@ func (db *DB) Migration() *migrate.Migration { return nil }), }, + { + Description: "Add status column for bandwidth_agreements", + Version: 2, + Action: migrate.SQL{ + `ALTER TABLE bandwidth_agreements ADD COLUMN status INT(10) DEFAULT 0`, + }, + }, }, } return migration @@ -245,15 +265,35 @@ func (db *DB) WriteBandwidthAllocToDB(rba *pb.Order) error { // If the agreements are sorted we can send them in bulk streams to the satellite t := time.Now() startofthedayunixsec := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix() - _, err = db.db.Exec(`INSERT INTO bandwidth_agreements (satellite, agreement, signature, uplink, serial_num, total, max_size, created_utc_sec, expiration_utc_sec, action, daystart_utc_sec) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + _, err = db.db.Exec(`INSERT INTO bandwidth_agreements (satellite, agreement, signature, uplink, serial_num, total, max_size, created_utc_sec, status, expiration_utc_sec, action, daystart_utc_sec) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, rba.PayerAllocation.SatelliteId.Bytes(), rbaBytes, rba.GetSignature(), rba.PayerAllocation.UplinkId.Bytes(), rba.PayerAllocation.SerialNumber, - rba.Total, rba.PayerAllocation.MaxSize, rba.PayerAllocation.CreatedUnixSec, + rba.Total, rba.PayerAllocation.MaxSize, rba.PayerAllocation.CreatedUnixSec, AgreementStatusUnsent, rba.PayerAllocation.ExpirationUnixSec, rba.PayerAllocation.GetAction().String(), startofthedayunixsec) return err } +// DeleteBandwidthAllocationPayouts delete paid and/or old payout enteries based on days old +func (db *DB) DeleteBandwidthAllocationPayouts() error { + defer db.locked()() + + //@TODO make a config value for older days + t := time.Now().Add(time.Hour * 24 * -90).Unix() + _, err := db.db.Exec(`DELETE FROM bandwidth_agreements WHERE created_utc_sec < ?`, t) + if err == sql.ErrNoRows { + err = nil + } + return err +} + +// UpdateBandwidthAllocationStatus update the bwa payout status +func (db *DB) UpdateBandwidthAllocationStatus(serialnum string, status AgreementStatus) (err error) { + defer db.locked()() + _, err = db.db.Exec(`UPDATE bandwidth_agreements SET status = ? WHERE serial_num = ?`, status, serialnum) + return err +} + // DeleteBandwidthAllocationBySerialnum finds an allocation by signature and deletes it func (db *DB) DeleteBandwidthAllocationBySerialnum(serialnum string) error { defer db.locked()() @@ -295,11 +335,11 @@ func (db *DB) GetBandwidthAllocationBySignature(signature []byte) ([]*pb.Order, return agreements, nil } -// GetBandwidthAllocations all bandwidth agreements and sorts by satellite +// GetBandwidthAllocations all bandwidth agreements func (db *DB) GetBandwidthAllocations() (map[storj.NodeID][]*Agreement, error) { defer db.locked()() - rows, err := db.db.Query(`SELECT satellite, agreement FROM bandwidth_agreements`) + rows, err := db.db.Query(`SELECT satellite, agreement FROM bandwidth_agreements WHERE status = ?`, AgreementStatusUnsent) if err != nil { return nil, err } @@ -331,6 +371,13 @@ func (db *DB) GetBandwidthAllocations() (map[storj.NodeID][]*Agreement, error) { return agreements, nil } +// GetBwaStatusBySerialNum get BWA status by serial num +func (db *DB) GetBwaStatusBySerialNum(serialnum string) (status AgreementStatus, err error) { + defer db.locked()() + err = db.db.QueryRow(`SELECT status FROM bandwidth_agreements WHERE serial_num=?`, serialnum).Scan(&status) + return status, err +} + // AddTTL adds TTL into database by id func (db *DB) AddTTL(id string, expiration, size int64) error { defer db.locked()() diff --git a/pkg/piecestore/psserver/psdb/psdb_test.go b/pkg/piecestore/psserver/psdb/psdb_test.go index 36da57527..8f1c1ffdd 100644 --- a/pkg/piecestore/psserver/psdb/psdb_test.go +++ b/pkg/piecestore/psserver/psdb/psdb_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -67,9 +68,9 @@ func TestHappyPath(t *testing.T) { {ID: "test", Expiration: 666}, } - bandwidthAllocation := func(signature string, satelliteID storj.NodeID, total int64) *pb.Order { + bandwidthAllocation := func(serialnum, signature string, satelliteID storj.NodeID, total int64) *pb.Order { return &pb.Order{ - PayerAllocation: pb.OrderLimit{SatelliteId: satelliteID}, + PayerAllocation: pb.OrderLimit{SatelliteId: satelliteID, SerialNumber: serialnum}, Total: total, Signature: []byte(signature), } @@ -78,10 +79,10 @@ func TestHappyPath(t *testing.T) { //TODO: use better data nodeIDAB := teststorj.NodeIDFromString("AB") allocationTests := []*pb.Order{ - bandwidthAllocation("signed by test", nodeIDAB, 0), - bandwidthAllocation("signed by sigma", nodeIDAB, 10), - bandwidthAllocation("signed by sigma", nodeIDAB, 98), - bandwidthAllocation("signed by test", nodeIDAB, 3), + bandwidthAllocation("serialnum_1", "signed by test", nodeIDAB, 0), + bandwidthAllocation("serialnum_2", "signed by sigma", nodeIDAB, 10), + bandwidthAllocation("serialnum_3", "signed by sigma", nodeIDAB, 98), + bandwidthAllocation("serialnum_4", "signed by test", nodeIDAB, 3), } type bwUsage struct { @@ -283,6 +284,32 @@ func TestHappyPath(t *testing.T) { }) } }) + + type bwaUsage struct { + serialnum string + status psdb.AgreementStatus + } + + bwatests := []bwaUsage{ + {serialnum: "serialnum_1", status: psdb.AgreementStatusUnsent}, + {serialnum: "serialnum_2", status: psdb.AgreementStatusReject}, + {serialnum: "serialnum_3", status: psdb.AgreementStatusSent}, + } + t.Run("UpdateBandwidthAllocationStatus", func(t *testing.T) { + for P := 0; P < concurrency; P++ { + t.Run("#"+strconv.Itoa(P), func(t *testing.T) { + t.Parallel() + for _, bw := range bwatests { + err := db.UpdateBandwidthAllocationStatus(bw.serialnum, bw.status) + require.NoError(t, err) + status, err := db.GetBwaStatusBySerialNum(bw.serialnum) + require.NoError(t, err) + assert.Equal(t, bw.status, status) + } + }) + } + }) + } func BenchmarkWriteBandwidthAllocation(b *testing.B) { diff --git a/pkg/piecestore/psserver/psdb/testdata/sqlite.v2.sql b/pkg/piecestore/psserver/psdb/testdata/sqlite.v2.sql new file mode 100644 index 000000000..002d22a84 --- /dev/null +++ b/pkg/piecestore/psserver/psdb/testdata/sqlite.v2.sql @@ -0,0 +1,18 @@ +CREATE TABLE `ttl` (`id` BLOB UNIQUE, `created` INT(10), `expires` INT(10), `size` INT(10)); +CREATE TABLE `bandwidth_agreements` (`satellite` BLOB, `agreement` BLOB, `signature` BLOB, `uplink` BLOB, `serial_num` BLOB, `total` INT(10), `max_size` INT(10), `created_utc_sec` INT(10), `expiration_utc_sec` INT(10), `action` INT(10), `daystart_utc_sec` INT(10), `status` INT(10) DEFAULT 0); +CREATE INDEX idx_ttl_expires ON ttl (expires); +INSERT INTO ttl VALUES(1,2,3,4); +INSERT INTO bandwidth_agreements VALUES( + X'0fac57151affd454b6884e2ee085097ef9581edea7ccfe6b6ba6401beac06500', + X'0a8a070a200fac57151affd454b6884e2ee085097ef9581edea7ccfe6b6ba6401beac06500122018964640433f24595097466bd08d35bdd1ddf5082f75a821613a6c8b445e0000208fbda2e5052a2430343039363865662d653435342d343432342d616131662d336437333635666365393031388f96b5e30542e6023082016230820108a003020102021100b8be6e42d5b98e0757cda2fa5088e3dd300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004e94b91d488fd492e0097199632ce29dc1e0ada0edd03387d63b5f64e3108519eaaa391cafe0e74a5ae0c39f3056a630a57d55e05c2b160461541aa2bdf1f5978a33f303d300e0603551d0f0101ff0404030205a0301d0603551d250416301406082b0601050507030106082b06010505070302300c0603551d130101ff04023000300a06082a8648ce3d040302034800304502202ed2c4bdfab3a56cd7694e0e67cadaa48d2e6ae3a87dbd4e310395190374e3a0022100eba5c24ed07e4e539948bbd6bdcdc6000633a95373380e6bddc23b2e43f4288142e0023082015c30820101a003020102021100c387ca34d032179981de0f1c642ab820300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004d5c4e88ea884c28d98ed162f9c43d6e056e83a6d9315dc08b2a63e5416867a1e3a2c2f670e4a47e5b6cef6b9df9c0eb46debbf7a70557065479da617ce50b1aaa3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d0403020349003046022100a078a46470e233cd315329725435357fc93f778bb85cdab00a2178660fabf48f022100ed4d90f661ccb7abb7c996626179707c00497957c75dfb8db1ed17d2d80b4cf04a46304402203288c3a91734196d64dbb44390204899e7fb46fb569910665ce7ef924aadf2b502203565082f9b370c1cf89e854fece05033f9ab6e0d3d6dc08b6defb729ea6574301080f8a2011a20555937661ba23a14e4f32e69ba8b8d503f046bc94f2622b50ceae5327bf1f90022e6023082016230820107a00302010202104edd0ae5652847fb1293a10892c25f24300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d030107034200041ed2af8a599e9ed6ea5c85c92a09a42b4530fd1fda0ef481d891ae08c9cea684046073f57f50ee23c1def0ca9158ba470a0f445ffe99d977a60f6e6fc6f6a57aa33f303d300e0603551d0f0101ff0404030205a0301d0603551d250416301406082b0601050507030106082b06010505070302300c0603551d130101ff04023000300a06082a8648ce3d04030203490030460221009585d52e9e12223b11153d8d187349f1f591f6acad41594635e51d359f00d39e0221009a11bd9e81d94a4ee2c3c7adbe362df1c47d3ef4f8ba11d94971f9d4eb59eb1b22df023082015b30820100a00302010202106861fa849f76dabd1a66a3d2a5fb4eb7300a06082a8648ce3d0403023010310e300c060355040a130553746f726a3022180f30303031303130313030303030305a180f30303031303130313030303030305a3010310e300c060355040a130553746f726a3059301306072a8648ce3d020106082a8648ce3d03010703420004ea0ea4bda75425e4277a3a8f0b2fb7adc718c4bad684b082f798b1a2fc6847949719f2230a8b0ef70ea03d18acba93e4b6a3b9b664abc0383a57ec6b8c442905a3383036300e0603551d0f0101ff04040302020430130603551d25040c300a06082b06010505070301300f0603551d130101ff040530030101ff300a06082a8648ce3d04030203490030460221009795c5e7a3cc6ff459cb5816cc5f96cce1894c6ab60c8df3bfe5066cd8aede22022100f61dc807be119c4248de952f5e5ab2e0cf447992749ed3cbe75f351819433e2e2a46304402207eb5bf07d082b299351e89633e82f2e9514f2982d3fddcacf2e331e28c61c95202200c1790520d2983bb768b97f0ed772262ceed5230ecd42a7ba06b7f4a32a1fdcd', + X'7369676e6174757265', + X'18964640433F24595097466BD08D35BDD1DDF5082F75A821613A6C8B445E0000', + '040968ef-e454-4424-aa1f-3d7365fce901', + 2669568, + 0, + 1550666511, + 1554554511, + 0, + 1550620800, + 0 +); \ No newline at end of file