// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package orders

import (
	"bytes"
	"context"
	"errors"
	"io"
	"sort"
	"time"

	monkit "github.com/spacemonkeygo/monkit/v3"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/identity"
	"storj.io/common/pb"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/storj/private/date"
	"storj.io/storj/satellite/nodeapiversion"
)

// DB is the storage layer for orders received from storage nodes: serial
// numbers, bucket and storage node bandwidth accounting, and batch order
// processing.
//
// architecture: Database
type DB interface {
	// CreateSerialInfo creates a serial number entry in the database that
	// associates serialNumber with bucketID and the order limit expiration.
	CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
	// UseSerialNumber creates a used serial number entry in the database from an
	// existing serial number entry.
	// It returns the bucket ID associated with serialNumber.
	UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
	// UnuseSerialNumber removes the (serial number, storage node ID) pair from the database.
	UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error

	// DeleteExpiredSerials deletes all serials that expired before now from the
	// serial_number, used_serials, and consumed_serials tables.
	// NOTE(review): the returned int is presumably the number of deleted rows — confirm against the implementation.
	DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)
	// DeleteExpiredConsumedSerials deletes all serials that expired before now
	// from the consumed_serials table.
	// NOTE(review): the returned int is presumably the number of deleted rows — confirm against the implementation.
	DeleteExpiredConsumedSerials(ctx context.Context, now time.Time) (_ int, err error)

	// GetBucketIDFromSerialNumber returns the bucket ID associated with the serial number.
	GetBucketIDFromSerialNumber(ctx context.Context, serialNumber storj.SerialNumber) ([]byte, error)

	// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for the given
	// bucket, piece action and interval.
	UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthSettle updates 'settled' bandwidth for the given
	// bucket, piece action and interval.
	UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthInline updates 'inline' bandwidth for the given
	// bucket, piece action and interval.
	UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error

	// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for the given
	// storage node, piece action and interval.
	UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateStoragenodeBandwidthSettleWithWindow updates 'settled' bandwidth for
	// the given storage node for a whole window. actionAmounts maps a piece
	// action (as int32) to the amount settled for that action.
	// NOTE(review): alreadyProcessed presumably reports that this window had been
	// settled before (callers then skip bucket rollups) — confirm against the implementation.
	UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error)

	// GetBucketBandwidth returns the total bandwidth of a bucket between from and to.
	GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
	// GetStorageNodeBandwidth returns the total bandwidth of a storage node between from and to.
	GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)

	// ProcessOrders processes a batch of order requests and returns the
	// settlement responses that should be reported back to the storage node.
	ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)

	// WithTransaction runs the callback and provides it with a Transaction.
	WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error
	// WithQueue runs the callback and provides it with a Queue. When the callback
	// returns with no error, any pending serials returned by the queue are
	// removed from it.
	WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error
}

// Transaction represents a database transaction that exposes higher level
// actions instead of raw queries.
type Transaction interface {
	// UpdateBucketBandwidthBatch updates all the given bucket bandwidth rollups
	// for the interval starting at intervalStart.
	UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error
	// UpdateStoragenodeBandwidthBatch updates all the given storage node
	// bandwidth rollups for the interval starting at intervalStart.
	UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error

	// CreateConsumedSerialsBatch creates all the given ConsumedSerials.
	CreateConsumedSerialsBatch(ctx context.Context, consumedSerials []ConsumedSerial) (err error)
	// HasConsumedSerial returns true if the serial number has already been
	// consumed for the given node.
	HasConsumedSerial(ctx context.Context, nodeID storj.NodeID, serialNumber storj.SerialNumber) (bool, error)
}

// Queue is an abstraction around a queue of pending serials.
type Queue interface {
	// GetPendingSerialsBatch returns a batch of at most size pending serials.
	// The boolean result is true when the queue is empty.
	GetPendingSerialsBatch(ctx context.Context, size int) ([]PendingSerial, bool, error)
}

// ConsumedSerial is a serial that has been consumed and its bandwidth recorded.
type ConsumedSerial struct { NodeID storj.NodeID SerialNumber storj.SerialNumber ExpiresAt time.Time } // PendingSerial is a serial number reported by a storagenode waiting to be // settled type PendingSerial struct { NodeID storj.NodeID BucketID []byte Action uint SerialNumber storj.SerialNumber ExpiresAt time.Time Settled uint64 } var ( // Error the default orders errs class Error = errs.Class("orders error") // ErrUsingSerialNumber error class for serial number ErrUsingSerialNumber = errs.Class("serial number") errExpiredOrder = errs.Class("order limit expired") mon = monkit.Package() ) // BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup type BucketBandwidthRollup struct { ProjectID uuid.UUID BucketName string Action pb.PieceAction Inline int64 Allocated int64 Settled int64 } // SortBucketBandwidthRollups sorts the rollups func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup) { sort.SliceStable(rollups, func(i, j int) bool { uuidCompare := bytes.Compare(rollups[i].ProjectID[:], rollups[j].ProjectID[:]) switch { case uuidCompare == -1: return true case uuidCompare == 1: return false case rollups[i].BucketName < rollups[j].BucketName: return true case rollups[i].BucketName > rollups[j].BucketName: return false case rollups[i].Action < rollups[j].Action: return true case rollups[i].Action > rollups[j].Action: return false default: return false } }) } // StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup type StoragenodeBandwidthRollup struct { NodeID storj.NodeID Action pb.PieceAction Allocated int64 Settled int64 } // SortStoragenodeBandwidthRollups sorts the rollups func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup) { sort.SliceStable(rollups, func(i, j int) bool { nodeCompare := bytes.Compare(rollups[i].NodeID.Bytes(), rollups[j].NodeID.Bytes()) switch { case nodeCompare == -1: return true case nodeCompare == 1: return false case rollups[i].Action < 
rollups[j].Action: return true case rollups[i].Action > rollups[j].Action: return false default: return false } }) } // ProcessOrderRequest for batch order processing type ProcessOrderRequest struct { Order *pb.Order OrderLimit *pb.OrderLimit } // ProcessOrderResponse for batch order processing responses type ProcessOrderResponse struct { SerialNumber storj.SerialNumber Status pb.SettlementResponse_Status } // Endpoint for orders receiving // // architecture: Endpoint type Endpoint struct { log *zap.Logger satelliteSignee signing.Signee DB DB nodeAPIVersionDB nodeapiversion.DB settlementBatchSize int } // NewEndpoint new orders receiving endpoint func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int) *Endpoint { return &Endpoint{ log: log, satelliteSignee: satelliteSignee, DB: db, nodeAPIVersionDB: nodeAPIVersionDB, settlementBatchSize: settlementBatchSize, } } func monitoredSettlementStreamReceive(ctx context.Context, stream pb.DRPCOrders_SettlementStream) (_ *pb.SettlementRequest, err error) { defer mon.Task()(&ctx)(&err) return stream.Recv() } func monitoredSettlementStreamSend(ctx context.Context, stream pb.DRPCOrders_SettlementStream, resp *pb.SettlementResponse) (err error) { defer mon.Task()(&ctx)(&err) switch resp.Status { case pb.SettlementResponse_ACCEPTED: mon.Event("settlement_response_accepted") case pb.SettlementResponse_REJECTED: mon.Event("settlement_response_rejected") default: mon.Event("settlement_response_unknown") } return stream.Send(resp) } // Settlement receives orders and handles them in batches. 
func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) { ctx := stream.Context() defer mon.Task()(&ctx)(&err) peer, err := identity.PeerIdentityFromContext(ctx) if err != nil { return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) } ok, err := endpoint.nodeAPIVersionDB.VersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders) if err != nil { return rpcstatus.Wrap(rpcstatus.Internal, err) } else if ok { return rpcstatus.Error(rpcstatus.PermissionDenied, "node api version too new") } formatError := func(err error) error { if errors.Is(err, io.EOF) { return nil } return rpcstatus.Error(rpcstatus.Unknown, err.Error()) } log := endpoint.log.Named(peer.ID.String()) log.Debug("Settlement") requests := make([]*ProcessOrderRequest, 0, endpoint.settlementBatchSize) defer func() { if len(requests) > 0 { err = errs.Combine(err, endpoint.processOrders(ctx, stream, requests)) if err != nil { err = formatError(err) } } }() var expirationCount int64 defer func() { if expirationCount > 0 { log.Debug("order verification found expired orders", zap.Int64("amount", expirationCount)) } }() for { request, err := monitoredSettlementStreamReceive(ctx, stream) if err != nil { return formatError(err) } if request == nil { return rpcstatus.Error(rpcstatus.InvalidArgument, "request missing") } if request.Limit == nil { return rpcstatus.Error(rpcstatus.InvalidArgument, "order limit missing") } if request.Order == nil { return rpcstatus.Error(rpcstatus.InvalidArgument, "order missing") } orderLimit := request.Limit order := request.Order if orderLimit.StorageNodeId != peer.ID { return rpcstatus.Error(rpcstatus.Unauthenticated, "only specified storage node can settle order") } rejectErr := func() error { // check expiration first before the signatures so that we can throw out the large // amount of expired orders being sent to us before doing expensive signature // verification. 
if orderLimit.OrderExpiration.Before(time.Now()) { mon.Event("order_verification_failed_expired") expirationCount++ return errExpiredOrder.New("order limit expired") } // satellite verifies that it signed the order limit if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil { mon.Event("order_verification_failed_satellite_signature") return Error.New("unable to verify order limit") } // satellite verifies that the order signature matches pub key in order limit if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil { mon.Event("order_verification_failed_uplink_signature") return Error.New("unable to verify order") } // TODO should this reject or just error ?? if orderLimit.SerialNumber != order.SerialNumber { mon.Event("order_verification_failed_serial_mismatch") return Error.New("invalid serial number") } return nil }() if rejectErr != nil { mon.Event("order_verification_failed") if !errExpiredOrder.Has(rejectErr) { log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr)) } err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{ SerialNumber: orderLimit.SerialNumber, Status: pb.SettlementResponse_REJECTED, }) if err != nil { return formatError(err) } continue } requests = append(requests, &ProcessOrderRequest{Order: order, OrderLimit: orderLimit}) if len(requests) >= endpoint.settlementBatchSize { err = endpoint.processOrders(ctx, stream, requests) requests = requests[:0] if err != nil { return formatError(err) } } } } func (endpoint *Endpoint) processOrders(ctx context.Context, stream pb.DRPCOrders_SettlementStream, requests []*ProcessOrderRequest) (err error) { defer mon.Task()(&ctx)(&err) responses, err := endpoint.DB.ProcessOrders(ctx, requests) if err != nil { return err } for _, response := range responses { r := &pb.SettlementResponse{ SerialNumber: response.SerialNumber, Status: 
response.Status, } err = monitoredSettlementStreamSend(ctx, stream, r) if err != nil { return err } } return nil } type bucketIDAction struct { bucketname string projectID uuid.UUID action pb.PieceAction } // SettlementWithWindow processes all orders that were created in a 1 hour window. // Only one window is processed at a time. // Batches are atomic, all orders are settled successfully or they all fail. func (endpoint *Endpoint) SettlementWithWindow(stream pb.DRPCOrders_SettlementWithWindowStream) (err error) { ctx := stream.Context() defer mon.Task()(&ctx)(&err) // TODO: remove once the storagenode side of this endpoint is implemented if true { return rpcstatus.Error(rpcstatus.Unimplemented, "endpoint not supporrted") } peer, err := identity.PeerIdentityFromContext(ctx) if err != nil { endpoint.log.Debug("err peer identity from context", zap.Error(err)) return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error()) } err = endpoint.nodeAPIVersionDB.UpdateVersionAtLeast(ctx, peer.ID, nodeapiversion.HasWindowedOrders) if err != nil { return rpcstatus.Wrap(rpcstatus.Internal, err) } log := endpoint.log.Named(peer.ID.String()) log.Debug("SettlementWithWindow") var storagenodeSettled = map[int32]int64{} var bucketSettled = map[bucketIDAction]int64{} var seenSerials = map[storj.SerialNumber]struct{}{} var window int64 var request *pb.SettlementRequest var receivedCount int for { request, err = stream.Recv() if err != nil { if errors.Is(err, io.EOF) { break } log.Debug("err streaming order request", zap.Error(err)) return rpcstatus.Error(rpcstatus.Unknown, err.Error()) } receivedCount++ orderLimit := request.Limit if orderLimit == nil { log.Debug("request.OrderLimit is nil") continue } if window == 0 { window = date.TruncateToHourInNano(orderLimit.OrderCreation) } order := request.Order if order == nil { log.Debug("request.Order is nil") continue } serialNum := order.SerialNumber // don't process orders that aren't valid if !endpoint.isValid(ctx, log, order, 
orderLimit, peer.ID, window) { continue } // don't process orders with serial numbers we've already seen if _, ok := seenSerials[serialNum]; ok { log.Debug("seen serial", zap.String("serial number", serialNum.String())) continue } seenSerials[serialNum] = struct{}{} storagenodeSettled[int32(orderLimit.Action)] += order.Amount bucketID, err := endpoint.DB.GetBucketIDFromSerialNumber(ctx, serialNum) if err != nil { log.Info("get bucketID from serial number table err", zap.Error(err)) continue } projectID, bucketname, err := SplitBucketID([]byte(bucketID)) if err != nil { log.Info("split bucket err", zap.Error(err), zap.String("bucketID", string(bucketID))) continue } bucketSettled[bucketIDAction{ bucketname: string(bucketname), projectID: projectID, action: orderLimit.Action, }] += order.Amount } if len(storagenodeSettled) == 0 { log.Debug("no orders were successfully processed", zap.Int("received count", receivedCount)) return stream.SendAndClose(&pb.SettlementWithWindowResponse{ Status: pb.SettlementWithWindowResponse_REJECTED, ActionSettled: storagenodeSettled, }) } status, alreadyProcessed, err := endpoint.DB.UpdateStoragenodeBandwidthSettleWithWindow( ctx, peer.ID, storagenodeSettled, time.Unix(0, window), ) if err != nil { log.Debug("err updating storagenode bandwidth settle", zap.Error(err)) return err } log.Debug("orders processed", zap.Int("total orders received", receivedCount), zap.Time("window", time.Unix(0, window)), zap.String("status", status.String()), ) if !alreadyProcessed { for bucketIDAction, amount := range bucketSettled { err = endpoint.DB.UpdateBucketBandwidthSettle(ctx, bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, amount, time.Unix(0, window), ) if err != nil { log.Info("err updating bucket bandwidth settle", zap.Error(err)) } } } else { mon.Event("orders_already_processed") } if status == pb.SettlementWithWindowResponse_REJECTED { storagenodeSettled = map[int32]int64{} } return 
stream.SendAndClose(&pb.SettlementWithWindowResponse{ Status: status, ActionSettled: storagenodeSettled, }) } func (endpoint *Endpoint) isValid(ctx context.Context, log *zap.Logger, order *pb.Order, orderLimit *pb.OrderLimit, peerID storj.NodeID, window int64) bool { if orderLimit.StorageNodeId != peerID { log.Debug("storage node id mismatch") mon.Event("order_not_valid_storagenodeid") return false } // check expiration first before the signatures so that we can throw out the large amount // of expired orders being sent to us before doing expensive signature verification. if orderLimit.OrderExpiration.Before(time.Now().UTC()) { log.Debug("invalid settlement: order limit expired") mon.Event("order_not_valid_expired") return false } // satellite verifies that it signed the order limit if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil { log.Debug("invalid settlement: unable to verify order limit") mon.Event("order_not_valid_satellite_signature") return false } // satellite verifies that the order signature matches pub key in order limit if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil { log.Debug("invalid settlement: unable to verify order") mon.Event("order_not_valid_uplink_signature") return false } if orderLimit.SerialNumber != order.SerialNumber { log.Debug("invalid settlement: invalid serial number") mon.Event("order_not_valid_serialnum_mismatch") return false } // verify the 1 hr windows match if window != date.TruncateToHourInNano(orderLimit.OrderCreation) { log.Debug("invalid settlement: window mismatch") mon.Event("order_not_valid_window_mismatch") return false } return true }