2019-03-27 10:24:35 +00:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package orders
|
|
|
|
|
|
|
|
import (
|
2020-01-15 21:45:17 +00:00
|
|
|
"bytes"
|
2019-03-27 10:24:35 +00:00
|
|
|
"context"
|
2020-07-14 14:04:38 +01:00
|
|
|
"errors"
|
2019-03-27 10:24:35 +00:00
|
|
|
"io"
|
2020-01-15 21:45:17 +00:00
|
|
|
"sort"
|
2019-03-27 10:24:35 +00:00
|
|
|
"time"
|
|
|
|
|
2020-10-09 14:40:12 +01:00
|
|
|
"github.com/spacemonkeygo/monkit/v3"
|
2019-03-27 10:24:35 +00:00
|
|
|
"github.com/zeebo/errs"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2023-02-08 13:09:21 +00:00
|
|
|
"storj.io/common/context2"
|
2019-12-27 11:48:47 +00:00
|
|
|
"storj.io/common/identity"
|
|
|
|
"storj.io/common/pb"
|
|
|
|
"storj.io/common/rpc/rpcstatus"
|
|
|
|
"storj.io/common/signing"
|
|
|
|
"storj.io/common/storj"
|
2020-03-30 10:08:50 +01:00
|
|
|
"storj.io/common/uuid"
|
2020-06-11 19:31:45 +01:00
|
|
|
"storj.io/storj/private/date"
|
2021-04-21 13:42:57 +01:00
|
|
|
"storj.io/storj/satellite/metabase"
|
2020-07-15 18:08:24 +01:00
|
|
|
"storj.io/storj/satellite/nodeapiversion"
|
2019-03-27 10:24:35 +00:00
|
|
|
)
|
|
|
|
|
2020-12-05 16:01:42 +00:00
|
|
|
// DB implements saving order after receiving from storage node.
|
2019-09-10 14:24:16 +01:00
|
|
|
//
|
|
|
|
// architecture: Database
|
2019-03-27 10:24:35 +00:00
|
|
|
type DB interface {
|
2019-04-01 21:14:58 +01:00
|
|
|
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
|
2019-06-25 16:58:42 +01:00
|
|
|
UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
|
2021-05-29 23:16:12 +01:00
|
|
|
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error
|
2019-04-01 21:14:58 +01:00
|
|
|
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
|
2019-06-25 16:58:42 +01:00
|
|
|
UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2021-11-23 22:50:42 +00:00
|
|
|
// UpdateBandwidthBatch updates bucket and project bandwidth rollups in the database
|
|
|
|
UpdateBandwidthBatch(ctx context.Context, rollups []BucketBandwidthRollup) error
|
2019-04-01 21:14:58 +01:00
|
|
|
|
|
|
|
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
|
2019-04-04 16:20:59 +01:00
|
|
|
UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
2020-06-11 19:31:45 +01:00
|
|
|
// UpdateStoragenodeBandwidthSettleWithWindow updates 'settled' bandwidth for given storage node
|
|
|
|
UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error)
|
2019-04-01 21:14:58 +01:00
|
|
|
|
|
|
|
// GetBucketBandwidth gets total bucket bandwidth from period of time
|
2019-06-25 16:58:42 +01:00
|
|
|
GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
|
2019-04-01 21:14:58 +01:00
|
|
|
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
|
|
|
|
GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)
|
2020-01-10 18:53:42 +00:00
|
|
|
}
|
|
|
|
|
2023-01-12 12:16:36 +00:00
|
|
|
type noopDB struct {
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) UpdateBandwidthBatch(ctx context.Context, rollups []BucketBandwidthRollup) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error) {
|
|
|
|
return pb.SettlementWithWindowResponse_ACCEPTED, false, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error) {
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (noopDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error) {
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewNoopDB creates noop orders DB.
|
|
|
|
func NewNoopDB() DB {
|
|
|
|
return &noopDB{}
|
|
|
|
}
|
|
|
|
|
2020-11-17 16:00:56 +00:00
|
|
|
// SerialDeleteOptions are the options used when deleting from serial tables.
type SerialDeleteOptions struct {
	// BatchSize is the batch size option for the delete.
	BatchSize int
}
|
|
|
|
|
2020-03-02 18:47:22 +00:00
|
|
|
// ConsumedSerial is a serial that has been consumed and its bandwidth recorded.
|
2020-02-14 00:03:41 +00:00
|
|
|
type ConsumedSerial struct {
|
|
|
|
NodeID storj.NodeID
|
|
|
|
SerialNumber storj.SerialNumber
|
|
|
|
ExpiresAt time.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
// PendingSerial is a serial number reported by a storagenode waiting to be
|
2020-07-16 15:18:02 +01:00
|
|
|
// settled.
|
2020-02-14 00:03:41 +00:00
|
|
|
type PendingSerial struct {
|
|
|
|
NodeID storj.NodeID
|
|
|
|
BucketID []byte
|
|
|
|
Action uint
|
|
|
|
SerialNumber storj.SerialNumber
|
|
|
|
ExpiresAt time.Time
|
|
|
|
Settled uint64
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
2020-08-11 15:50:01 +01:00
|
|
|
// Error the default orders errs class.
|
2021-04-28 09:06:17 +01:00
|
|
|
Error = errs.Class("orders")
|
2020-08-11 15:50:01 +01:00
|
|
|
// ErrUsingSerialNumber error class for serial number.
|
2019-04-04 15:42:01 +01:00
|
|
|
ErrUsingSerialNumber = errs.Class("serial number")
|
|
|
|
|
|
|
|
mon = monkit.Package()
|
2019-03-27 10:24:35 +00:00
|
|
|
)
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup.
|
2020-01-15 21:45:17 +00:00
|
|
|
type BucketBandwidthRollup struct {
|
2021-11-23 22:50:42 +00:00
|
|
|
ProjectID uuid.UUID
|
|
|
|
BucketName string
|
|
|
|
Action pb.PieceAction
|
|
|
|
IntervalStart time.Time
|
|
|
|
Inline int64
|
|
|
|
Allocated int64
|
|
|
|
Settled int64
|
|
|
|
Dead int64
|
2020-01-15 21:45:17 +00:00
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup.
|
2020-01-15 21:45:17 +00:00
|
|
|
type StoragenodeBandwidthRollup struct {
|
|
|
|
NodeID storj.NodeID
|
|
|
|
Action pb.PieceAction
|
|
|
|
Allocated int64
|
|
|
|
Settled int64
|
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// SortStoragenodeBandwidthRollups sorts the rollups.
|
2020-01-15 21:45:17 +00:00
|
|
|
func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup) {
|
|
|
|
sort.SliceStable(rollups, func(i, j int) bool {
|
|
|
|
nodeCompare := bytes.Compare(rollups[i].NodeID.Bytes(), rollups[j].NodeID.Bytes())
|
|
|
|
switch {
|
|
|
|
case nodeCompare == -1:
|
|
|
|
return true
|
|
|
|
case nodeCompare == 1:
|
|
|
|
return false
|
|
|
|
case rollups[i].Action < rollups[j].Action:
|
|
|
|
return true
|
|
|
|
case rollups[i].Action > rollups[j].Action:
|
|
|
|
return false
|
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-12-05 16:01:42 +00:00
|
|
|
// Endpoint for orders receiving.
|
2019-09-10 14:24:16 +01:00
|
|
|
//
|
|
|
|
// architecture: Endpoint
|
2019-03-27 10:24:35 +00:00
|
|
|
type Endpoint struct {
|
2021-03-29 09:58:04 +01:00
|
|
|
pb.DRPCOrdersUnimplementedServer
|
2021-01-22 13:51:29 +00:00
|
|
|
log *zap.Logger
|
|
|
|
satelliteSignee signing.Signee
|
|
|
|
DB DB
|
|
|
|
nodeAPIVersionDB nodeapiversion.DB
|
|
|
|
ordersSemaphore chan struct{}
|
|
|
|
ordersService *Service
|
2019-03-27 10:24:35 +00:00
|
|
|
}
|
|
|
|
|
2020-07-16 15:18:02 +01:00
|
|
|
// NewEndpoint new orders receiving endpoint.
|
2020-10-09 21:22:57 +01:00
|
|
|
//
|
|
|
|
// ordersSemaphoreSize controls the number of concurrent clients allowed to submit orders at once.
|
|
|
|
// A value of zero means unlimited.
|
2020-12-05 16:01:42 +00:00
|
|
|
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB,
|
|
|
|
ordersSemaphoreSize int, ordersService *Service) *Endpoint {
|
2020-10-09 21:22:57 +01:00
|
|
|
var ordersSemaphore chan struct{}
|
|
|
|
if ordersSemaphoreSize > 0 {
|
|
|
|
ordersSemaphore = make(chan struct{}, ordersSemaphoreSize)
|
|
|
|
}
|
|
|
|
|
2019-03-27 10:24:35 +00:00
|
|
|
return &Endpoint{
|
2021-01-22 13:51:29 +00:00
|
|
|
log: log,
|
|
|
|
satelliteSignee: satelliteSignee,
|
|
|
|
DB: db,
|
|
|
|
nodeAPIVersionDB: nodeAPIVersionDB,
|
|
|
|
ordersSemaphore: ordersSemaphore,
|
|
|
|
ordersService: ordersService,
|
2020-10-09 21:22:57 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-11 19:31:45 +01:00
|
|
|
type bucketIDAction struct {
|
|
|
|
bucketname string
|
|
|
|
projectID uuid.UUID
|
|
|
|
action pb.PieceAction
|
|
|
|
}
|
|
|
|
|
|
|
|
// SettlementWithWindow processes all orders that were created in a 1 hour window.
|
|
|
|
// Only one window is processed at a time.
|
|
|
|
// Batches are atomic, all orders are settled successfully or they all fail.
|
|
|
|
func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
|
2021-01-22 13:51:29 +00:00
|
|
|
return endpoint.SettlementWithWindowFinal(stream)
|
satellite/orders: 3-phase rollout
This adds a config flag orders.window-endpoint-rollout-phase
that can take on the values phase1, phase2 or phase3.
In phase1, the current orders endpoint continues to work as
usual, and the windowed orders endpoint uses the same backend
as the current one (but also does a bit extra).
In phase2, the current orders endpoint is disabled and the
windowed orders endpoint continues to use the same backend.
In phase3, the current orders endpoint is still disabled and
the windowed orders endpoint uses the new backend that requires
much less database traffic and state.
The intention is to deploy in phase1, roll out code to nodes
to have them use the windowed endpoint, switch to phase2, wait
a couple days for all existing orders to expire, then switch
to phase3.
Additionally, it fixes a bug where a node could submit a bunch
of orders and rack up charges for a bucket.
Change-Id: Ifdc10e09ae1645159cbec7ace687dcb2d594c76d
2020-07-21 17:53:32 +01:00
|
|
|
}
|
|
|
|
|
2020-10-20 19:54:17 +01:00
|
|
|
func trackFinalStatus(status pb.SettlementWithWindowResponse_Status) {
|
|
|
|
switch status {
|
|
|
|
case pb.SettlementWithWindowResponse_ACCEPTED:
|
|
|
|
mon.Event("settlement_response_accepted")
|
|
|
|
case pb.SettlementWithWindowResponse_REJECTED:
|
|
|
|
mon.Event("settlement_response_rejected")
|
|
|
|
default:
|
|
|
|
mon.Event("settlement_response_unknown")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
satellite/orders: 3-phase rollout
This adds a config flag orders.window-endpoint-rollout-phase
that can take on the values phase1, phase2 or phase3.
In phase1, the current orders endpoint continues to work as
usual, and the windowed orders endpoint uses the same backend
as the current one (but also does a bit extra).
In phase2, the current orders endpoint is disabled and the
windowed orders endpoint continues to use the same backend.
In phase3, the current orders endpoint is still disabled and
the windowed orders endpoint uses the new backend that requires
much less database traffic and state.
The intention is to deploy in phase1, roll out code to nodes
to have them use the windowed endpoint, switch to phase2, wait
a couple days for all existing orders to expire, then switch
to phase3.
Additionally, it fixes a bug where a node could submit a bunch
of orders and rack up charges for a bucket.
Change-Id: Ifdc10e09ae1645159cbec7ace687dcb2d594c76d
2020-07-21 17:53:32 +01:00
|
|
|
// SettlementWithWindowFinal processes all orders that were created in a 1 hour window.
|
|
|
|
// Only one window is processed at a time.
|
|
|
|
// Batches are atomic, all orders are settled successfully or they all fail.
|
|
|
|
func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
|
|
|
|
ctx := stream.Context()
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2020-10-20 19:54:17 +01:00
|
|
|
var alreadyProcessed bool
|
|
|
|
var status pb.SettlementWithWindowResponse_Status
|
|
|
|
defer trackFinalStatus(status)
|
|
|
|
|
2020-06-11 19:31:45 +01:00
|
|
|
peer, err := identity.PeerIdentityFromContext(ctx)
|
|
|
|
if err != nil {
|
|
|
|
endpoint.log.Debug("err peer identity from context", zap.Error(err))
|
|
|
|
return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
|
|
|
|
}
|
|
|
|
|
2021-06-22 20:16:09 +01:00
|
|
|
versionAtLeast, err := endpoint.nodeAPIVersionDB.VersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders)
|
2020-07-15 18:08:24 +01:00
|
|
|
if err != nil {
|
2021-06-22 20:16:09 +01:00
|
|
|
endpoint.log.Info("could not query if node version was new enough", zap.Error(err))
|
|
|
|
versionAtLeast = false
|
|
|
|
}
|
|
|
|
if !versionAtLeast {
|
|
|
|
err = endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders)
|
|
|
|
if err != nil {
|
|
|
|
return rpcstatus.Wrap(rpcstatus.Internal, err)
|
|
|
|
}
|
2020-07-15 18:08:24 +01:00
|
|
|
}
|
|
|
|
|
2020-06-11 19:31:45 +01:00
|
|
|
log := endpoint.log.Named(peer.ID.String())
|
|
|
|
log.Debug("SettlementWithWindow")
|
|
|
|
|
2021-05-29 23:16:12 +01:00
|
|
|
type bandwidthAmount struct {
|
2023-01-30 09:15:26 +00:00
|
|
|
Settled int64
|
|
|
|
Dead int64
|
2021-05-29 23:16:12 +01:00
|
|
|
}
|
|
|
|
|
2020-11-04 17:24:11 +00:00
|
|
|
storagenodeSettled := map[int32]int64{}
|
2021-05-29 23:16:12 +01:00
|
|
|
bucketSettled := map[bucketIDAction]bandwidthAmount{}
|
2020-11-04 17:24:11 +00:00
|
|
|
seenSerials := map[storj.SerialNumber]struct{}{}
|
2020-06-11 19:31:45 +01:00
|
|
|
|
|
|
|
var window int64
|
|
|
|
var request *pb.SettlementRequest
|
|
|
|
var receivedCount int
|
|
|
|
for {
|
|
|
|
request, err = stream.Recv()
|
|
|
|
if err != nil {
|
2020-07-14 14:04:38 +01:00
|
|
|
if errors.Is(err, io.EOF) {
|
2020-06-11 19:31:45 +01:00
|
|
|
break
|
|
|
|
}
|
|
|
|
log.Debug("err streaming order request", zap.Error(err))
|
|
|
|
return rpcstatus.Error(rpcstatus.Unknown, err.Error())
|
|
|
|
}
|
|
|
|
receivedCount++
|
|
|
|
|
|
|
|
orderLimit := request.Limit
|
|
|
|
if orderLimit == nil {
|
|
|
|
log.Debug("request.OrderLimit is nil")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if window == 0 {
|
|
|
|
window = date.TruncateToHourInNano(orderLimit.OrderCreation)
|
|
|
|
}
|
|
|
|
order := request.Order
|
|
|
|
if order == nil {
|
|
|
|
log.Debug("request.Order is nil")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
serialNum := order.SerialNumber
|
|
|
|
|
|
|
|
// don't process orders that aren't valid
|
|
|
|
if !endpoint.isValid(ctx, log, order, orderLimit, peer.ID, window) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// don't process orders with serial numbers we've already seen
|
|
|
|
if _, ok := seenSerials[serialNum]; ok {
|
|
|
|
log.Debug("seen serial", zap.String("serial number", serialNum.String()))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
seenSerials[serialNum] = struct{}{}
|
|
|
|
|
|
|
|
storagenodeSettled[int32(orderLimit.Action)] += order.Amount
|
|
|
|
|
2023-02-01 10:03:39 +00:00
|
|
|
// user can do only two actions which are important for bucket bandwidth usage
|
|
|
|
userAction := orderLimit.Action == pb.PieceAction_PUT || orderLimit.Action == pb.PieceAction_GET
|
|
|
|
|
|
|
|
// don't store anything else than user actions in bucket_bandwidth_rollups table. amounts for other
|
|
|
|
// actions will be stored in storagenode_bandwidth_rollups.
|
|
|
|
if !userAction {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2020-12-17 14:46:46 +00:00
|
|
|
metadata, err := endpoint.ordersService.DecryptOrderMetadata(ctx, orderLimit)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("decrypt order metadata err:", zap.Error(err))
|
|
|
|
mon.Event("bucketinfo_from_orders_metadata_error_1")
|
|
|
|
continue
|
2020-06-11 19:31:45 +01:00
|
|
|
}
|
2021-01-08 16:04:46 +00:00
|
|
|
|
|
|
|
var bucketInfo metabase.BucketLocation
|
|
|
|
switch {
|
|
|
|
case len(metadata.CompactProjectBucketPrefix) > 0:
|
|
|
|
bucketInfo, err = metabase.ParseCompactBucketPrefix(metadata.GetCompactProjectBucketPrefix())
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("decrypt order: ParseCompactBucketPrefix", zap.Error(err))
|
|
|
|
mon.Event("bucketinfo_from_orders_metadata_error_compact")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
case len(metadata.ProjectBucketPrefix) > 0:
|
|
|
|
bucketInfo, err = metabase.ParseBucketPrefix(metabase.BucketPrefix(metadata.GetProjectBucketPrefix()))
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("decrypt order: ParseBucketPrefix", zap.Error(err))
|
|
|
|
mon.Event("bucketinfo_from_orders_metadata_error_uncompact")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
log.Debug("decrypt order: project bucket prefix missing", zap.Error(err))
|
|
|
|
mon.Event("bucketinfo_from_orders_metadata_error_default")
|
2020-12-17 14:46:46 +00:00
|
|
|
continue
|
|
|
|
}
|
2021-01-08 16:04:46 +00:00
|
|
|
|
2021-12-10 10:10:00 +00:00
|
|
|
// log error only for orders created by users, for satellite actions order limits are created
|
|
|
|
// without bucket name and project ID because segments loop doesn't have access to it
|
2023-02-01 10:03:39 +00:00
|
|
|
if bucketInfo.BucketName == "" || bucketInfo.ProjectID.IsZero() {
|
2022-01-06 15:31:26 +00:00
|
|
|
log.Warn("decrypt order: bucketName or projectID not set",
|
2020-12-17 14:46:46 +00:00
|
|
|
zap.String("bucketName", bucketInfo.BucketName),
|
|
|
|
zap.String("projectID", bucketInfo.ProjectID.String()),
|
|
|
|
)
|
|
|
|
mon.Event("bucketinfo_from_orders_metadata_error_3")
|
|
|
|
continue
|
2020-06-11 19:31:45 +01:00
|
|
|
}
|
2020-11-18 21:39:13 +00:00
|
|
|
|
2021-05-29 23:16:12 +01:00
|
|
|
currentBucketIDAction := bucketIDAction{
|
2020-12-17 14:46:46 +00:00
|
|
|
bucketname: bucketInfo.BucketName,
|
|
|
|
projectID: bucketInfo.ProjectID,
|
2020-06-11 19:31:45 +01:00
|
|
|
action: orderLimit.Action,
|
2021-05-29 23:16:12 +01:00
|
|
|
}
|
|
|
|
bucketSettled[currentBucketIDAction] = bandwidthAmount{
|
2023-01-30 09:15:26 +00:00
|
|
|
Settled: bucketSettled[currentBucketIDAction].Settled + order.Amount,
|
|
|
|
Dead: bucketSettled[currentBucketIDAction].Dead + orderLimit.Limit - order.Amount,
|
|
|
|
// we are not collecting Allocated bandwidth as it won't be stored with UpdateBucketBandwidthSettle
|
2021-05-29 23:16:12 +01:00
|
|
|
}
|
2020-06-11 19:31:45 +01:00
|
|
|
}
|
2020-11-18 21:39:13 +00:00
|
|
|
|
2020-06-11 19:31:45 +01:00
|
|
|
if len(storagenodeSettled) == 0 {
|
|
|
|
log.Debug("no orders were successfully processed", zap.Int("received count", receivedCount))
|
2020-10-20 19:54:17 +01:00
|
|
|
status = pb.SettlementWithWindowResponse_REJECTED
|
2020-06-11 19:31:45 +01:00
|
|
|
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
|
2020-10-20 19:54:17 +01:00
|
|
|
Status: status,
|
2020-06-11 19:31:45 +01:00
|
|
|
ActionSettled: storagenodeSettled,
|
|
|
|
})
|
|
|
|
}
|
2020-10-20 19:54:17 +01:00
|
|
|
status, alreadyProcessed, err = endpoint.DB.UpdateStoragenodeBandwidthSettleWithWindow(
|
2020-06-11 19:31:45 +01:00
|
|
|
ctx, peer.ID, storagenodeSettled, time.Unix(0, window),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Debug("err updating storagenode bandwidth settle", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
log.Debug("orders processed",
|
|
|
|
zap.Int("total orders received", receivedCount),
|
|
|
|
zap.Time("window", time.Unix(0, window)),
|
|
|
|
zap.String("status", status.String()),
|
|
|
|
)
|
|
|
|
|
satellite/orders: 3-phase rollout
This adds a config flag orders.window-endpoint-rollout-phase
that can take on the values phase1, phase2 or phase3.
In phase1, the current orders endpoint continues to work as
usual, and the windowed orders endpoint uses the same backend
as the current one (but also does a bit extra).
In phase2, the current orders endpoint is disabled and the
windowed orders endpoint continues to use the same backend.
In phase3, the current orders endpoint is still disabled and
the windowed orders endpoint uses the new backend that requires
much less database traffic and state.
The intention is to deploy in phase1, roll out code to nodes
to have them use the windowed endpoint, switch to phase2, wait
a couple days for all existing orders to expire, then switch
to phase3.
Additionally, it fixes a bug where a node could submit a bunch
of orders and rack up charges for a bucket.
Change-Id: Ifdc10e09ae1645159cbec7ace687dcb2d594c76d
2020-07-21 17:53:32 +01:00
|
|
|
if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
|
2023-02-08 13:09:21 +00:00
|
|
|
// we would like to update bandwidth even if context was canceled because
|
|
|
|
// underlying implementation is flushing cache using this context in separate
|
|
|
|
// goroutine so it can be executed after this stream will be closed
|
|
|
|
ctx := context2.WithoutCancellation(ctx)
|
2021-05-29 23:16:12 +01:00
|
|
|
for bucketIDAction, bwAmount := range bucketSettled {
|
2020-06-11 19:31:45 +01:00
|
|
|
err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
|
2021-05-29 23:16:12 +01:00
|
|
|
bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, bwAmount.Settled, bwAmount.Dead, time.Unix(0, window),
|
2020-06-11 19:31:45 +01:00
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Info("err updating bucket bandwidth settle", zap.Error(err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
mon.Event("orders_already_processed")
|
|
|
|
}
|
|
|
|
|
|
|
|
if status == pb.SettlementWithWindowResponse_REJECTED {
|
|
|
|
storagenodeSettled = map[int32]int64{}
|
|
|
|
}
|
|
|
|
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
|
|
|
|
Status: status,
|
|
|
|
ActionSettled: storagenodeSettled,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-12-05 16:01:42 +00:00
|
|
|
func (endpoint *Endpoint) isValid(ctx context.Context, log *zap.Logger, order *pb.Order,
|
|
|
|
orderLimit *pb.OrderLimit, peerID storj.NodeID, window int64) bool {
|
2020-06-11 19:31:45 +01:00
|
|
|
if orderLimit.StorageNodeId != peerID {
|
|
|
|
log.Debug("storage node id mismatch")
|
|
|
|
mon.Event("order_not_valid_storagenodeid")
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// check expiration first before the signatures so that we can throw out the large amount
|
|
|
|
// of expired orders being sent to us before doing expensive signature verification.
|
|
|
|
if orderLimit.OrderExpiration.Before(time.Now().UTC()) {
|
|
|
|
log.Debug("invalid settlement: order limit expired")
|
|
|
|
mon.Event("order_not_valid_expired")
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// satellite verifies that it signed the order limit
|
|
|
|
if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
|
|
|
|
log.Debug("invalid settlement: unable to verify order limit")
|
|
|
|
mon.Event("order_not_valid_satellite_signature")
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// satellite verifies that the order signature matches pub key in order limit
|
|
|
|
if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
|
|
|
|
log.Debug("invalid settlement: unable to verify order")
|
|
|
|
mon.Event("order_not_valid_uplink_signature")
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if orderLimit.SerialNumber != order.SerialNumber {
|
|
|
|
log.Debug("invalid settlement: invalid serial number")
|
|
|
|
mon.Event("order_not_valid_serialnum_mismatch")
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// verify the 1 hr windows match
|
|
|
|
if window != date.TruncateToHourInNano(orderLimit.OrderCreation) {
|
|
|
|
log.Debug("invalid settlement: window mismatch")
|
|
|
|
mon.Event("order_not_valid_window_mismatch")
|
|
|
|
return false
|
|
|
|
}
|
2021-05-29 23:16:12 +01:00
|
|
|
if orderLimit.Limit < order.Amount {
|
|
|
|
log.Debug("invalid settlement: amounts mismatch")
|
|
|
|
mon.Event("order_not_valid_amounts_mismatch")
|
|
|
|
return false
|
|
|
|
}
|
2020-06-11 19:31:45 +01:00
|
|
|
return true
|
|
|
|
}
|