32f95a14fd
* satellitedb/certDB: refactors of the node certificate storage DB table The existing implementation doesnt allow to store the complete certificate chain of uplinkIDs or storagenodeIDs, so the current table is dropped and new table will be added which addresses the storage and retrieval of certificates pkg/identity: fixes spelling mistakes that I missed on PR#2754 Fixes V3-1992/V3-2388
222 lines
8.2 KiB
Go
222 lines
8.2 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package orders
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/skyrings/skyring-common/tools/uuid"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"gopkg.in/spacemonkeygo/monkit.v2"
|
|
|
|
"storj.io/storj/pkg/identity"
|
|
"storj.io/storj/pkg/pb"
|
|
"storj.io/storj/pkg/signing"
|
|
"storj.io/storj/pkg/storj"
|
|
)
|
|
|
|
// DB implements saving order after receiving from storage node
|
|
type DB interface {
|
|
// CreateSerialInfo creates serial number entry in database
|
|
CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
|
|
// UseSerialNumber creates serial number entry in database
|
|
UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
|
|
// UnuseSerialNumber removes pair serial number -> storage node id from database
|
|
UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
|
|
|
|
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
|
|
UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
|
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
|
|
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
|
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
|
|
UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
|
|
|
// UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage nodes
|
|
UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNodes []storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
|
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
|
|
UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
|
|
|
|
// GetBucketBandwidth gets total bucket bandwidth from period of time
|
|
GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
|
|
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
|
|
GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)
|
|
}
|
|
|
|
var (
|
|
// Error the default orders errs class
|
|
Error = errs.Class("orders error")
|
|
// ErrUsingSerialNumber error class for serial number
|
|
ErrUsingSerialNumber = errs.Class("serial number")
|
|
|
|
mon = monkit.Package()
|
|
)
|
|
|
|
// Endpoint for orders receiving
|
|
type Endpoint struct {
|
|
log *zap.Logger
|
|
satelliteSignee signing.Signee
|
|
DB DB
|
|
}
|
|
|
|
// NewEndpoint new orders receiving endpoint
|
|
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB) *Endpoint {
|
|
return &Endpoint{
|
|
log: log,
|
|
satelliteSignee: satelliteSignee,
|
|
DB: db,
|
|
}
|
|
}
|
|
|
|
func monitoredSettlementStreamReceive(ctx context.Context, stream pb.Orders_SettlementServer) (_ *pb.SettlementRequest, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
return stream.Recv()
|
|
}
|
|
|
|
func monitoredSettlementStreamSend(ctx context.Context, stream pb.Orders_SettlementServer, resp *pb.SettlementResponse) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
switch resp.Status {
|
|
case pb.SettlementResponse_ACCEPTED:
|
|
mon.Event("settlement_response_accepted")
|
|
case pb.SettlementResponse_REJECTED:
|
|
mon.Event("settlement_response_rejected")
|
|
default:
|
|
mon.Event("settlement_response_unknown")
|
|
}
|
|
return stream.Send(resp)
|
|
}
|
|
|
|
// Settlement receives and handles orders.
|
|
func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err error) {
|
|
ctx := stream.Context()
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
peer, err := identity.PeerIdentityFromContext(ctx)
|
|
if err != nil {
|
|
return status.Error(codes.Unauthenticated, err.Error())
|
|
}
|
|
|
|
formatError := func(err error) error {
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
return status.Error(codes.Unknown, err.Error())
|
|
}
|
|
|
|
log := endpoint.log.Named(peer.ID.String())
|
|
log.Debug("Settlement")
|
|
for {
|
|
// TODO: batch these requests so we hit the db in batches
|
|
request, err := monitoredSettlementStreamReceive(ctx, stream)
|
|
if err != nil {
|
|
return formatError(err)
|
|
}
|
|
|
|
if request == nil {
|
|
return status.Error(codes.InvalidArgument, "request missing")
|
|
}
|
|
if request.Limit == nil {
|
|
return status.Error(codes.InvalidArgument, "order limit missing")
|
|
}
|
|
if request.Order == nil {
|
|
return status.Error(codes.InvalidArgument, "order missing")
|
|
}
|
|
|
|
orderLimit := request.Limit
|
|
order := request.Order
|
|
|
|
if orderLimit.StorageNodeId != peer.ID {
|
|
return status.Error(codes.Unauthenticated, "only specified storage node can settle order")
|
|
}
|
|
|
|
rejectErr := func() error {
|
|
// satellite verifies that it signed the order limit
|
|
if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
|
|
return Error.New("unable to verify order limit")
|
|
}
|
|
|
|
// satellite verifies that the order signature matches pub key in order limit
|
|
if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
|
|
return Error.New("unable to verify order")
|
|
}
|
|
|
|
// TODO should this reject or just error ??
|
|
if orderLimit.SerialNumber != order.SerialNumber {
|
|
return Error.New("invalid serial number")
|
|
}
|
|
|
|
if orderLimit.OrderExpiration.Before(time.Now()) {
|
|
return Error.New("order limit expired")
|
|
}
|
|
return nil
|
|
}()
|
|
if rejectErr != err {
|
|
log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(err), zap.Error(rejectErr))
|
|
err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
|
|
SerialNumber: orderLimit.SerialNumber,
|
|
Status: pb.SettlementResponse_REJECTED,
|
|
})
|
|
if err != nil {
|
|
return formatError(err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
bucketID, err := endpoint.DB.UseSerialNumber(ctx, orderLimit.SerialNumber, orderLimit.StorageNodeId)
|
|
if err != nil {
|
|
log.Warn("unable to use serial number", zap.Error(err))
|
|
if ErrUsingSerialNumber.Has(err) {
|
|
err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
|
|
SerialNumber: orderLimit.SerialNumber,
|
|
Status: pb.SettlementResponse_REJECTED,
|
|
})
|
|
if err != nil {
|
|
return formatError(err)
|
|
}
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
now := time.Now().UTC()
|
|
intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
|
|
|
|
projectID, bucketName, err := SplitBucketID(bucketID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := endpoint.DB.UpdateBucketBandwidthSettle(ctx, *projectID, bucketName, orderLimit.Action, order.Amount, intervalStart); err != nil {
|
|
// TODO: we should use the serial number in the same transaction we settle the bandwidth? that way we don't need this undo in an error case?
|
|
if err := endpoint.DB.UnuseSerialNumber(ctx, orderLimit.SerialNumber, orderLimit.StorageNodeId); err != nil {
|
|
log.Error("unable to unuse serial number", zap.Error(err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
// TODO: whoa this should also be in the same transaction
|
|
if err := endpoint.DB.UpdateStoragenodeBandwidthSettle(ctx, orderLimit.StorageNodeId, orderLimit.Action, order.Amount, intervalStart); err != nil {
|
|
if err := endpoint.DB.UnuseSerialNumber(ctx, orderLimit.SerialNumber, orderLimit.StorageNodeId); err != nil {
|
|
log.Error("unable to unuse serial number", zap.Error(err))
|
|
}
|
|
if err := endpoint.DB.UpdateBucketBandwidthSettle(ctx, *projectID, bucketName, orderLimit.Action, -order.Amount, intervalStart); err != nil {
|
|
log.Error("unable to rollback bucket bandwidth", zap.Error(err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
err = monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
|
|
SerialNumber: orderLimit.SerialNumber,
|
|
Status: pb.SettlementResponse_ACCEPTED,
|
|
})
|
|
if err != nil {
|
|
return formatError(err)
|
|
}
|
|
|
|
// TODO: in fact, why don't we batch these into group transactions as they come in from Recv() ?
|
|
}
|
|
}
|