// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package orders

import (
	"bytes"
	"context"
	"errors"
	"io"
	"sort"
	"time"

	"github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/identity"
	"storj.io/common/pb"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/storj/private/date"
	"storj.io/storj/satellite/metainfo/metabase"
	"storj.io/storj/satellite/nodeapiversion"
)

// DB implements saving orders after receiving them from storage nodes.
//
// architecture: Database
type DB interface {
	// CreateSerialInfo creates a serial number entry in the database.
	CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
	// UseSerialNumber creates a used serial number entry in the database from an
	// existing serial number.
	// It returns the bucket ID associated with serialNumber.
	UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
	// UnuseSerialNumber removes the serial number -> storage node ID pair from the database.
	UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
	// GetBucketIDFromSerialNumber returns the bucket ID associated with the serial number.
	GetBucketIDFromSerialNumber(ctx context.Context, serialNumber storj.SerialNumber) ([]byte, error)

	// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for the given bucket.
	UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthSettle updates 'settled' bandwidth for the given bucket.
	UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthInline updates 'inline' bandwidth for the given bucket.
	UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for the given storage node.
	UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateStoragenodeBandwidthSettleWithWindow updates 'settled' bandwidth for the given storage node.
	UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error)

	// GetBucketBandwidth gets the total bucket bandwidth for a period of time.
	GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
	// GetStorageNodeBandwidth gets the total storage node bandwidth for a period of time.
	GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)

	// ProcessOrders takes a list of order requests and processes them in a batch.
	ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)

	// WithTransaction runs the callback and provides it with a Transaction.
	WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error
	// WithQueue runs the callback and provides it with a Queue. When the callback returns with
	// no error, any pending serials returned by the queue are removed from it.
	WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error
}

// SerialDeleteOptions are options to use when deleting from the serial tables.
type SerialDeleteOptions struct {
	BatchSize int
}

// Transaction represents a database transaction but with higher level actions.
type Transaction interface {
	// UpdateBucketBandwidthBatch updates all the bucket bandwidth rollups in the database.
	UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error

	// UpdateStoragenodeBandwidthBatchPhase2 updates all the storagenode bandwidth rollups in the database.
	UpdateStoragenodeBandwidthBatchPhase2(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error

	// CreateConsumedSerialsBatch creates the batch of ConsumedSerials.
	CreateConsumedSerialsBatch(ctx context.Context, consumedSerials []ConsumedSerial) (err error)

	// HasConsumedSerial returns true if the node and serial number have been consumed.
	HasConsumedSerial(ctx context.Context, nodeID storj.NodeID, serialNumber storj.SerialNumber) (bool, error)
}
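
// exampleApplyBucketRollups is an illustrative sketch and not part of the
// original code: it shows how DB.WithTransaction and Transaction are meant to
// compose, with the rollup batch applied inside a single transaction so the
// writes either all commit or all roll back. The interval and rollups are
// placeholder values supplied by the caller.
func exampleApplyBucketRollups(ctx context.Context, db DB, intervalStart time.Time, rollups []BucketBandwidthRollup) error {
	return db.WithTransaction(ctx, func(ctx context.Context, tx Transaction) error {
		// Sorting first keeps the update order deterministic; implementations
		// may also sort internally, so this call is purely illustrative.
		SortBucketBandwidthRollups(rollups)
		return tx.UpdateBucketBandwidthBatch(ctx, intervalStart, rollups)
	})
}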

// Queue is an abstraction around a queue of pending serials.
type Queue interface {
	// GetPendingSerialsBatch returns a batch of pending serials containing at most size
	// entries. It also returns a boolean that is true when the queue is empty.
	GetPendingSerialsBatch(ctx context.Context, size int) ([]PendingSerial, bool, error)
}
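
// exampleDrainPendingSerials is an illustrative sketch and not part of the
// original code: it shows how DB.WithQueue and Queue.GetPendingSerialsBatch
// are intended to compose. Batches are read until the queue reports it is
// empty; because the callback returns nil, the serials it read are removed
// from the queue afterwards. The batch size is an arbitrary example value.
func exampleDrainPendingSerials(ctx context.Context, db DB) ([]PendingSerial, error) {
	var all []PendingSerial
	err := db.WithQueue(ctx, func(ctx context.Context, queue Queue) error {
		for {
			batch, done, err := queue.GetPendingSerialsBatch(ctx, 1000)
			if err != nil {
				return err
			}
			all = append(all, batch...)
			if done {
				return nil
			}
		}
	})
	return all, err
}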

// ConsumedSerial is a serial that has been consumed and its bandwidth recorded.
type ConsumedSerial struct {
	NodeID       storj.NodeID
	SerialNumber storj.SerialNumber
	ExpiresAt    time.Time
}

// PendingSerial is a serial number reported by a storagenode waiting to be
// settled.
type PendingSerial struct {
	NodeID       storj.NodeID
	BucketID     []byte
	Action       uint
	SerialNumber storj.SerialNumber
	ExpiresAt    time.Time
	Settled      uint64
}

var (
	// Error is the default orders errs class.
	Error = errs.Class("orders error")
	// ErrUsingSerialNumber is the errs class for serial number errors.
	ErrUsingSerialNumber = errs.Class("serial number")

	errExpiredOrder = errs.Class("order limit expired")

	mon = monkit.Package()
)

// BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup.
type BucketBandwidthRollup struct {
	ProjectID  uuid.UUID
	BucketName string
	Action     pb.PieceAction
	Inline     int64
	Allocated  int64
	Settled    int64
}

// SortBucketBandwidthRollups sorts the rollups.
func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup) {
	sort.SliceStable(rollups, func(i, j int) bool {
		uuidCompare := bytes.Compare(rollups[i].ProjectID[:], rollups[j].ProjectID[:])
		switch {
		case uuidCompare == -1:
			return true
		case uuidCompare == 1:
			return false
		case rollups[i].BucketName < rollups[j].BucketName:
			return true
		case rollups[i].BucketName > rollups[j].BucketName:
			return false
		case rollups[i].Action < rollups[j].Action:
			return true
		case rollups[i].Action > rollups[j].Action:
			return false
		default:
			return false
		}
	})
}
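
// exampleBucketRollupSortOrder is an illustrative sketch and not part of the
// original code: it shows the ordering produced by SortBucketBandwidthRollups,
// which groups rollups by project ID, then bucket name, then action. The
// bucket names and actions below are arbitrary example values sharing the
// zero project ID.
func exampleBucketRollupSortOrder() []BucketBandwidthRollup {
	rollups := []BucketBandwidthRollup{
		{BucketName: "videos", Action: pb.PieceAction_PUT},
		{BucketName: "photos", Action: pb.PieceAction_GET},
		{BucketName: "photos", Action: pb.PieceAction_PUT},
	}
	// After sorting, both "photos" rollups come before the "videos" one;
	// within a bucket, rollups are ordered by their numeric action value.
	SortBucketBandwidthRollups(rollups)
	return rollups
}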

// StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup.
type StoragenodeBandwidthRollup struct {
	NodeID    storj.NodeID
	Action    pb.PieceAction
	Allocated int64
	Settled   int64
}

// SortStoragenodeBandwidthRollups sorts the rollups.
func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup) {
	sort.SliceStable(rollups, func(i, j int) bool {
		nodeCompare := bytes.Compare(rollups[i].NodeID.Bytes(), rollups[j].NodeID.Bytes())
		switch {
		case nodeCompare == -1:
			return true
		case nodeCompare == 1:
			return false
		case rollups[i].Action < rollups[j].Action:
			return true
		case rollups[i].Action > rollups[j].Action:
			return false
		default:
			return false
		}
	})
}

// ProcessOrderRequest for batch order processing.
type ProcessOrderRequest struct {
	Order      *pb.Order
	OrderLimit *pb.OrderLimit
}

// ProcessOrderResponse for batch order processing responses.
type ProcessOrderResponse struct {
	SerialNumber storj.SerialNumber
	Status       pb.SettlementResponse_Status
}

// Endpoint for orders receiving.
//
// architecture: Endpoint
type Endpoint struct {
	log                        *zap.Logger
	satelliteSignee            signing.Signee
	DB                         DB
	nodeAPIVersionDB           nodeapiversion.DB
	settlementBatchSize        int
	windowEndpointRolloutPhase WindowEndpointRolloutPhase
	ordersSemaphore            chan struct{}
	ordersService              *Service
}

// NewEndpoint creates a new orders receiving endpoint.
//
// ordersSemaphoreSize controls the number of concurrent clients allowed to submit orders at once.
// A value of zero means unlimited.
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB,
	settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase,
	ordersSemaphoreSize int, ordersService *Service) *Endpoint {

	var ordersSemaphore chan struct{}
	if ordersSemaphoreSize > 0 {
		ordersSemaphore = make(chan struct{}, ordersSemaphoreSize)
	}

	return &Endpoint{
		log:                        log,
		satelliteSignee:            satelliteSignee,
		DB:                         db,
		nodeAPIVersionDB:           nodeAPIVersionDB,
		settlementBatchSize:        settlementBatchSize,
		windowEndpointRolloutPhase: windowEndpointRolloutPhase,
		ordersSemaphore:            ordersSemaphore,
		ordersService:              ordersService,
	}
}

func monitoredSettlementStreamReceive(ctx context.Context, stream pb.DRPCOrders_SettlementStream) (_ *pb.SettlementRequest, err error) {
	defer mon.Task()(&ctx)(&err)
	return stream.Recv()
}

func monitoredSettlementStreamSend(ctx context.Context, stream pb.DRPCOrders_SettlementStream, resp *pb.SettlementResponse) (err error) {
	defer mon.Task()(&ctx)(&err)
	switch resp.Status {
	case pb.SettlementResponse_ACCEPTED:
		mon.Event("settlement_response_accepted")
	case pb.SettlementResponse_REJECTED:
		mon.Event("settlement_response_rejected")
	default:
		mon.Event("settlement_response_unknown")
	}
	return stream.Send(resp)
}

// withOrdersSemaphore runs the callback within a slot of the ordersSemaphore,
// if one exists. If the context expires while waiting for a slot, it returns
// the context error instead.
func (endpoint *Endpoint) withOrdersSemaphore(ctx context.Context, cb func(ctx context.Context) error) error {
	if endpoint.ordersSemaphore == nil {
		return cb(ctx)
	}
	select {
	case endpoint.ordersSemaphore <- struct{}{}:
		err := cb(ctx)
		<-endpoint.ordersSemaphore
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
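
// exampleProcessOrdersWithinSemaphore is an illustrative sketch and not part
// of the original code: it mirrors how withOrdersSemaphore is used further
// below, with the database work done inside the callback so that a semaphore
// slot is held only for the duration of the call.
func (endpoint *Endpoint) exampleProcessOrdersWithinSemaphore(ctx context.Context, requests []*ProcessOrderRequest) ([]*ProcessOrderResponse, error) {
	var responses []*ProcessOrderResponse
	err := endpoint.withOrdersSemaphore(ctx, func(ctx context.Context) error {
		var err error
		responses, err = endpoint.DB.ProcessOrders(ctx, requests)
		return err
	})
	return responses, err
}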

// Settlement receives orders and handles them in batches.
func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) {
	ctx := stream.Context()
	defer mon.Task()(&ctx)(&err)

	switch endpoint.windowEndpointRolloutPhase {
	case WindowEndpointRolloutPhase1:
	case WindowEndpointRolloutPhase2, WindowEndpointRolloutPhase3:
		return rpcstatus.Error(rpcstatus.Unavailable, "endpoint disabled")
	default:
		return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
	}

	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
	}

	formatError := func(err error) error {
		if errors.Is(err, io.EOF) {
			return nil
		}
		return rpcstatus.Error(rpcstatus.Unknown, err.Error())
	}

	log := endpoint.log.Named(peer.ID.String())
	log.Debug("Settlement")

	requests := make([]*ProcessOrderRequest, 0, endpoint.settlementBatchSize)

	defer func() {
		if len(requests) > 0 {
			err = errs.Combine(err, endpoint.processOrders(ctx, stream, requests))
			if err != nil {
				err = formatError(err)
			}
		}
	}()

	var expirationCount int64
	defer func() {
		if expirationCount > 0 {
			log.Debug("order verification found expired orders", zap.Int64("amount", expirationCount))
		}
	}()

	for {
		request, err := monitoredSettlementStreamReceive(ctx, stream)
		if err != nil {
			return formatError(err)
		}

		if request == nil {
			return rpcstatus.Error(rpcstatus.InvalidArgument, "request missing")
		}
		if request.Limit == nil {
			return rpcstatus.Error(rpcstatus.InvalidArgument, "order limit missing")
		}
		if request.Order == nil {
			return rpcstatus.Error(rpcstatus.InvalidArgument, "order missing")
		}

		orderLimit := request.Limit
		order := request.Order

		rejectErr := func() error {
			if orderLimit.StorageNodeId != peer.ID {
				return rpcstatus.Error(rpcstatus.Unauthenticated, "only specified storage node can settle order")
			}

			// check expiration first before the signatures so that we can throw out the large
			// amount of expired orders being sent to us before doing expensive signature
			// verification.
			if orderLimit.OrderExpiration.Before(time.Now()) {
				mon.Event("order_verification_failed_expired")
				expirationCount++
				return errExpiredOrder.New("order limit expired")
			}

			// satellite verifies that it signed the order limit
			if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
				mon.Event("order_verification_failed_satellite_signature")
				return Error.New("unable to verify order limit")
			}

			// satellite verifies that the order signature matches pub key in order limit
			if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
				mon.Event("order_verification_failed_uplink_signature")
				return Error.New("unable to verify order")
			}

			// TODO should this reject or just error ??
			if orderLimit.SerialNumber != order.SerialNumber {
				mon.Event("order_verification_failed_serial_mismatch")
				return Error.New("invalid serial number")
			}
			return nil
		}()
		if rejectErr != nil {
			mon.Event("order_verification_failed")
			if !errExpiredOrder.Has(rejectErr) {
				log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr))
			}
			err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
				SerialNumber: orderLimit.SerialNumber,
				Status:       pb.SettlementResponse_REJECTED,
			})
			if err != nil {
				return formatError(err)
			}
			continue
		}

		requests = append(requests, &ProcessOrderRequest{Order: order, OrderLimit: orderLimit})

		if len(requests) >= endpoint.settlementBatchSize {
			err = endpoint.processOrders(ctx, stream, requests)
			requests = requests[:0]
			if err != nil {
				return formatError(err)
			}
		}
	}
}

func (endpoint *Endpoint) processOrders(ctx context.Context, stream pb.DRPCOrders_SettlementStream, requests []*ProcessOrderRequest) (err error) {
	defer mon.Task()(&ctx)(&err)

	var responses []*ProcessOrderResponse
	err = endpoint.withOrdersSemaphore(ctx, func(ctx context.Context) error {
		responses, err = endpoint.DB.ProcessOrders(ctx, requests)
		return err
	})
	if err != nil {
		return err
	}

	for _, response := range responses {
		r := &pb.SettlementResponse{
			SerialNumber: response.SerialNumber,
			Status:       response.Status,
		}
		err = monitoredSettlementStreamSend(ctx, stream, r)
		if err != nil {
			return err
		}
	}

	return nil
}

type bucketIDAction struct {
	bucketname string
	projectID  uuid.UUID
	action     pb.PieceAction
}

// SettlementWithWindow processes all orders that were created in a 1 hour window.
// Only one window is processed at a time.
// Batches are atomic: either all orders are settled successfully or they all fail.
func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
	switch endpoint.windowEndpointRolloutPhase {
	case WindowEndpointRolloutPhase1, WindowEndpointRolloutPhase2:
		return endpoint.SettlementWithWindowMigration(stream)
	case WindowEndpointRolloutPhase3:
		return endpoint.SettlementWithWindowFinal(stream)
	default:
		return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
	}
}

// SettlementWithWindowMigration implements phase 1 and phase 2 of the windowed order rollout where
// it uses the same backend as the non-windowed settlement and inserts entries containing 0 for
// the window, which ensures that a window is either entirely handled by the queue or entirely
// handled by the phase 3 endpoint.
func (endpoint *Endpoint) SettlementWithWindowMigration(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
	ctx := stream.Context()
	defer mon.Task()(&ctx)(&err)

	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		endpoint.log.Debug("err peer identity from context", zap.Error(err))
		return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
	}

	// update the node api version inside of the semaphore
	err = endpoint.withOrdersSemaphore(ctx, func(ctx context.Context) error {
		return endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders)
	})
	if err != nil {
		return rpcstatus.Wrap(rpcstatus.Internal, err)
	}

	log := endpoint.log.Named(peer.ID.String())
	log.Debug("SettlementWithWindow")

	var receivedCount int
	var window int64
	actions := map[pb.PieceAction]struct{}{}
	var requests []*ProcessOrderRequest
	var finished bool

	for !finished {
		requests = requests[:0]

		for len(requests) < endpoint.settlementBatchSize {
			request, err := stream.Recv()
			if err != nil {
				if errors.Is(err, io.EOF) {
					finished = true
					break
				}
				log.Debug("err streaming order request", zap.Error(err))
				return rpcstatus.Error(rpcstatus.Unknown, err.Error())
			}
			receivedCount++

			orderLimit := request.Limit
			if orderLimit == nil {
				log.Debug("request.OrderLimit is nil")
				continue
			}

			order := request.Order
			if order == nil {
				log.Debug("request.Order is nil")
				continue
			}

			if window == 0 {
				window = date.TruncateToHourInNano(orderLimit.OrderCreation)
			}

			// don't process orders that aren't valid
			if !endpoint.isValid(ctx, log, order, orderLimit, peer.ID, window) {
				continue
			}

			actions[orderLimit.Action] = struct{}{}

			requests = append(requests, &ProcessOrderRequest{
				Order:      order,
				OrderLimit: orderLimit,
			})
		}

		// process all of the orders in the old way inside of the semaphore
		err := endpoint.withOrdersSemaphore(ctx, func(ctx context.Context) error {
			_, err = endpoint.DB.ProcessOrders(ctx, requests)
			return err
		})
		if err != nil {
			return rpcstatus.Wrap(rpcstatus.Internal, err)
		}
	}

	// if we received no valid orders, then respond with rejected
	if len(actions) == 0 || window == 0 {
		return stream.SendAndClose(&pb.SettlementWithWindowResponse{
			Status: pb.SettlementWithWindowResponse_REJECTED,
		})
	}

	// insert zero rows for every action involved in the set of orders. this prevents
	// many problems (double spends and underspends) by ensuring that any window is
	// either handled entirely by the queue or entirely with the phase 3 windowed endpoint.
	// enter the semaphore for the duration of the updates.
	windowTime := time.Unix(0, window)
	err = endpoint.withOrdersSemaphore(ctx, func(ctx context.Context) error {
		for action := range actions {
			if err := endpoint.DB.UpdateStoragenodeBandwidthSettle(ctx, peer.ID, action, 0, windowTime); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return rpcstatus.Wrap(rpcstatus.Internal, err)
	}

	log.Debug("orders processed",
		zap.Int("total orders received", receivedCount),
		zap.Time("window", windowTime),
	)

	return stream.SendAndClose(&pb.SettlementWithWindowResponse{
		Status: pb.SettlementWithWindowResponse_ACCEPTED,
	})
}

func trackFinalStatus(status pb.SettlementWithWindowResponse_Status) {
	switch status {
	case pb.SettlementWithWindowResponse_ACCEPTED:
		mon.Event("settlement_response_accepted")
	case pb.SettlementWithWindowResponse_REJECTED:
		mon.Event("settlement_response_rejected")
	default:
		mon.Event("settlement_response_unknown")
	}
}

// SettlementWithWindowFinal processes all orders that were created in a 1 hour window.
// Only one window is processed at a time.
// Batches are atomic: either all orders are settled successfully or they all fail.
func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
	ctx := stream.Context()
	defer mon.Task()(&ctx)(&err)

	var alreadyProcessed bool
	var status pb.SettlementWithWindowResponse_Status
	defer func() { trackFinalStatus(status) }()

	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		endpoint.log.Debug("err peer identity from context", zap.Error(err))
		return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
	}

	err = endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders)
	if err != nil {
		return rpcstatus.Wrap(rpcstatus.Internal, err)
	}

	log := endpoint.log.Named(peer.ID.String())
	log.Debug("SettlementWithWindow")

	storagenodeSettled := map[int32]int64{}
	bucketSettled := map[bucketIDAction]int64{}
	seenSerials := map[storj.SerialNumber]struct{}{}

	var window int64
	var request *pb.SettlementRequest
	var receivedCount int
	for {
		request, err = stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			log.Debug("err streaming order request", zap.Error(err))
			return rpcstatus.Error(rpcstatus.Unknown, err.Error())
		}
		receivedCount++

		orderLimit := request.Limit
		if orderLimit == nil {
			log.Debug("request.OrderLimit is nil")
			continue
		}
		if window == 0 {
			window = date.TruncateToHourInNano(orderLimit.OrderCreation)
		}
		order := request.Order
		if order == nil {
			log.Debug("request.Order is nil")
			continue
		}
		serialNum := order.SerialNumber

		// don't process orders that aren't valid
		if !endpoint.isValid(ctx, log, order, orderLimit, peer.ID, window) {
			continue
		}

		// don't process orders with serial numbers we've already seen
		if _, ok := seenSerials[serialNum]; ok {
			log.Debug("seen serial", zap.String("serial number", serialNum.String()))
			continue
		}
		seenSerials[serialNum] = struct{}{}

		storagenodeSettled[int32(orderLimit.Action)] += order.Amount

		metadata, err := endpoint.ordersService.DecryptOrderMetadata(ctx, orderLimit)
		if err != nil {
			log.Debug("decrypt order metadata err:", zap.Error(err))
			mon.Event("bucketinfo_from_orders_metadata_error_1")
			continue
		}

		var bucketInfo metabase.BucketLocation
		switch {
		case len(metadata.CompactProjectBucketPrefix) > 0:
			bucketInfo, err = metabase.ParseCompactBucketPrefix(metadata.GetCompactProjectBucketPrefix())
			if err != nil {
				log.Debug("decrypt order: ParseCompactBucketPrefix", zap.Error(err))
				mon.Event("bucketinfo_from_orders_metadata_error_compact")
				continue
			}
		case len(metadata.ProjectBucketPrefix) > 0:
			bucketInfo, err = metabase.ParseBucketPrefix(metabase.BucketPrefix(metadata.GetProjectBucketPrefix()))
			if err != nil {
				log.Debug("decrypt order: ParseBucketPrefix", zap.Error(err))
				mon.Event("bucketinfo_from_orders_metadata_error_uncompact")
				continue
			}
		default:
			log.Debug("decrypt order: project bucket prefix missing", zap.Error(err))
			mon.Event("bucketinfo_from_orders_metadata_error_default")
			continue
		}

		if bucketInfo.BucketName == "" || bucketInfo.ProjectID.IsZero() {
			log.Info("decrypt order: bucketName or projectID not set",
				zap.String("bucketName", bucketInfo.BucketName),
				zap.String("projectID", bucketInfo.ProjectID.String()),
			)
			mon.Event("bucketinfo_from_orders_metadata_error_3")
			continue
		}

		bucketSettled[bucketIDAction{
			bucketname: bucketInfo.BucketName,
			projectID:  bucketInfo.ProjectID,
			action:     orderLimit.Action,
		}] += order.Amount
	}

	if len(storagenodeSettled) == 0 {
		log.Debug("no orders were successfully processed", zap.Int("received count", receivedCount))
		status = pb.SettlementWithWindowResponse_REJECTED
		return stream.SendAndClose(&pb.SettlementWithWindowResponse{
			Status:        status,
			ActionSettled: storagenodeSettled,
		})
	}
	status, alreadyProcessed, err = endpoint.DB.UpdateStoragenodeBandwidthSettleWithWindow(
		ctx, peer.ID, storagenodeSettled, time.Unix(0, window),
	)
	if err != nil {
		log.Debug("err updating storagenode bandwidth settle", zap.Error(err))
		return err
	}
	log.Debug("orders processed",
		zap.Int("total orders received", receivedCount),
		zap.Time("window", time.Unix(0, window)),
		zap.String("status", status.String()),
	)

	if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
		for bucketIDAction, amount := range bucketSettled {
			err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
				bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, amount, time.Unix(0, window),
			)
			if err != nil {
				log.Info("err updating bucket bandwidth settle", zap.Error(err))
			}
		}
	} else {
		mon.Event("orders_already_processed")
	}

	if status == pb.SettlementWithWindowResponse_REJECTED {
		storagenodeSettled = map[int32]int64{}
	}
	return stream.SendAndClose(&pb.SettlementWithWindowResponse{
		Status:        status,
		ActionSettled: storagenodeSettled,
	})
}

func (endpoint *Endpoint) isValid(ctx context.Context, log *zap.Logger, order *pb.Order,
	orderLimit *pb.OrderLimit, peerID storj.NodeID, window int64) bool {
	if orderLimit.StorageNodeId != peerID {
		log.Debug("storage node id mismatch")
		mon.Event("order_not_valid_storagenodeid")
		return false
	}
	// check expiration first before the signatures so that we can throw out the large amount
	// of expired orders being sent to us before doing expensive signature verification.
	if orderLimit.OrderExpiration.Before(time.Now().UTC()) {
		log.Debug("invalid settlement: order limit expired")
		mon.Event("order_not_valid_expired")
		return false
	}
	// satellite verifies that it signed the order limit
	if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
		log.Debug("invalid settlement: unable to verify order limit")
		mon.Event("order_not_valid_satellite_signature")
		return false
	}
	// satellite verifies that the order signature matches pub key in order limit
	if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
		log.Debug("invalid settlement: unable to verify order")
		mon.Event("order_not_valid_uplink_signature")
		return false
	}
	if orderLimit.SerialNumber != order.SerialNumber {
		log.Debug("invalid settlement: invalid serial number")
		mon.Event("order_not_valid_serialnum_mismatch")
		return false
	}
	// verify the 1 hr windows match
	if window != date.TruncateToHourInNano(orderLimit.OrderCreation) {
		log.Debug("invalid settlement: window mismatch")
		mon.Event("order_not_valid_window_mismatch")
		return false
	}
	return true
}
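
// exampleWindowFor is an illustrative sketch and not part of the original
// code: it shows how the settlement window used above is derived. The order
// creation time is truncated to the hour and expressed in nanoseconds, so
// every order created within the same hour maps to the same window value.
func exampleWindowFor(orderCreation time.Time) time.Time {
	window := date.TruncateToHourInNano(orderCreation)
	return time.Unix(0, window)
}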