2019-03-18 10:55:06 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package storagenodedb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"database/sql"
|
2019-03-21 13:24:26 +00:00
|
|
|
"time"
|
2019-03-18 10:55:06 +00:00
|
|
|
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
|
|
"github.com/zeebo/errs"
|
|
|
|
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/pb"
|
|
|
|
"storj.io/common/storj"
|
2019-03-18 10:55:06 +00:00
|
|
|
"storj.io/storj/storagenode/orders"
|
|
|
|
)
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
// ErrOrders represents errors from the ordersdb database.
|
|
|
|
var ErrOrders = errs.Class("ordersdb error")
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-09-18 17:17:28 +01:00
|
|
|
// OrdersDBName represents the database name.
|
|
|
|
const OrdersDBName = "orders"
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-09-18 17:17:28 +01:00
|
|
|
type ordersDB struct {
|
2019-11-13 16:49:22 +00:00
|
|
|
dbContainerImpl
|
2019-08-21 15:32:25 +01:00
|
|
|
}
|
2019-03-18 10:55:06 +00:00
|
|
|
|
|
|
|
// Enqueue inserts order to the unsent list
|
2019-08-21 15:32:25 +01:00
|
|
|
func (db *ordersDB) Enqueue(ctx context.Context, info *orders.Info) (err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-03-18 10:55:06 +00:00
|
|
|
limitSerialized, err := proto.Marshal(info.Limit)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
orderSerialized, err := proto.Marshal(info.Order)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
2019-07-11 21:51:40 +01:00
|
|
|
// TODO: remove uplink_cert_id
|
2019-08-21 15:32:25 +01:00
|
|
|
_, err = db.Exec(`
|
2019-03-18 10:55:06 +00:00
|
|
|
INSERT INTO unsent_order(
|
|
|
|
satellite_id, serial_number,
|
|
|
|
order_limit_serialized, order_serialized, order_limit_expiration,
|
|
|
|
uplink_cert_id
|
|
|
|
) VALUES (?,?, ?,?,?, ?)
|
2019-07-09 22:54:00 +01:00
|
|
|
`, info.Limit.SatelliteId, info.Limit.SerialNumber, limitSerialized, orderSerialized, info.Limit.OrderExpiration.UTC(), 0)
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ListUnsent returns orders that haven't been sent yet.
|
2019-08-16 16:33:51 +01:00
|
|
|
//
|
|
|
|
// If there is some unmarshal error while reading an order, the method proceed
|
|
|
|
// with the following ones and the function will return the ones which have
|
|
|
|
// been successfully read but returning an error with information of the ones
|
|
|
|
// which have not. In case of database or other system error, the method will
|
|
|
|
// stop without any further processing and will return an error without any
|
|
|
|
// order.
|
2019-08-21 15:32:25 +01:00
|
|
|
func (db *ordersDB) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info, err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-18 10:55:06 +00:00
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
rows, err := db.Query(`
|
2019-07-09 22:33:45 +01:00
|
|
|
SELECT order_limit_serialized, order_serialized
|
2019-03-18 10:55:06 +00:00
|
|
|
FROM unsent_order
|
|
|
|
LIMIT ?
|
|
|
|
`, limit)
|
|
|
|
if err != nil {
|
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
return nil, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
2019-08-16 16:33:51 +01:00
|
|
|
|
|
|
|
var unmarshalErrors errs.Group
|
|
|
|
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
|
2019-03-18 10:55:06 +00:00
|
|
|
|
|
|
|
var infos []*orders.Info
|
|
|
|
for rows.Next() {
|
|
|
|
var limitSerialized []byte
|
|
|
|
var orderSerialized []byte
|
|
|
|
|
2019-07-09 22:33:45 +01:00
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized)
|
2019-03-18 10:55:06 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var info orders.Info
|
2019-07-01 16:54:11 +01:00
|
|
|
info.Limit = &pb.OrderLimit{}
|
|
|
|
info.Order = &pb.Order{}
|
2019-03-18 10:55:06 +00:00
|
|
|
|
|
|
|
err = proto.Unmarshal(limitSerialized, info.Limit)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
err = proto.Unmarshal(orderSerialized, info.Order)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
infos = append(infos, &info)
|
|
|
|
}
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return infos, ErrOrders.Wrap(rows.Err())
|
2019-03-18 10:55:06 +00:00
|
|
|
}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2019-08-16 16:33:51 +01:00
|
|
|
// ListUnsentBySatellite returns orders that haven't been sent yet grouped by
|
|
|
|
// satellite. Does not return uplink identity.
|
|
|
|
//
|
|
|
|
// If there is some unmarshal error while reading an order, the method proceed
|
|
|
|
// with the following ones and the function will return the ones which have
|
|
|
|
// been successfully read but returning an error with information of the ones
|
|
|
|
// which have not. In case of database or other system error, the method will
|
|
|
|
// stop without any further processing and will return an error without any
|
|
|
|
// order.
|
2019-08-21 15:32:25 +01:00
|
|
|
func (db *ordersDB) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*orders.Info, err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-21 13:24:26 +00:00
|
|
|
// TODO: add some limiting
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
rows, err := db.Query(`
|
2019-03-21 13:24:26 +00:00
|
|
|
SELECT order_limit_serialized, order_serialized
|
|
|
|
FROM unsent_order
|
|
|
|
`)
|
|
|
|
if err != nil {
|
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
return nil, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
2019-08-16 16:33:51 +01:00
|
|
|
|
|
|
|
var unmarshalErrors errs.Group
|
|
|
|
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
|
2019-03-21 13:24:26 +00:00
|
|
|
|
|
|
|
infos := map[storj.NodeID][]*orders.Info{}
|
|
|
|
for rows.Next() {
|
|
|
|
var limitSerialized []byte
|
|
|
|
var orderSerialized []byte
|
|
|
|
|
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var info orders.Info
|
2019-07-01 16:54:11 +01:00
|
|
|
info.Limit = &pb.OrderLimit{}
|
|
|
|
info.Order = &pb.Order{}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
|
|
|
err = proto.Unmarshal(limitSerialized, info.Limit)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
err = proto.Unmarshal(orderSerialized, info.Order)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
unmarshalErrors.Add(ErrOrders.Wrap(err))
|
2019-08-16 16:33:51 +01:00
|
|
|
continue
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
infos[info.Limit.SatelliteId] = append(infos[info.Limit.SatelliteId], &info)
|
|
|
|
}
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return infos, ErrOrders.Wrap(rows.Err())
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Archive marks order as being handled.
|
2019-08-16 15:53:22 +01:00
|
|
|
//
|
|
|
|
// If any of the request contains an order which doesn't exist the method will
|
|
|
|
// follow with the next ones without interrupting the operation and it will
|
|
|
|
// return an error of the class orders.OrderNotFoundError. Any other error, will
|
|
|
|
// abort the operation, rolling back the transaction.
|
2019-08-22 15:33:14 +01:00
|
|
|
func (db *ordersDB) Archive(ctx context.Context, archivedAt time.Time, requests ...orders.ArchiveRequest) (err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2019-07-31 17:40:08 +01:00
|
|
|
txn, err := db.Begin()
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-07-31 17:40:08 +01:00
|
|
|
}
|
2019-08-16 15:53:22 +01:00
|
|
|
|
|
|
|
var notFoundErrs errs.Group
|
2019-07-31 17:40:08 +01:00
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = txn.Commit()
|
2019-08-16 15:53:22 +01:00
|
|
|
if err == nil {
|
|
|
|
if len(notFoundErrs) > 0 {
|
|
|
|
// Return a class error to allow to the caler to identify this case
|
2019-08-21 17:30:29 +01:00
|
|
|
err = orders.OrderNotFoundError.Wrap(notFoundErrs.Err())
|
2019-08-16 15:53:22 +01:00
|
|
|
}
|
|
|
|
}
|
2019-07-31 17:40:08 +01:00
|
|
|
} else {
|
|
|
|
err = errs.Combine(err, txn.Rollback())
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for _, req := range requests {
|
2019-08-22 15:33:14 +01:00
|
|
|
err := db.archiveOne(ctx, txn, archivedAt, req)
|
2019-07-31 17:40:08 +01:00
|
|
|
if err != nil {
|
2019-08-16 15:53:22 +01:00
|
|
|
if orders.OrderNotFoundError.Has(err) {
|
|
|
|
notFoundErrs.Add(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
2019-07-31 17:40:08 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// archiveOne marks order as being handled.
|
2019-08-22 15:33:14 +01:00
|
|
|
func (db *ordersDB) archiveOne(ctx context.Context, txn *sql.Tx, archivedAt time.Time, req orders.ArchiveRequest) (err error) {
|
2019-07-31 17:40:08 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
result, err := txn.Exec(`
|
2019-07-16 17:31:29 +01:00
|
|
|
INSERT INTO order_archive_ (
|
2019-03-21 13:24:26 +00:00
|
|
|
satellite_id, serial_number,
|
|
|
|
order_limit_serialized, order_serialized,
|
|
|
|
uplink_cert_id,
|
|
|
|
status, archived_at
|
2019-06-04 13:31:39 +01:00
|
|
|
) SELECT
|
2019-03-21 13:24:26 +00:00
|
|
|
satellite_id, serial_number,
|
2019-06-04 13:31:39 +01:00
|
|
|
order_limit_serialized, order_serialized,
|
2019-03-21 13:24:26 +00:00
|
|
|
uplink_cert_id,
|
|
|
|
?, ?
|
|
|
|
FROM unsent_order
|
|
|
|
WHERE satellite_id = ? AND serial_number = ?;
|
|
|
|
|
2019-06-04 13:31:39 +01:00
|
|
|
DELETE FROM unsent_order
|
2019-03-21 13:24:26 +00:00
|
|
|
WHERE satellite_id = ? AND serial_number = ?;
|
2019-08-22 15:33:14 +01:00
|
|
|
`, int(req.Status), archivedAt, req.Satellite, req.Serial, req.Satellite, req.Serial)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
count, err := result.RowsAffected()
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
if count == 0 {
|
2019-08-16 15:53:22 +01:00
|
|
|
return orders.OrderNotFoundError.New("satellite: %s, serial number: %s",
|
|
|
|
req.Satellite.String(), req.Serial.String(),
|
|
|
|
)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListArchived returns orders that have been sent.
|
2019-08-21 15:32:25 +01:00
|
|
|
func (db *ordersDB) ListArchived(ctx context.Context, limit int) (_ []*orders.ArchivedInfo, err error) {
|
2019-06-04 13:31:39 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
2019-03-21 13:24:26 +00:00
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
rows, err := db.Query(`
|
2019-07-09 22:33:45 +01:00
|
|
|
SELECT order_limit_serialized, order_serialized, status, archived_at
|
2019-07-16 17:31:29 +01:00
|
|
|
FROM order_archive_
|
2019-03-21 13:24:26 +00:00
|
|
|
LIMIT ?
|
|
|
|
`, limit)
|
|
|
|
if err != nil {
|
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
return nil, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
|
|
|
|
var infos []*orders.ArchivedInfo
|
|
|
|
for rows.Next() {
|
|
|
|
var limitSerialized []byte
|
|
|
|
var orderSerialized []byte
|
|
|
|
|
|
|
|
var status int
|
|
|
|
var archivedAt time.Time
|
|
|
|
|
2019-07-09 22:33:45 +01:00
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized, &status, &archivedAt)
|
2019-03-21 13:24:26 +00:00
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var info orders.ArchivedInfo
|
2019-07-01 16:54:11 +01:00
|
|
|
info.Limit = &pb.OrderLimit{}
|
|
|
|
info.Order = &pb.Order{}
|
2019-03-21 13:24:26 +00:00
|
|
|
|
|
|
|
info.Status = orders.Status(status)
|
|
|
|
info.ArchivedAt = archivedAt
|
|
|
|
|
|
|
|
err = proto.Unmarshal(limitSerialized, info.Limit)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
err = proto.Unmarshal(orderSerialized, info.Order)
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return nil, ErrOrders.Wrap(err)
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
infos = append(infos, &info)
|
|
|
|
}
|
|
|
|
|
2019-08-21 15:32:25 +01:00
|
|
|
return infos, ErrOrders.Wrap(rows.Err())
|
2019-03-21 13:24:26 +00:00
|
|
|
}
|
2019-08-15 17:56:33 +01:00
|
|
|
|
|
|
|
// CleanArchive deletes all entries older than ttl
|
2019-08-21 15:32:25 +01:00
|
|
|
func (db *ordersDB) CleanArchive(ctx context.Context, ttl time.Duration) (_ int, err error) {
|
2019-08-15 17:56:33 +01:00
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
deleteBefore := time.Now().UTC().Add(-1 * ttl)
|
2019-08-21 15:32:25 +01:00
|
|
|
result, err := db.Exec(`
|
2019-08-15 17:56:33 +01:00
|
|
|
DELETE FROM order_archive_
|
|
|
|
WHERE archived_at <= ?
|
|
|
|
`, deleteBefore)
|
|
|
|
if err != nil {
|
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
return 0, nil
|
|
|
|
}
|
2019-08-21 15:32:25 +01:00
|
|
|
return 0, ErrOrders.Wrap(err)
|
2019-08-15 17:56:33 +01:00
|
|
|
}
|
|
|
|
count, err := result.RowsAffected()
|
|
|
|
if err != nil {
|
2019-08-21 15:32:25 +01:00
|
|
|
return 0, ErrOrders.Wrap(err)
|
2019-08-15 17:56:33 +01:00
|
|
|
}
|
|
|
|
return int(count), nil
|
|
|
|
}
|