78c6d5bb32
this commit introduces the reported_serials table. its purpose is to allow for blind writes into it as nodes report in, so that we have minimal contention. in order to continue to accurately account for used bandwidth, though, we cannot immediately add the settled amount: if we did, we would have to give up on blind writes.

the table's primary key is structured precisely so that we can quickly find expired orders and so that we maximally benefit from rocksdb prefix compression. we do this by rounding the expires-at time forward to the next day, so that every order expiring on a given day shares the same key prefix. and since there are no secondary indexes or foreign key constraints, this design should use significantly less space than the current used_serials table while also reducing contention.

after inserting the orders into the table, a chore periodically consumes all of the expired orders in it and inserts them into the existing rollups tables. this is as if the nodes reported when the order expired rather than as soon as possible, which gives us higher confidence in the correctness of the refactor. and since we are able to process large batches of orders (typically a day's worth), we can batch the inserts into the rollup tables as much as possible, keeping them as friendly as possible to cockroach.

Change-Id: I25d609ca2679b8331979184f16c6d46d4f74c1a6
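as a rough sketch of the key idea (not the actual schema or code from this change — roundUpToNextDay and the key layout described in the comment are assumptions for illustration only), rounding each order's expiration forward to the start of the next day means every order expiring on that day shares one key prefix:

package main

import (
	"fmt"
	"time"
)

// roundUpToNextDay is an illustrative helper (not part of the satellite code):
// it rounds an expiration timestamp forward to the start of the next UTC day.
func roundUpToNextDay(t time.Time) time.Time {
	t = t.UTC()
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
}

func main() {
	expiresAt := time.Date(2020, 1, 15, 13, 37, 0, 0, time.UTC)

	// a hypothetical composite key would then look like
	// (rounded expiration, storage node id, serial number): every order
	// expiring on the same day shares the leading bytes, which is what makes
	// prefix compression effective and the expired-order scan a single range.
	fmt.Println(roundUpToNextDay(expiresAt)) // 2020-01-16 00:00:00 +0000 UTC
}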
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package orders

import (
	"bytes"
	"context"
	"io"
	"sort"
	"sync"
	"time"

	"github.com/skyrings/skyring-common/tools/uuid"
	"github.com/zeebo/errs"
	"go.uber.org/zap"
	"gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/common/identity"
	"storj.io/common/pb"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/signing"
	"storj.io/common/storj"
)

// DB implements saving orders after receiving them from storage nodes.
//
// architecture: Database
type DB interface {
	// CreateSerialInfo creates a serial number entry in the database.
	CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
	// UseSerialNumber records the serial number as used by the given storage node and returns the corresponding bucket ID.
	UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
	// UnuseSerialNumber removes the pair (serial number, storage node ID) from the database.
	UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
	// DeleteExpiredSerials deletes all expired serials in the serial_number and used_serials tables.
	DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)

	// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for a given bucket.
	UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthSettle updates 'settled' bandwidth for a given bucket.
	UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthInline updates 'inline' bandwidth for a given bucket.
	UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for a given storage node.
	UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// GetBucketBandwidth gets the total bucket bandwidth for a period of time.
	GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
	// GetStorageNodeBandwidth gets the total storage node bandwidth for a period of time.
	GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)

	// ProcessOrders takes a list of order requests and processes them in a batch.
	ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)

	// GetBillableBandwidth gets total billable (expired reported serial) bandwidth for nodes and buckets for all actions.
	GetBillableBandwidth(ctx context.Context, now time.Time) (bucketRollups []BucketBandwidthRollup, storagenodeRollups []StoragenodeBandwidthRollup, err error)

	// ExecuteInTx runs the callback and provides it with a Transaction.
	ExecuteInTx(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error
}

// Transaction represents a database transaction but with higher level actions.
type Transaction interface {
	// UpdateBucketBandwidthBatch updates all the bucket bandwidth rollups in the database.
	UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error
	// UpdateStoragenodeBandwidthBatch updates all the storage node bandwidth rollups in the database.
	UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error
	// DeleteExpiredReportedSerials deletes any expired reported serials as of now.
	DeleteExpiredReportedSerials(ctx context.Context, now time.Time) (err error)
}

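// The following is an illustrative sketch only, not part of this change: a
// settlement chore could flush expired reported serials into the rollup
// tables using just the interfaces above. The function name and the choice of
// passing `now` as the interval start are assumptions for illustration; a
// real chore would likely truncate the interval to its rollup granularity.
func flushExpiredSerials(ctx context.Context, db DB) error {
	now := time.Now()

	// gather everything that has expired into per-bucket and per-node rollups.
	bucketRollups, storagenodeRollups, err := db.GetBillableBandwidth(ctx, now)
	if err != nil {
		return Error.Wrap(err)
	}

	// write the batched rollups and drop the consumed serials in one transaction.
	return db.ExecuteInTx(ctx, func(ctx context.Context, tx Transaction) error {
		if err := tx.UpdateBucketBandwidthBatch(ctx, now, bucketRollups); err != nil {
			return err
		}
		if err := tx.UpdateStoragenodeBandwidthBatch(ctx, now, storagenodeRollups); err != nil {
			return err
		}
		return tx.DeleteExpiredReportedSerials(ctx, now)
	})
}
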
var (
	// Error is the default error class for the orders package.
	Error = errs.Class("orders error")
	// ErrUsingSerialNumber is the error class for serial number usage errors.
	ErrUsingSerialNumber = errs.Class("serial number")

	mon = monkit.Package()
)

// BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup
type BucketBandwidthRollup struct {
	ProjectID  uuid.UUID
	BucketName string
	Action     pb.PieceAction
	Inline     int64
	Allocated  int64
	Settled    int64
}

// SortBucketBandwidthRollups sorts the rollups by project ID, bucket name, and action.
func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup) {
	sort.SliceStable(rollups, func(i, j int) bool {
		uuidCompare := bytes.Compare(rollups[i].ProjectID[:], rollups[j].ProjectID[:])
		switch {
		case uuidCompare == -1:
			return true
		case uuidCompare == 1:
			return false
		case rollups[i].BucketName < rollups[j].BucketName:
			return true
		case rollups[i].BucketName > rollups[j].BucketName:
			return false
		case rollups[i].Action < rollups[j].Action:
			return true
		case rollups[i].Action > rollups[j].Action:
			return false
		default:
			return false
		}
	})
}

// StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup
type StoragenodeBandwidthRollup struct {
	NodeID    storj.NodeID
	Action    pb.PieceAction
	Allocated int64
	Settled   int64
}

// SortStoragenodeBandwidthRollups sorts the rollups by node ID and action.
func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup) {
	sort.SliceStable(rollups, func(i, j int) bool {
		nodeCompare := bytes.Compare(rollups[i].NodeID.Bytes(), rollups[j].NodeID.Bytes())
		switch {
		case nodeCompare == -1:
			return true
		case nodeCompare == 1:
			return false
		case rollups[i].Action < rollups[j].Action:
			return true
		case rollups[i].Action > rollups[j].Action:
			return false
		default:
			return false
		}
	})
}

// ProcessOrderRequest for batch order processing
type ProcessOrderRequest struct {
	Order      *pb.Order
	OrderLimit *pb.OrderLimit
}

// ProcessOrderResponse for batch order processing responses
type ProcessOrderResponse struct {
	SerialNumber storj.SerialNumber
	Status       pb.SettlementResponse_Status
}

// Endpoint for receiving orders.
//
// architecture: Endpoint
type Endpoint struct {
	log                 *zap.Logger
	satelliteSignee     signing.Signee
	DB                  DB
	settlementBatchSize int
	// bigHonkinMutex serializes all calls to DB.ProcessOrders from this endpoint.
	bigHonkinMutex sync.Mutex
}

// drpcEndpoint wraps streaming methods so that they can be used with drpc
type drpcEndpoint struct{ *Endpoint }

// DRPC returns a DRPC form of the endpoint.
func (endpoint *Endpoint) DRPC() pb.DRPCOrdersServer { return &drpcEndpoint{Endpoint: endpoint} }

// NewEndpoint creates a new endpoint for receiving orders.
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, settlementBatchSize int) *Endpoint {
	return &Endpoint{
		log:                 log,
		satelliteSignee:     satelliteSignee,
		DB:                  db,
		settlementBatchSize: settlementBatchSize,
	}
}

func monitoredSettlementStreamReceive(ctx context.Context, stream settlementStream) (_ *pb.SettlementRequest, err error) {
	defer mon.Task()(&ctx)(&err)
	return stream.Recv()
}

func monitoredSettlementStreamSend(ctx context.Context, stream settlementStream, resp *pb.SettlementResponse) (err error) {
	defer mon.Task()(&ctx)(&err)
	switch resp.Status {
	case pb.SettlementResponse_ACCEPTED:
		mon.Event("settlement_response_accepted")
	case pb.SettlementResponse_REJECTED:
		mon.Event("settlement_response_rejected")
	default:
		mon.Event("settlement_response_unknown")
	}
	return stream.Send(resp)
}

// Settlement receives orders and handles them in batches
func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err error) {
	return endpoint.doSettlement(stream)
}

// Settlement receives orders and handles them in batches
func (endpoint *drpcEndpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) {
	return endpoint.doSettlement(stream)
}

// settlementStream is the minimum interface required to perform settlements.
type settlementStream interface {
	Context() context.Context
	Send(*pb.SettlementResponse) error
	Recv() (*pb.SettlementRequest, error)
}

// doSettlement receives orders and handles them in batches
func (endpoint *Endpoint) doSettlement(stream settlementStream) (err error) {
	ctx := stream.Context()
	defer mon.Task()(&ctx)(&err)

	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
	}

	formatError := func(err error) error {
		if err == io.EOF {
			return nil
		}
		return rpcstatus.Error(rpcstatus.Unknown, err.Error())
	}

	log := endpoint.log.Named(peer.ID.String())
	log.Debug("Settlement")

	requests := make([]*ProcessOrderRequest, 0, endpoint.settlementBatchSize)

	// process any requests still buffered when the stream ends.
	defer func() {
		if len(requests) > 0 {
			err = errs.Combine(err, endpoint.processOrders(ctx, stream, requests))
			if err != nil {
				err = formatError(err)
			}
		}
	}()

	for {
		request, err := monitoredSettlementStreamReceive(ctx, stream)
		if err != nil {
			return formatError(err)
		}

		if request == nil {
			return rpcstatus.Error(rpcstatus.InvalidArgument, "request missing")
		}
		if request.Limit == nil {
			return rpcstatus.Error(rpcstatus.InvalidArgument, "order limit missing")
		}
		if request.Order == nil {
			return rpcstatus.Error(rpcstatus.InvalidArgument, "order missing")
		}

		orderLimit := request.Limit
		order := request.Order

		if orderLimit.StorageNodeId != peer.ID {
			return rpcstatus.Error(rpcstatus.Unauthenticated, "only specified storage node can settle order")
		}

		rejectErr := func() error {
			// satellite verifies that it signed the order limit
			if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
				return Error.New("unable to verify order limit")
			}

			// satellite verifies that the order signature matches pub key in order limit
			if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
				return Error.New("unable to verify order")
			}

			// TODO: should this reject or just error?
			if orderLimit.SerialNumber != order.SerialNumber {
				return Error.New("invalid serial number")
			}

			if orderLimit.OrderExpiration.Before(time.Now()) {
				return Error.New("order limit expired")
			}
			return nil
		}()
		if rejectErr != nil {
			log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr))
			err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
				SerialNumber: orderLimit.SerialNumber,
				Status:       pb.SettlementResponse_REJECTED,
			})
			if err != nil {
				return formatError(err)
			}
			continue
		}

		requests = append(requests, &ProcessOrderRequest{Order: order, OrderLimit: orderLimit})

		if len(requests) >= endpoint.settlementBatchSize {
			err = endpoint.processOrders(ctx, stream, requests)
			requests = requests[:0]
			if err != nil {
				return formatError(err)
			}
		}
	}
}

func (endpoint *Endpoint) processOrders(ctx context.Context, stream settlementStream, requests []*ProcessOrderRequest) (err error) {
	defer mon.Task()(&ctx)(&err)

	endpoint.bigHonkinMutex.Lock()
	responses, err := endpoint.DB.ProcessOrders(ctx, requests)
	if err != nil {
		endpoint.bigHonkinMutex.Unlock()
		return err
	}
	endpoint.bigHonkinMutex.Unlock()

	for _, response := range responses {
		r := &pb.SettlementResponse{
			SerialNumber: response.SerialNumber,
			Status:       response.Status,
		}
		err = monitoredSettlementStreamSend(ctx, stream, r)
		if err != nil {
			return err
		}
	}
	return nil
}