storagenode: batch archiving unsent_orders (#2507)

This commit is contained in:
Jeff Wendling 2019-07-31 12:40:08 -04:00 committed by Egon Elbre
parent 4adafd056c
commit 26a2fbb719
4 changed files with 97 additions and 23 deletions

View File

@ -99,11 +99,11 @@ func TestDB(t *testing.T) {
require.Empty(t, cmp.Diff(expectedGrouped, unsentGrouped, cmp.Comparer(pb.Equal)))
// test archival
err = ordersdb.Archive(ctx, satellite0.ID, serialNumber, orders.StatusAccepted)
err = ordersdb.Archive(ctx, orders.ArchiveRequest{satellite0.ID, serialNumber, orders.StatusAccepted})
require.NoError(t, err)
// duplicate archive
err = ordersdb.Archive(ctx, satellite0.ID, serialNumber, orders.StatusRejected)
err = ordersdb.Archive(ctx, orders.ArchiveRequest{satellite0.ID, serialNumber, orders.StatusRejected})
require.Error(t, err)
// shouldn't be in unsent list
@ -159,7 +159,7 @@ func TestDB_Trivial(t *testing.T) {
}
{ // Ensure Archive works at all
err := db.Orders().Archive(ctx, satelliteID, serial, orders.StatusAccepted)
err := db.Orders().Archive(ctx, orders.ArchiveRequest{satelliteID, serial, orders.StatusAccepted})
require.NoError(t, err)
}

View File

@ -52,6 +52,13 @@ const (
StatusRejected
)
// ArchiveRequest defines arguments for archiving a single order.
type ArchiveRequest struct {
Satellite storj.NodeID
Serial storj.SerialNumber
Status Status
}
// DB implements storing orders for sending to the satellite.
type DB interface {
// Enqueue inserts order to the list of orders needing to be sent to the satellite.
@ -62,8 +69,7 @@ type DB interface {
ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*Info, error)
// Archive marks order as being handled.
Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status Status) error
Archive(ctx context.Context, requests ...ArchiveRequest) error
// ListArchived returns orders that have been sent.
ListArchived(ctx context.Context, limit int) ([]*ArchivedInfo, error)
}
@ -109,12 +115,18 @@ func (sender *Sender) runOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
sender.log.Debug("sending")
const batchSize = 1000
ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx)
if err != nil {
sender.log.Error("listing orders", zap.Error(err))
return nil
}
requests := make(chan ArchiveRequest, batchSize)
var batchGroup errgroup.Group
batchGroup.Go(func() error { return sender.handleBatches(ctx, requests) })
if len(ordersBySatellite) > 0 {
var group errgroup.Group
ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout)
@ -123,29 +135,59 @@ func (sender *Sender) runOnce(ctx context.Context) (err error) {
for satelliteID, orders := range ordersBySatellite {
satelliteID, orders := satelliteID, orders
group.Go(func() error {
sender.Settle(ctx, satelliteID, orders)
sender.Settle(ctx, satelliteID, orders, requests)
return nil
})
}
_ = group.Wait() // doesn't return errors
} else {
sender.log.Debug("no orders to send")
}
return nil
close(requests)
return batchGroup.Wait()
}
// Settle uploads orders to the satellite.
func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info) {
func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) {
log := sender.log.Named(satelliteID.String())
err := sender.settle(ctx, log, satelliteID, orders)
err := sender.settle(ctx, log, satelliteID, orders, requests)
if err != nil {
log.Error("failed to settle orders", zap.Error(err))
}
}
func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info) (err error) {
func (sender *Sender) handleBatches(ctx context.Context, requests chan ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
// In case anything goes wrong, discard everything from the channel.
defer func() {
for range requests {
}
}()
buffer := make([]ArchiveRequest, 0, cap(requests))
for request := range requests {
buffer = append(buffer, request)
if len(buffer) < cap(buffer) {
continue
}
if err := sender.orders.Archive(ctx, buffer...); err != nil {
return err
}
buffer = buffer[:0]
}
if len(buffer) > 0 {
return sender.orders.Archive(ctx, buffer...)
}
return nil
}
func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
log.Info("sending", zap.Int("count", len(orders)))
@ -208,19 +250,21 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s
break
}
var status Status
switch response.Status {
case pb.SettlementResponse_ACCEPTED:
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusAccepted)
if err != nil {
errHandle(OrderError, "failed to archive order as accepted: serial: %v, %v", response.SerialNumber, err)
}
status = StatusAccepted
case pb.SettlementResponse_REJECTED:
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusRejected)
if err != nil {
errHandle(OrderError, "failed to archive order as rejected: serial: %v, %v", response.SerialNumber, err)
}
status = StatusRejected
default:
errHandle(OrderError, "unexpected response: %v", response.Status)
continue
}
requests <- ArchiveRequest{
Satellite: satelliteID,
Serial: response.SerialNumber,
Status: status,
}
}

View File

@ -202,7 +202,11 @@ func TestPieceInfo_Trivial(t *testing.T) {
}
{ // Ensure Archive works at all
err := db.Orders().Archive(ctx, satelliteID, serial, orders.StatusAccepted)
err := db.Orders().Archive(ctx, orders.ArchiveRequest{
Satellite: satelliteID,
Serial: serial,
Status: orders.StatusAccepted,
})
require.NoError(t, err)
}

View File

@ -146,10 +146,36 @@ func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.Node
}
// Archive marks order as being handled.
func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status orders.Status) (err error) {
func (db *ordersdb) Archive(ctx context.Context, requests ...orders.ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
result, err := db.db.Exec(`
txn, err := db.Begin()
if err != nil {
return ErrInfo.Wrap(err)
}
defer func() {
if err == nil {
err = txn.Commit()
} else {
err = errs.Combine(err, txn.Rollback())
}
}()
for _, req := range requests {
err := db.archiveOne(ctx, txn, req)
if err != nil {
return ErrInfo.Wrap(err)
}
}
return nil
}
// archiveOne marks order as being handled.
func (db *ordersdb) archiveOne(ctx context.Context, txn *sql.Tx, req orders.ArchiveRequest) (err error) {
defer mon.Task()(&ctx)(&err)
result, err := txn.Exec(`
INSERT INTO order_archive_ (
satellite_id, serial_number,
order_limit_serialized, order_serialized,
@ -165,7 +191,7 @@ func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial
DELETE FROM unsent_order
WHERE satellite_id = ? AND serial_number = ?;
`, int(status), time.Now().UTC(), satellite, serial, satellite, serial)
`, int(req.Status), time.Now().UTC(), req.Satellite, req.Serial, req.Satellite, req.Serial)
if err != nil {
return ErrInfo.Wrap(err)
}