From 26a2fbb719ded7473ffaffd1bb38f835b4875e47 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Wed, 31 Jul 2019 12:40:08 -0400 Subject: [PATCH] storagenode: batch archiving unsent_orders (#2507) --- storagenode/orders/db_test.go | 6 +-- storagenode/orders/sender.go | 76 +++++++++++++++++++++++------ storagenode/pieces/db_test.go | 6 ++- storagenode/storagenodedb/orders.go | 32 ++++++++++-- 4 files changed, 97 insertions(+), 23 deletions(-) diff --git a/storagenode/orders/db_test.go b/storagenode/orders/db_test.go index 547b6c8be..08a5e082e 100644 --- a/storagenode/orders/db_test.go +++ b/storagenode/orders/db_test.go @@ -99,11 +99,11 @@ func TestDB(t *testing.T) { require.Empty(t, cmp.Diff(expectedGrouped, unsentGrouped, cmp.Comparer(pb.Equal))) // test archival - err = ordersdb.Archive(ctx, satellite0.ID, serialNumber, orders.StatusAccepted) + err = ordersdb.Archive(ctx, orders.ArchiveRequest{satellite0.ID, serialNumber, orders.StatusAccepted}) require.NoError(t, err) // duplicate archive - err = ordersdb.Archive(ctx, satellite0.ID, serialNumber, orders.StatusRejected) + err = ordersdb.Archive(ctx, orders.ArchiveRequest{satellite0.ID, serialNumber, orders.StatusRejected}) require.Error(t, err) // shouldn't be in unsent list @@ -159,7 +159,7 @@ func TestDB_Trivial(t *testing.T) { } { // Ensure Archive works at all - err := db.Orders().Archive(ctx, satelliteID, serial, orders.StatusAccepted) + err := db.Orders().Archive(ctx, orders.ArchiveRequest{satelliteID, serial, orders.StatusAccepted}) require.NoError(t, err) } diff --git a/storagenode/orders/sender.go b/storagenode/orders/sender.go index 14120ffc1..84db01d90 100644 --- a/storagenode/orders/sender.go +++ b/storagenode/orders/sender.go @@ -52,6 +52,13 @@ const ( StatusRejected ) +// ArchiveRequest defines arguments for archiving a single order. +type ArchiveRequest struct { + Satellite storj.NodeID + Serial storj.SerialNumber + Status Status +} + // DB implements storing orders for sending to the satellite. type DB interface { // Enqueue inserts order to the list of orders needing to be sent to the satellite. @@ -62,8 +69,7 @@ type DB interface { ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*Info, error) // Archive marks order as being handled. - Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status Status) error - + Archive(ctx context.Context, requests ...ArchiveRequest) error // ListArchived returns orders that have been sent. ListArchived(ctx context.Context, limit int) ([]*ArchivedInfo, error) } @@ -109,12 +115,18 @@ func (sender *Sender) runOnce(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) sender.log.Debug("sending") + const batchSize = 1000 + ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx) if err != nil { sender.log.Error("listing orders", zap.Error(err)) return nil } + requests := make(chan ArchiveRequest, batchSize) + var batchGroup errgroup.Group + batchGroup.Go(func() error { return sender.handleBatches(ctx, requests) }) + if len(ordersBySatellite) > 0 { var group errgroup.Group ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout) @@ -123,29 +135,59 @@ func (sender *Sender) runOnce(ctx context.Context) (err error) { for satelliteID, orders := range ordersBySatellite { satelliteID, orders := satelliteID, orders group.Go(func() error { - - sender.Settle(ctx, satelliteID, orders) + sender.Settle(ctx, satelliteID, orders, requests) return nil }) } + _ = group.Wait() // doesn't return errors } else { sender.log.Debug("no orders to send") } - return nil + close(requests) + return batchGroup.Wait() } // Settle uploads orders to the satellite. -func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info) { +func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) { log := sender.log.Named(satelliteID.String()) - err := sender.settle(ctx, log, satelliteID, orders) + err := sender.settle(ctx, log, satelliteID, orders, requests) if err != nil { log.Error("failed to settle orders", zap.Error(err)) } } -func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info) (err error) { +func (sender *Sender) handleBatches(ctx context.Context, requests chan ArchiveRequest) (err error) { + defer mon.Task()(&ctx)(&err) + + // In case anything goes wrong, discard everything from the channel. + defer func() { + for range requests { + } + }() + + buffer := make([]ArchiveRequest, 0, cap(requests)) + + for request := range requests { + buffer = append(buffer, request) + if len(buffer) < cap(buffer) { + continue + } + + if err := sender.orders.Archive(ctx, buffer...); err != nil { + return err + } + buffer = buffer[:0] + } + + if len(buffer) > 0 { + return sender.orders.Archive(ctx, buffer...) + } + return nil +} + +func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info, requests chan ArchiveRequest) (err error) { defer mon.Task()(&ctx)(&err) log.Info("sending", zap.Int("count", len(orders))) @@ -208,19 +250,21 @@ func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID s break } + var status Status switch response.Status { case pb.SettlementResponse_ACCEPTED: - err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusAccepted) - if err != nil { - errHandle(OrderError, "failed to archive order as accepted: serial: %v, %v", response.SerialNumber, err) - } + status = StatusAccepted case pb.SettlementResponse_REJECTED: - err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusRejected) - if err != nil { - errHandle(OrderError, "failed to archive order as rejected: serial: %v, %v", response.SerialNumber, err) - } + status = StatusRejected default: errHandle(OrderError, "unexpected response: %v", response.Status) + continue + } + + requests <- ArchiveRequest{ + Satellite: satelliteID, + Serial: response.SerialNumber, + Status: status, } } diff --git a/storagenode/pieces/db_test.go b/storagenode/pieces/db_test.go index 1a9ec9723..43cabdbd4 100644 --- a/storagenode/pieces/db_test.go +++ b/storagenode/pieces/db_test.go @@ -202,7 +202,11 @@ func TestPieceInfo_Trivial(t *testing.T) { } { // Ensure Archive works at all - err := db.Orders().Archive(ctx, satelliteID, serial, orders.StatusAccepted) + err := db.Orders().Archive(ctx, orders.ArchiveRequest{ + Satellite: satelliteID, + Serial: serial, + Status: orders.StatusAccepted, + }) require.NoError(t, err) } diff --git a/storagenode/storagenodedb/orders.go b/storagenode/storagenodedb/orders.go index d1228d3ba..297b7b309 100644 --- a/storagenode/storagenodedb/orders.go +++ b/storagenode/storagenodedb/orders.go @@ -146,10 +146,36 @@ func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.Node } // Archive marks order as being handled. -func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status orders.Status) (err error) { +func (db *ordersdb) Archive(ctx context.Context, requests ...orders.ArchiveRequest) (err error) { defer mon.Task()(&ctx)(&err) - result, err := db.db.Exec(` + txn, err := db.Begin() + if err != nil { + return ErrInfo.Wrap(err) + } + defer func() { + if err == nil { + err = txn.Commit() + } else { + err = errs.Combine(err, txn.Rollback()) + } + }() + + for _, req := range requests { + err := db.archiveOne(ctx, txn, req) + if err != nil { + return ErrInfo.Wrap(err) + } + } + + return nil +} + +// archiveOne marks order as being handled. +func (db *ordersdb) archiveOne(ctx context.Context, txn *sql.Tx, req orders.ArchiveRequest) (err error) { + defer mon.Task()(&ctx)(&err) + + result, err := txn.Exec(` INSERT INTO order_archive_ ( satellite_id, serial_number, order_limit_serialized, order_serialized, @@ -165,7 +191,7 @@ func (db *ordersdb) Archive(ctx context.Context, satellite storj.NodeID, serial DELETE FROM unsent_order WHERE satellite_id = ? AND serial_number = ?; - `, int(status), time.Now().UTC(), satellite, serial, satellite, serial) + `, int(req.Status), time.Now().UTC(), req.Satellite, req.Serial, req.Satellite, req.Serial) if err != nil { return ErrInfo.Wrap(err) }