546d099cf5
When an unsent order stored in the DB cannot be unmarshalled due to an unmarshal error the rest unsent orders must be processed as usual. This changes will avoid that a Storage Node with unsent orders with invalid protobuf serialized values get blocked without sending orders until those invalid ones get removed from the DB.
326 lines
8.2 KiB
Go
326 lines
8.2 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package storagenodedb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/storj"
|
|
"storj.io/storj/storagenode/orders"
|
|
)
|
|
|
|
type ordersdb struct{ *InfoDB }
|
|
|
|
// Orders returns database for storing orders
|
|
func (db *DB) Orders() orders.DB { return db.info.Orders() }
|
|
|
|
// Orders returns database for storing orders
|
|
func (db *InfoDB) Orders() orders.DB { return &ordersdb{db} }
|
|
|
|
// Enqueue inserts order to the unsent list
|
|
func (db *ordersdb) Enqueue(ctx context.Context, info *orders.Info) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
limitSerialized, err := proto.Marshal(info.Limit)
|
|
if err != nil {
|
|
return ErrInfo.Wrap(err)
|
|
}
|
|
|
|
orderSerialized, err := proto.Marshal(info.Order)
|
|
if err != nil {
|
|
return ErrInfo.Wrap(err)
|
|
}
|
|
|
|
// TODO: remove uplink_cert_id
|
|
_, err = db.db.Exec(`
|
|
INSERT INTO unsent_order(
|
|
satellite_id, serial_number,
|
|
order_limit_serialized, order_serialized, order_limit_expiration,
|
|
uplink_cert_id
|
|
) VALUES (?,?, ?,?,?, ?)
|
|
`, info.Limit.SatelliteId, info.Limit.SerialNumber, limitSerialized, orderSerialized, info.Limit.OrderExpiration.UTC(), 0)
|
|
|
|
return ErrInfo.Wrap(err)
|
|
}
|
|
|
|
// ListUnsent returns orders that haven't been sent yet.
|
|
//
|
|
// If there is some unmarshal error while reading an order, the method proceed
|
|
// with the following ones and the function will return the ones which have
|
|
// been successfully read but returning an error with information of the ones
|
|
// which have not. In case of database or other system error, the method will
|
|
// stop without any further processing and will return an error without any
|
|
// order.
|
|
func (db *ordersdb) ListUnsent(ctx context.Context, limit int) (_ []*orders.Info, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
rows, err := db.db.Query(`
|
|
SELECT order_limit_serialized, order_serialized
|
|
FROM unsent_order
|
|
LIMIT ?
|
|
`, limit)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
|
|
var unmarshalErrors errs.Group
|
|
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
|
|
|
|
var infos []*orders.Info
|
|
for rows.Next() {
|
|
var limitSerialized []byte
|
|
var orderSerialized []byte
|
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized)
|
|
if err != nil {
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
|
|
var info orders.Info
|
|
info.Limit = &pb.OrderLimit{}
|
|
info.Order = &pb.Order{}
|
|
|
|
err = proto.Unmarshal(limitSerialized, info.Limit)
|
|
if err != nil {
|
|
unmarshalErrors.Add(ErrInfo.Wrap(err))
|
|
continue
|
|
}
|
|
|
|
err = proto.Unmarshal(orderSerialized, info.Order)
|
|
if err != nil {
|
|
unmarshalErrors.Add(ErrInfo.Wrap(err))
|
|
continue
|
|
}
|
|
|
|
infos = append(infos, &info)
|
|
}
|
|
|
|
return infos, ErrInfo.Wrap(rows.Err())
|
|
}
|
|
|
|
// ListUnsentBySatellite returns orders that haven't been sent yet grouped by
|
|
// satellite. Does not return uplink identity.
|
|
//
|
|
// If there is some unmarshal error while reading an order, the method proceed
|
|
// with the following ones and the function will return the ones which have
|
|
// been successfully read but returning an error with information of the ones
|
|
// which have not. In case of database or other system error, the method will
|
|
// stop without any further processing and will return an error without any
|
|
// order.
|
|
func (db *ordersdb) ListUnsentBySatellite(ctx context.Context) (_ map[storj.NodeID][]*orders.Info, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
// TODO: add some limiting
|
|
|
|
rows, err := db.db.Query(`
|
|
SELECT order_limit_serialized, order_serialized
|
|
FROM unsent_order
|
|
`)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
|
|
var unmarshalErrors errs.Group
|
|
defer func() { err = errs.Combine(err, unmarshalErrors.Err(), rows.Close()) }()
|
|
|
|
infos := map[storj.NodeID][]*orders.Info{}
|
|
for rows.Next() {
|
|
var limitSerialized []byte
|
|
var orderSerialized []byte
|
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized)
|
|
if err != nil {
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
|
|
var info orders.Info
|
|
info.Limit = &pb.OrderLimit{}
|
|
info.Order = &pb.Order{}
|
|
|
|
err = proto.Unmarshal(limitSerialized, info.Limit)
|
|
if err != nil {
|
|
unmarshalErrors.Add(ErrInfo.Wrap(err))
|
|
continue
|
|
}
|
|
|
|
err = proto.Unmarshal(orderSerialized, info.Order)
|
|
if err != nil {
|
|
unmarshalErrors.Add(ErrInfo.Wrap(err))
|
|
continue
|
|
}
|
|
|
|
infos[info.Limit.SatelliteId] = append(infos[info.Limit.SatelliteId], &info)
|
|
}
|
|
|
|
return infos, ErrInfo.Wrap(rows.Err())
|
|
}
|
|
|
|
// Archive marks order as being handled.
|
|
//
|
|
// If any of the request contains an order which doesn't exist the method will
|
|
// follow with the next ones without interrupting the operation and it will
|
|
// return an error of the class orders.OrderNotFoundError. Any other error, will
|
|
// abort the operation, rolling back the transaction.
|
|
func (db *ordersdb) Archive(ctx context.Context, requests ...orders.ArchiveRequest) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
txn, err := db.Begin()
|
|
if err != nil {
|
|
return ErrInfo.Wrap(err)
|
|
}
|
|
|
|
var notFoundErrs errs.Group
|
|
defer func() {
|
|
if err == nil {
|
|
err = txn.Commit()
|
|
if err == nil {
|
|
if len(notFoundErrs) > 0 {
|
|
// Return a class error to allow to the caler to identify this case
|
|
err = orders.OrderNotFoundError.New(notFoundErrs.Err().Error())
|
|
}
|
|
}
|
|
} else {
|
|
err = errs.Combine(err, txn.Rollback())
|
|
}
|
|
}()
|
|
|
|
for _, req := range requests {
|
|
err := db.archiveOne(ctx, txn, req)
|
|
if err != nil {
|
|
if orders.OrderNotFoundError.Has(err) {
|
|
notFoundErrs.Add(err)
|
|
continue
|
|
}
|
|
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// archiveOne marks order as being handled.
|
|
func (db *ordersdb) archiveOne(ctx context.Context, txn *sql.Tx, req orders.ArchiveRequest) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
result, err := txn.Exec(`
|
|
INSERT INTO order_archive_ (
|
|
satellite_id, serial_number,
|
|
order_limit_serialized, order_serialized,
|
|
uplink_cert_id,
|
|
status, archived_at
|
|
) SELECT
|
|
satellite_id, serial_number,
|
|
order_limit_serialized, order_serialized,
|
|
uplink_cert_id,
|
|
?, ?
|
|
FROM unsent_order
|
|
WHERE satellite_id = ? AND serial_number = ?;
|
|
|
|
DELETE FROM unsent_order
|
|
WHERE satellite_id = ? AND serial_number = ?;
|
|
`, int(req.Status), time.Now().UTC(), req.Satellite, req.Serial, req.Satellite, req.Serial)
|
|
if err != nil {
|
|
return ErrInfo.Wrap(err)
|
|
}
|
|
|
|
count, err := result.RowsAffected()
|
|
if err != nil {
|
|
return ErrInfo.Wrap(err)
|
|
}
|
|
if count == 0 {
|
|
return orders.OrderNotFoundError.New("satellite: %s, serial number: %s",
|
|
req.Satellite.String(), req.Serial.String(),
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListArchived returns orders that have been sent.
|
|
func (db *ordersdb) ListArchived(ctx context.Context, limit int) (_ []*orders.ArchivedInfo, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
rows, err := db.db.Query(`
|
|
SELECT order_limit_serialized, order_serialized, status, archived_at
|
|
FROM order_archive_
|
|
LIMIT ?
|
|
`, limit)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
|
|
|
var infos []*orders.ArchivedInfo
|
|
for rows.Next() {
|
|
var limitSerialized []byte
|
|
var orderSerialized []byte
|
|
|
|
var status int
|
|
var archivedAt time.Time
|
|
|
|
err := rows.Scan(&limitSerialized, &orderSerialized, &status, &archivedAt)
|
|
if err != nil {
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
|
|
var info orders.ArchivedInfo
|
|
info.Limit = &pb.OrderLimit{}
|
|
info.Order = &pb.Order{}
|
|
|
|
info.Status = orders.Status(status)
|
|
info.ArchivedAt = archivedAt
|
|
|
|
err = proto.Unmarshal(limitSerialized, info.Limit)
|
|
if err != nil {
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
|
|
err = proto.Unmarshal(orderSerialized, info.Order)
|
|
if err != nil {
|
|
return nil, ErrInfo.Wrap(err)
|
|
}
|
|
|
|
infos = append(infos, &info)
|
|
}
|
|
|
|
return infos, ErrInfo.Wrap(rows.Err())
|
|
}
|
|
|
|
// CleanArchive deletes all entries older than ttl
|
|
func (db *ordersdb) CleanArchive(ctx context.Context, ttl time.Duration) (_ int, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
deleteBefore := time.Now().UTC().Add(-1 * ttl)
|
|
result, err := db.db.Exec(`
|
|
DELETE FROM order_archive_
|
|
WHERE archived_at <= ?
|
|
`, deleteBefore)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return 0, nil
|
|
}
|
|
return 0, ErrInfo.Wrap(err)
|
|
}
|
|
count, err := result.RowsAffected()
|
|
if err != nil {
|
|
return 0, ErrInfo.Wrap(err)
|
|
}
|
|
return int(count), nil
|
|
}
|