storj/satellite/orders/endpoint.go
Jeff Wendling 1fecaed7df satellite/orders: don't version check old endpoint
nodes are submitting using both the legacy and windowed endpoints
and thus having their legacy submissions rejected.

it is legal to use both the legacy and windowed endpoints
in phase1 since they use the same backend. the legacy endpoint
is disabled in phase2 and phase3.

therefore, if we wait an order expiration period (2 days) after
we determine enough nodes have started using the windowed
endpoint, we can be sure that any orders they did have to
submit with the legacy endpoint will have expired.

Change-Id: I4418a881bf8bb9377efaef4c651e6103a5dc6ed0
2020-09-09 10:23:48 -04:00

677 lines
23 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"bytes"
"context"
"errors"
"io"
"sort"
"time"
monkit "github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/private/date"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/storj/satellite/nodeapiversion"
)
// DB implements saving order after receiving from storage node
//
// architecture: Database
type DB interface {
// CreateSerialInfo creates serial number entry in database.
CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
// UseSerialNumber creates a used serial number entry in database from an
// existing serial number.
// It returns the bucket ID associated to serialNumber.
UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
// UnuseSerialNumber removes pair serial number -> storage node id from database
UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
// DeleteExpiredSerials deletes all expired serials in serial_number, used_serials, and consumed_serials table.
DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)
// DeleteExpiredConsumedSerials deletes all expired serials in the consumed_serials table.
DeleteExpiredConsumedSerials(ctx context.Context, now time.Time) (_ int, err error)
// GetBucketIDFromSerialNumber returns the bucket ID associated with the serial number
GetBucketIDFromSerialNumber(ctx context.Context, serialNumber storj.SerialNumber) ([]byte, error)
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateStoragenodeBandwidthSettleWithWindow updates 'settled' bandwidth for given storage node
UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error)
// GetBucketBandwidth gets total bucket bandwidth from period of time
GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)
// ProcessOrders takes a list of order requests and processes them in a batch
ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)
// WithTransaction runs the callback and provides it with a Transaction.
WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error
// WithQueue runs the callback and provides it with a Queue. When the callback returns with
// no error, any pending serials returned by the queue are removed from it.
WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error
}
// Transaction represents a database transaction but with higher level actions.
type Transaction interface {
// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error
// UpdateStoragenodeBandwidthBatch updates all the bandwidth rollups in the database
UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error
// CreateConsumedSerialsBatch creates the batch of ConsumedSerials.
CreateConsumedSerialsBatch(ctx context.Context, consumedSerials []ConsumedSerial) (err error)
// HasConsumedSerial returns true if the node and serial number have been consumed.
HasConsumedSerial(ctx context.Context, nodeID storj.NodeID, serialNumber storj.SerialNumber) (bool, error)
}
// Queue is an abstraction around a queue of pending serials.
type Queue interface {
// GetPendingSerialsBatch returns a batch of pending serials containing at most size
// entries. It returns a boolean indicating true if the queue is empty.
GetPendingSerialsBatch(ctx context.Context, size int) ([]PendingSerial, bool, error)
}
// ConsumedSerial is a serial that has been consumed and its bandwidth recorded.
type ConsumedSerial struct {
NodeID storj.NodeID
SerialNumber storj.SerialNumber
ExpiresAt time.Time
}
// PendingSerial is a serial number reported by a storagenode waiting to be
// settled.
type PendingSerial struct {
NodeID storj.NodeID
BucketID []byte
Action uint
SerialNumber storj.SerialNumber
ExpiresAt time.Time
Settled uint64
}
var (
// Error the default orders errs class.
Error = errs.Class("orders error")
// ErrUsingSerialNumber error class for serial number.
ErrUsingSerialNumber = errs.Class("serial number")
errExpiredOrder = errs.Class("order limit expired")
mon = monkit.Package()
)
// BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup.
type BucketBandwidthRollup struct {
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
Inline int64
Allocated int64
Settled int64
}
// SortBucketBandwidthRollups sorts the rollups.
func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup) {
sort.SliceStable(rollups, func(i, j int) bool {
uuidCompare := bytes.Compare(rollups[i].ProjectID[:], rollups[j].ProjectID[:])
switch {
case uuidCompare == -1:
return true
case uuidCompare == 1:
return false
case rollups[i].BucketName < rollups[j].BucketName:
return true
case rollups[i].BucketName > rollups[j].BucketName:
return false
case rollups[i].Action < rollups[j].Action:
return true
case rollups[i].Action > rollups[j].Action:
return false
default:
return false
}
})
}
// StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup.
type StoragenodeBandwidthRollup struct {
NodeID storj.NodeID
Action pb.PieceAction
Allocated int64
Settled int64
}
// SortStoragenodeBandwidthRollups sorts the rollups.
func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup) {
sort.SliceStable(rollups, func(i, j int) bool {
nodeCompare := bytes.Compare(rollups[i].NodeID.Bytes(), rollups[j].NodeID.Bytes())
switch {
case nodeCompare == -1:
return true
case nodeCompare == 1:
return false
case rollups[i].Action < rollups[j].Action:
return true
case rollups[i].Action > rollups[j].Action:
return false
default:
return false
}
})
}
// ProcessOrderRequest for batch order processing.
type ProcessOrderRequest struct {
Order *pb.Order
OrderLimit *pb.OrderLimit
}
// ProcessOrderResponse for batch order processing responses.
type ProcessOrderResponse struct {
SerialNumber storj.SerialNumber
Status pb.SettlementResponse_Status
}
// Endpoint for orders receiving
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
satelliteSignee signing.Signee
DB DB
nodeAPIVersionDB nodeapiversion.DB
settlementBatchSize int
windowEndpointRolloutPhase WindowEndpointRolloutPhase
}
// NewEndpoint new orders receiving endpoint.
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase) *Endpoint {
return &Endpoint{
log: log,
satelliteSignee: satelliteSignee,
DB: db,
nodeAPIVersionDB: nodeAPIVersionDB,
settlementBatchSize: settlementBatchSize,
windowEndpointRolloutPhase: windowEndpointRolloutPhase,
}
}
func monitoredSettlementStreamReceive(ctx context.Context, stream pb.DRPCOrders_SettlementStream) (_ *pb.SettlementRequest, err error) {
defer mon.Task()(&ctx)(&err)
return stream.Recv()
}
func monitoredSettlementStreamSend(ctx context.Context, stream pb.DRPCOrders_SettlementStream, resp *pb.SettlementResponse) (err error) {
defer mon.Task()(&ctx)(&err)
switch resp.Status {
case pb.SettlementResponse_ACCEPTED:
mon.Event("settlement_response_accepted")
case pb.SettlementResponse_REJECTED:
mon.Event("settlement_response_rejected")
default:
mon.Event("settlement_response_unknown")
}
return stream.Send(resp)
}
// Settlement receives orders and handles them in batches.
func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
switch endpoint.windowEndpointRolloutPhase {
case WindowEndpointRolloutPhase1:
case WindowEndpointRolloutPhase2, WindowEndpointRolloutPhase3:
return rpcstatus.Error(rpcstatus.Unavailable, "endpoint disabled")
default:
return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
}
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
formatError := func(err error) error {
if errors.Is(err, io.EOF) {
return nil
}
return rpcstatus.Error(rpcstatus.Unknown, err.Error())
}
log := endpoint.log.Named(peer.ID.String())
log.Debug("Settlement")
requests := make([]*ProcessOrderRequest, 0, endpoint.settlementBatchSize)
defer func() {
if len(requests) > 0 {
err = errs.Combine(err, endpoint.processOrders(ctx, stream, requests))
if err != nil {
err = formatError(err)
}
}
}()
var expirationCount int64
defer func() {
if expirationCount > 0 {
log.Debug("order verification found expired orders", zap.Int64("amount", expirationCount))
}
}()
for {
request, err := monitoredSettlementStreamReceive(ctx, stream)
if err != nil {
return formatError(err)
}
if request == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "request missing")
}
if request.Limit == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "order limit missing")
}
if request.Order == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "order missing")
}
orderLimit := request.Limit
order := request.Order
if orderLimit.StorageNodeId != peer.ID {
return rpcstatus.Error(rpcstatus.Unauthenticated, "only specified storage node can settle order")
}
rejectErr := func() error {
// check expiration first before the signatures so that we can throw out the large
// amount of expired orders being sent to us before doing expensive signature
// verification.
if orderLimit.OrderExpiration.Before(time.Now()) {
mon.Event("order_verification_failed_expired")
expirationCount++
return errExpiredOrder.New("order limit expired")
}
// satellite verifies that it signed the order limit
if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
mon.Event("order_verification_failed_satellite_signature")
return Error.New("unable to verify order limit")
}
// satellite verifies that the order signature matches pub key in order limit
if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
mon.Event("order_verification_failed_uplink_signature")
return Error.New("unable to verify order")
}
// TODO should this reject or just error ??
if orderLimit.SerialNumber != order.SerialNumber {
mon.Event("order_verification_failed_serial_mismatch")
return Error.New("invalid serial number")
}
return nil
}()
if rejectErr != nil {
mon.Event("order_verification_failed")
if !errExpiredOrder.Has(rejectErr) {
log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr))
}
err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
SerialNumber: orderLimit.SerialNumber,
Status: pb.SettlementResponse_REJECTED,
})
if err != nil {
return formatError(err)
}
continue
}
requests = append(requests, &ProcessOrderRequest{Order: order, OrderLimit: orderLimit})
if len(requests) >= endpoint.settlementBatchSize {
err = endpoint.processOrders(ctx, stream, requests)
requests = requests[:0]
if err != nil {
return formatError(err)
}
}
}
}
func (endpoint *Endpoint) processOrders(ctx context.Context, stream pb.DRPCOrders_SettlementStream, requests []*ProcessOrderRequest) (err error) {
defer mon.Task()(&ctx)(&err)
responses, err := endpoint.DB.ProcessOrders(ctx, requests)
if err != nil {
return err
}
for _, response := range responses {
r := &pb.SettlementResponse{
SerialNumber: response.SerialNumber,
Status: response.Status,
}
err = monitoredSettlementStreamSend(ctx, stream, r)
if err != nil {
return err
}
}
return nil
}
type bucketIDAction struct {
bucketname string
projectID uuid.UUID
action pb.PieceAction
}
// SettlementWithWindow processes all orders that were created in a 1 hour window.
// Only one window is processed at a time.
// Batches are atomic, all orders are settled successfully or they all fail.
func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
switch endpoint.windowEndpointRolloutPhase {
case WindowEndpointRolloutPhase1, WindowEndpointRolloutPhase2:
return endpoint.SettlementWithWindowMigration(stream)
case WindowEndpointRolloutPhase3:
return endpoint.SettlementWithWindowFinal(stream)
default:
return rpcstatus.Error(rpcstatus.Internal, "invalid window endpoint rollout phase")
}
}
// SettlementWithWindowMigration implements phase 1 and phase 2 of the windowed order rollout where
// it uses the same backend as the non-windowed settlement and inserts entries containing 0 for
// the window which ensures that it is either entirely handled by the queue or entirely handled by
// the phase 3 endpoint.
func (endpoint *Endpoint) SettlementWithWindowMigration(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
endpoint.log.Debug("err peer identity from context", zap.Error(err))
return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
err = endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders)
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
log := endpoint.log.Named(peer.ID.String())
log.Debug("SettlementWithWindow")
var receivedCount int
var window int64
var actions = map[pb.PieceAction]struct{}{}
var requests []*ProcessOrderRequest
var finished bool
for !finished {
requests = requests[:0]
for len(requests) < endpoint.settlementBatchSize {
request, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
finished = true
break
}
log.Debug("err streaming order request", zap.Error(err))
return rpcstatus.Error(rpcstatus.Unknown, err.Error())
}
receivedCount++
orderLimit := request.Limit
if orderLimit == nil {
log.Debug("request.OrderLimit is nil")
continue
}
order := request.Order
if order == nil {
log.Debug("request.Order is nil")
continue
}
if window == 0 {
window = date.TruncateToHourInNano(orderLimit.OrderCreation)
}
// don't process orders that aren't valid
if !endpoint.isValid(ctx, log, order, orderLimit, peer.ID, window) {
continue
}
actions[orderLimit.Action] = struct{}{}
requests = append(requests, &ProcessOrderRequest{
Order: order,
OrderLimit: orderLimit,
})
}
// process all of the orders in the old way
_, err = endpoint.DB.ProcessOrders(ctx, requests)
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
// if we received no valid orders, then respond with rejected
if len(actions) == 0 || window == 0 {
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
Status: pb.SettlementWithWindowResponse_REJECTED,
})
}
// insert zero rows for every action involved in the set of orders. this prevents
// many problems (double spends and underspends) by ensuring that any window is
// either handled entirely by the queue or entirely with the phase 3 windowed endpoint.
windowTime := time.Unix(0, window)
for action := range actions {
if err := endpoint.DB.UpdateStoragenodeBandwidthSettle(ctx, peer.ID, action, 0, windowTime); err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
log.Debug("orders processed",
zap.Int("total orders received", receivedCount),
zap.Time("window", windowTime),
)
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
Status: pb.SettlementWithWindowResponse_ACCEPTED,
})
}
// SettlementWithWindowFinal processes all orders that were created in a 1 hour window.
// Only one window is processed at a time.
// Batches are atomic, all orders are settled successfully or they all fail.
func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
endpoint.log.Debug("err peer identity from context", zap.Error(err))
return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
err = endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders)
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
log := endpoint.log.Named(peer.ID.String())
log.Debug("SettlementWithWindow")
var storagenodeSettled = map[int32]int64{}
var bucketSettled = map[bucketIDAction]int64{}
var seenSerials = map[storj.SerialNumber]struct{}{}
var window int64
var request *pb.SettlementRequest
var receivedCount int
for {
request, err = stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Debug("err streaming order request", zap.Error(err))
return rpcstatus.Error(rpcstatus.Unknown, err.Error())
}
receivedCount++
orderLimit := request.Limit
if orderLimit == nil {
log.Debug("request.OrderLimit is nil")
continue
}
if window == 0 {
window = date.TruncateToHourInNano(orderLimit.OrderCreation)
}
order := request.Order
if order == nil {
log.Debug("request.Order is nil")
continue
}
serialNum := order.SerialNumber
// don't process orders that aren't valid
if !endpoint.isValid(ctx, log, order, orderLimit, peer.ID, window) {
continue
}
// don't process orders with serial numbers we've already seen
if _, ok := seenSerials[serialNum]; ok {
log.Debug("seen serial", zap.String("serial number", serialNum.String()))
continue
}
seenSerials[serialNum] = struct{}{}
storagenodeSettled[int32(orderLimit.Action)] += order.Amount
bucketPrefix, err := endpoint.DB.GetBucketIDFromSerialNumber(ctx, serialNum)
if err != nil {
log.Info("get bucketPrefix from serial number table err", zap.Error(err))
continue
}
bucket, err := metabase.ParseBucketPrefix(metabase.BucketPrefix(bucketPrefix))
if err != nil {
log.Info("split bucket err", zap.Error(err), zap.String("bucketPrefix", string(bucketPrefix)))
continue
}
bucketSettled[bucketIDAction{
bucketname: bucket.BucketName,
projectID: bucket.ProjectID,
action: orderLimit.Action,
}] += order.Amount
}
if len(storagenodeSettled) == 0 {
log.Debug("no orders were successfully processed", zap.Int("received count", receivedCount))
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
Status: pb.SettlementWithWindowResponse_REJECTED,
ActionSettled: storagenodeSettled,
})
}
status, alreadyProcessed, err := endpoint.DB.UpdateStoragenodeBandwidthSettleWithWindow(
ctx, peer.ID, storagenodeSettled, time.Unix(0, window),
)
if err != nil {
log.Debug("err updating storagenode bandwidth settle", zap.Error(err))
return err
}
log.Debug("orders processed",
zap.Int("total orders received", receivedCount),
zap.Time("window", time.Unix(0, window)),
zap.String("status", status.String()),
)
if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
for bucketIDAction, amount := range bucketSettled {
err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, amount, time.Unix(0, window),
)
if err != nil {
log.Info("err updating bucket bandwidth settle", zap.Error(err))
}
}
} else {
mon.Event("orders_already_processed")
}
if status == pb.SettlementWithWindowResponse_REJECTED {
storagenodeSettled = map[int32]int64{}
}
return stream.SendAndClose(&pb.SettlementWithWindowResponse{
Status: status,
ActionSettled: storagenodeSettled,
})
}
func (endpoint *Endpoint) isValid(ctx context.Context, log *zap.Logger, order *pb.Order, orderLimit *pb.OrderLimit, peerID storj.NodeID, window int64) bool {
if orderLimit.StorageNodeId != peerID {
log.Debug("storage node id mismatch")
mon.Event("order_not_valid_storagenodeid")
return false
}
// check expiration first before the signatures so that we can throw out the large amount
// of expired orders being sent to us before doing expensive signature verification.
if orderLimit.OrderExpiration.Before(time.Now().UTC()) {
log.Debug("invalid settlement: order limit expired")
mon.Event("order_not_valid_expired")
return false
}
// satellite verifies that it signed the order limit
if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
log.Debug("invalid settlement: unable to verify order limit")
mon.Event("order_not_valid_satellite_signature")
return false
}
// satellite verifies that the order signature matches pub key in order limit
if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
log.Debug("invalid settlement: unable to verify order")
mon.Event("order_not_valid_uplink_signature")
return false
}
if orderLimit.SerialNumber != order.SerialNumber {
log.Debug("invalid settlement: invalid serial number")
mon.Event("order_not_valid_serialnum_mismatch")
return false
}
// verify the 1 hr windows match
if window != date.TruncateToHourInNano(orderLimit.OrderCreation) {
log.Debug("invalid settlement: window mismatch")
mon.Event("order_not_valid_window_mismatch")
return false
}
return true
}