2019-03-18 10:55:06 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package storagenodedb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"database/sql"
|
2020-07-14 14:04:38 +01:00
|
|
|
"errors"
|
2019-03-21 13:24:26 +00:00
|
|
|
"time"
|
2019-03-18 10:55:06 +00:00
|
|
|
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/pb"
|
|
|
|
"storj.io/common/storj"
|
2021-04-23 10:52:40 +01:00
|
|
|
"storj.io/private/tagsql"
|
2019-03-18 10:55:06 +00:00
|
|
|
"storj.io/storj/storagenode/orders"
|
2020-10-01 23:52:22 +01:00
|
|
|
"storj.io/storj/storagenode/orders/ordersfile"
|
2019-03-18 10:55:06 +00:00
|
|
|
)
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
// ErrOrders represents errors from the ordersdb database.
// Every error returned by ordersDB methods is wrapped with this class.
var ErrOrders = errs.Class("ordersdb")
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-09-18 17:17:28 +01:00
|
|
|
// OrdersDBName represents the database name.
// NOTE(review): presumably used to locate this database in the storage node's
// database container — confirm against the container setup.
const OrdersDBName = "orders"
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-09-18 17:17:28 +01:00
|
|
|
// ordersDB stores and retrieves bandwidth orders for the storage node.
// It embeds dbContainerImpl, which supplies the SQL methods used by the
// methods below (ExecContext, QueryContext, BeginTx).
type ordersDB struct {
	dbContainerImpl
}
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// Enqueue inserts order to the unsent list.
|
2020-10-01 23:52:22 +01:00
|
|
|
func (db *ordersDB) Enqueue(ctx context.Context, info *ordersfile.Info) (err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
limitSerialized, err := pb.Marshal(info.Limit)
|
2019-03-18 10:55:06 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
orderSerialized, err := pb.Marshal(info.Order)
|
2019-03-18 10:55:06 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
2019-07-11 21:51:40 +01:00
|
|
|
// TODO: remove uplink_cert_id
|
2020-01-14 11:07:35 +00:00
|
|
|
_, err = db.ExecContext(ctx, `
|
2019-03-18 10:55:06 +00:00
|
|
|
INSERT INTO unsent_order(
|
|
|
|
satellite_id, serial_number,
|
|
|
|
order_limit_serialized, order_serialized, order_limit_expiration,
|
|
|
|
uplink_cert_id
|
|
|
|
) VALUES (?,?, ?,?,?, ?)
|
2019-07-09 22:54:00 +01:00
|
|
|
`, info.Limit.SatelliteId, info.Limit.SerialNumber, limitSerialized, orderSerialized, info.Limit.OrderExpiration.UTC(), 0)
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ListUnsent returns orders that haven't been sent yet.
|
2019-08-16 16:33:51 +01:00
|
|
|
//
|
|
|
|
// If there is some unmarshal error while reading an order, the method proceed
|
|
|
|
// with the following ones and the function will return the ones which have
|
|
|
|
// been successfully read but returning an error with information of the ones
|
|
|
|
// which have not. In case of database or other system error, the method will
|
|
|
|
// stop without any further processing and will return an error without any
|
|
|
|
// order.
|
2020-10-01 23:52:22 +01:00
|
|
|
func (db *ordersDB) ListUnsent(ctx context.Context, limit int) (_ []*ordersfile.Info, err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2020-01-14 11:07:35 +00:00
|
|
|
rows, err := db.QueryContext(ctx, `
|
2019-07-09 22:33:45 +01:00
|
|
|
SELECT order_limit_serialized, order_serialized
|
2019-03-18 10:55:06 +00:00
|
|
|
FROM unsent_order
|
|
|
|
LIMIT ?
|
|
|
|
`, limit)
|
|
|
|
if err != nil {
|
2020-07-14 14:04:38 +01:00
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
2019-03-18 10:55:06 +00:00
|
|
|
return nil, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
2019-08-16 16:33:51 +01:00
|
|
|
|
|
|
|
var unmarshalErrors errs.Group
|
|
|
|
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2020-10-01 23:52:22 +01:00
|
|
|
var infos []*ordersfile.Info
|
2019-03-18 10:55:06 +00:00
|
|
|
for rows.Next() {
|
|
|
|
var limitSerialized []byte
|
|
|
|
var orderSerialized []byte
|
|
|
|
|
2019-07-09 22:33:45 +01:00
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized)
|
2019-03-18 10:55:06 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
2020-10-01 23:52:22 +01:00
|
|
|
var info ordersfile.Info
|
2019-07-01 16:54:11 +01:00
|
|
|
info.Limit = &pb.OrderLimit{}
|
|
|
|
info.Order = &pb.Order{}
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
err = pb.Unmarshal(limitSerialized, info.Limit)
|
2019-03-18 10:55:06 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
err = pb.Unmarshal(orderSerialized, info.Order)
|
2019-03-18 10:55:06 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
infos = append(infos, &info)
|
|
|
|
}
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return infos, ErrOrders.Wrap(rows.Err())
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2020-10-15 19:57:02 +01:00
|
|
|
// ListUnsentBySatellite returns orders that haven't been sent yet and are not expired.
|
|
|
|
// The orders are ordered by the Satellite ID.
|
2019-08-16 16:33:51 +01:00
|
|
|
//
|
|
|
|
// If there is some unmarshal error while reading an order, the method proceed
|
|
|
|
// with the following ones and the function will return the ones which have
|
|
|
|
// been successfully read but returning an error with information of the ones
|
|
|
|
// which have not. In case of database or other system error, the method will
|
|
|
|
// stop without any further processing and will return an error without any
|
|
|
|
// order.
|
2020-10-01 23:52:22 +01:00
|
|
|
func (db *ordersDB) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*ordersfile.Info, err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-21 13:24:26 +00:00
|
|
|
// TODO: add some limiting
|
|
|
|
|
2020-01-14 11:07:35 +00:00
|
|
|
rows, err := db.QueryContext(ctx, `
|
2019-03-21 13:24:26 +00:00
|
|
|
SELECT order_limit_serialized, order_serialized
|
|
|
|
FROM unsent_order
|
2020-10-15 19:57:02 +01:00
|
|
|
WHERE order_limit_expiration >= $1
|
|
|
|
`, time.Now().UTC())
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2020-07-14 14:04:38 +01:00
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
2019-03-21 13:24:26 +00:00
|
|
|
return nil, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
2019-08-16 16:33:51 +01:00
|
|
|
|
|
|
|
var unmarshalErrors errs.Group
|
|
|
|
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2020-10-01 23:52:22 +01:00
|
|
|
infos := map[storj.NodeID][]*ordersfile.Info{}
|
2019-03-21 13:24:26 +00:00
|
|
|
for rows.Next() {
|
|
|
|
var limitSerialized []byte
|
|
|
|
var orderSerialized []byte
|
|
|
|
|
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
2020-10-01 23:52:22 +01:00
|
|
|
var info ordersfile.Info
|
2019-07-01 16:54:11 +01:00
|
|
|
info.Limit = &pb.OrderLimit{}
|
|
|
|
info.Order = &pb.Order{}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
err = pb.Unmarshal(limitSerialized, info.Limit)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
err = pb.Unmarshal(orderSerialized, info.Order)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
infos[info.Limit.SatelliteId] = append(infos[info.Limit.SatelliteId], &info)
|
|
|
|
}
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return infos, ErrOrders.Wrap(rows.Err())
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Archive marks order as being handled.
//
// If any of the request contains an order which doesn't exist the method will
// follow with the next ones without interrupting the operation and it will
// return an error of the class orders.OrderNotFoundError. Any other error, will
// abort the operation, rolling back the transaction.
func (db *ordersDB) Archive(ctx context.Context, archivedAt time.Time, requests ...orders.ArchiveRequest) (err error) {
	defer mon.Task()(&ctx)(&err)

	// change input parameter to UTC timezone before we send it to the database
	archivedAt = archivedAt.UTC()

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return ErrOrders.Wrap(err)
	}

	var notFoundErrs errs.Group
	// The deferred closure decides the transaction's fate from the named return
	// value: commit when processing finished without a fatal error, roll back
	// otherwise. Not-found errors are deliberately surfaced only after a
	// successful commit, so the orders that WERE found stay archived.
	defer func() {
		if err == nil {
			err = tx.Commit()
			if err == nil {
				if len(notFoundErrs) > 0 {
					// Return a class error to allow the caller to identify this case
					err = orders.OrderNotFoundError.Wrap(notFoundErrs.Err())
				}
			}
		} else {
			err = errs.Combine(err, tx.Rollback())
		}
	}()

	for _, req := range requests {
		err := db.archiveOne(ctx, tx, archivedAt, req)
		if err != nil {
			// Missing orders are collected and skipped; any other error aborts
			// the whole batch (and triggers the rollback in the defer above).
			if orders.OrderNotFoundError.Has(err) {
				notFoundErrs.Add(err)
				continue
			}

			return err
		}
	}

	return nil
}
|
|
|
|
|
|
|
|
// archiveOne marks order as being handled.
|
2020-01-17 19:08:29 +00:00
|
|
|
func (db *ordersDB) archiveOne(ctx context.Context, tx tagsql.Tx, archivedAt time.Time, req orders.ArchiveRequest) (err error) {
|
2019-07-31 17:40:08 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2020-01-15 07:25:26 +00:00
|
|
|
result, err := tx.ExecContext(ctx, `
|
2019-07-16 17:31:29 +01:00
|
|
|
INSERT INTO order_archive_ (
|
2019-03-21 13:24:26 +00:00
|
|
|
satellite_id, serial_number,
|
|
|
|
order_limit_serialized, order_serialized,
|
|
|
|
uplink_cert_id,
|
|
|
|
status, archived_at
|
2019-06-04 13:31:39 +01:00
|
|
|
) SELECT
|
2019-03-21 13:24:26 +00:00
|
|
|
satellite_id, serial_number,
|
2019-06-04 13:31:39 +01:00
|
|
|
order_limit_serialized, order_serialized,
|
2019-03-21 13:24:26 +00:00
|
|
|
uplink_cert_id,
|
|
|
|
?, ?
|
|
|
|
FROM unsent_order
|
|
|
|
WHERE satellite_id = ? AND serial_number = ?;
|
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
DELETE FROM unsent_order
|
2019-03-21 13:24:26 +00:00
|
|
|
WHERE satellite_id = ? AND serial_number = ?;
|
2019-08-22 15:33:14 +01:00
|
|
|
`, int(req.Status), archivedAt, req.Satellite, req.Serial, req.Satellite, req.Serial)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
count, err := result.RowsAffected()
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
if count == 0 {
|
2019-08-16 15:53:22 +01:00
|
|
|
return orders.OrderNotFoundError.New("satellite: %s, serial number: %s",
|
|
|
|
req.Satellite.String(), req.Serial.String(),
|
|
|
|
)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListArchived returns orders that have been sent.
|
2019-08-21 15:32:25 +01:00
|
|
|
func (db *ordersDB) ListArchived(ctx context.Context, limit int) (_ []*orders.ArchivedInfo, err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2020-01-14 11:07:35 +00:00
|
|
|
rows, err := db.QueryContext(ctx, `
|
2019-07-09 22:33:45 +01:00
|
|
|
SELECT order_limit_serialized, order_serialized, status, archived_at
|
2019-07-16 17:31:29 +01:00
|
|
|
FROM order_archive_
|
2019-03-21 13:24:26 +00:00
|
|
|
LIMIT ?
|
|
|
|
`, limit)
|
|
|
|
if err != nil {
|
2020-07-14 14:04:38 +01:00
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
2019-03-21 13:24:26 +00:00
|
|
|
return nil, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
|
|
|
|
var infos []*orders.ArchivedInfo
|
|
|
|
for rows.Next() {
|
|
|
|
var limitSerialized []byte
|
|
|
|
var orderSerialized []byte
|
|
|
|
|
|
|
|
var status int
|
|
|
|
var archivedAt time.Time
|
|
|
|
|
2019-07-09 22:33:45 +01:00
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized, &status, &archivedAt)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var info orders.ArchivedInfo
|
2019-07-01 16:54:11 +01:00
|
|
|
info.Limit = &pb.OrderLimit{}
|
|
|
|
info.Order = &pb.Order{}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
|
|
|
info.Status = orders.Status(status)
|
|
|
|
info.ArchivedAt = archivedAt
|
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
err = pb.Unmarshal(limitSerialized, info.Limit)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
2020-04-08 13:08:57 +01:00
|
|
|
err = pb.Unmarshal(orderSerialized, info.Order)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
infos = append(infos, &info)
|
|
|
|
}
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return infos, ErrOrders.Wrap(rows.Err())
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
2019-08-15 17:56:33 +01:00
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// CleanArchive deletes all entries older than ttl.
|
storagenode: live tracking of order window usage
This change accomplishes multiple things:
1. Instead of having a max in flight time, which means
we effectively have a minimum bandwidth for uploads
and downloads, we keep track of what windows have
active requests happening in them.
2. We don't double check when we save the order to see if it
is too old: by then, it's too late. A malicious uplink
could just submit orders outside of the grace window and
receive all the data, but the node would just not commit
it, so the uplink gets free traffic. Because the endpoints
also check for the order being too old, this would be a
very tight race that depends on knowledge of the node system
clock, but best to not have the race exist. Instead, we piggy
back off of the in flight tracking and do the check when
we start to handle the order, and commit at the end.
3. Change the functions that send orders and list unsent
orders to accept a time at which that operation is
happening. This way, in tests, we can pretend we're
listing or sending far into the future after the windows
are available to send, rather than exposing test functions
to modify internal state about the grace period to get
the desired effect. This brings tests closer to actual
usage in production.
4. Change the calculation for if an order is allowed to be
enqueued due to the grace period to just look at the
order creation time, rather than some computation involving
the window it will be in. In this way, you can easily
answer the question of "will this order be accepted?" by
asking "is it older than X?" where X is the grace period.
5. Increases the frequency we check to send up orders to once
every 5 minutes instead of once every hour because we already
have hour-long buffering due to the windows. This decreases
the maximum latency that an order will be reported back to
the satellite by 55 minutes.
Change-Id: Ie08b90d139d45ee89b82347e191a2f8db1b88036
2020-08-12 20:01:43 +01:00
|
|
|
func (db *ordersDB) CleanArchive(ctx context.Context, deleteBefore time.Time) (_ int, err error) {
|
2019-08-15 17:56:33 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2020-01-14 11:07:35 +00:00
|
|
|
result, err := db.ExecContext(ctx, `
|
2019-08-15 17:56:33 +01:00
|
|
|
DELETE FROM order_archive_
|
|
|
|
WHERE archived_at <= ?
|
storagenode: live tracking of order window usage
This change accomplishes multiple things:
1. Instead of having a max in flight time, which means
we effectively have a minimum bandwidth for uploads
and downloads, we keep track of what windows have
active requests happening in them.
2. We don't double check when we save the order to see if it
is too old: by then, it's too late. A malicious uplink
could just submit orders outside of the grace window and
receive all the data, but the node would just not commit
it, so the uplink gets free traffic. Because the endpoints
also check for the order being too old, this would be a
very tight race that depends on knowledge of the node system
clock, but best to not have the race exist. Instead, we piggy
back off of the in flight tracking and do the check when
we start to handle the order, and commit at the end.
3. Change the functions that send orders and list unsent
orders to accept a time at which that operation is
happening. This way, in tests, we can pretend we're
listing or sending far into the future after the windows
are available to send, rather than exposing test functions
to modify internal state about the grace period to get
the desired effect. This brings tests closer to actual
usage in production.
4. Change the calculation for if an order is allowed to be
enqueued due to the grace period to just look at the
order creation time, rather than some computation involving
the window it will be in. In this way, you can easily
answer the question of "will this order be accepted?" by
asking "is it older than X?" where X is the grace period.
5. Increases the frequency we check to send up orders to once
every 5 minutes instead of once every hour because we already
have hour-long buffering due to the windows. This decreases
the maximum latency that an order will be reported back to
the satellite by 55 minutes.
Change-Id: Ie08b90d139d45ee89b82347e191a2f8db1b88036
2020-08-12 20:01:43 +01:00
|
|
|
`, deleteBefore.UTC())
|
2019-08-15 17:56:33 +01:00
|
|
|
if err != nil {
|
2020-07-14 14:04:38 +01:00
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
2019-08-15 17:56:33 +01:00
|
|
|
return 0, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return 0, ErrOrders.Wrap(err)
|
2019-08-15 17:56:33 +01:00
|
|
|
}
|
|
|
|
count, err := result.RowsAffected()
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return 0, ErrOrders.Wrap(err)
|
2019-08-15 17:56:33 +01:00
|
|
|
}
|
|
|
|
return int(count), nil
|
|
|
|
}
|