storj/satellite/orders/endpoint.go
aligeti 32f95a14fd
satellite/certdb: remove certdb that was used to store uplink certificates (#2760)
* satellitedb/certDB: refactors of the node certificate storage DB table

The existing implementation doesn't allow storing the complete certificate chain of uplinkIDs or storagenodeIDs, so the current table is dropped and a new table will be added that addresses the storage and retrieval of certificates.

pkg/identity: fixes spelling mistakes that I missed on PR#2754

Fixes V3-1992/V3-2388
2019-08-12 10:41:34 -04:00

222 lines
8.2 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"context"
"io"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
)
// DB implements saving order after receiving from storage node.
//
// It is the persistence interface the orders package depends on: serial number
// bookkeeping plus bandwidth accounting per bucket and per storage node.
type DB interface {
	// CreateSerialInfo creates serial number entry in database, associating the
	// serial number with a bucket ID and the expiration of its order limit.
	CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
	// UseSerialNumber marks the serial number as used by the given storage node
	// (the pair that UnuseSerialNumber removes) and returns a bucket ID for it.
	// NOTE(review): presumably the returned bucket ID is the one stored by
	// CreateSerialInfo, and failures to use the serial number are reported with
	// the ErrUsingSerialNumber class — confirm against the implementation.
	UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
	// UnuseSerialNumber removes pair serial number -> storage node id from database
	UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
	// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
	UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket.
	// Settlement also calls it with a negated amount to undo a previous update.
	UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
	UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage nodes
	UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNodes []storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
	UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
	// GetBucketBandwidth gets total bucket bandwidth from period of time
	GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
	// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
	GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)
}
var (
	// Error is the default error class for the orders package.
	Error = errs.Class("orders error")
	// ErrUsingSerialNumber is the error class for serial number failures.
	// Settlement answers with a REJECTED response, instead of aborting the
	// stream, when DB.UseSerialNumber returns an error of this class.
	ErrUsingSerialNumber = errs.Class("serial number")
	// mon is the monkit scope of this package, used for tasks and events.
	mon = monkit.Package()
)
// Endpoint for orders receiving.
//
// It serves the Settlement stream on which storage nodes submit orders.
type Endpoint struct {
	// log is the base logger; Settlement derives a logger named after the peer ID.
	log *zap.Logger
	// satelliteSignee is used to verify that an order limit was signed by this satellite.
	satelliteSignee signing.Signee
	// DB stores serial number usage and settled bandwidth.
	DB DB
}
// NewEndpoint returns an orders receiving endpoint that logs to log, verifies
// order limit signatures against satelliteSignee and persists through db.
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB) *Endpoint {
	endpoint := new(Endpoint)
	endpoint.log = log
	endpoint.satelliteSignee = satelliteSignee
	endpoint.DB = db
	return endpoint
}
// monitoredSettlementStreamReceive reads the next settlement request from the
// stream, wrapping the receive in a monkit task so it is instrumented.
func monitoredSettlementStreamReceive(ctx context.Context, stream pb.Orders_SettlementServer) (_ *pb.SettlementRequest, err error) {
	defer mon.Task()(&ctx)(&err)

	request, err := stream.Recv()
	return request, err
}
// monitoredSettlementStreamSend writes resp to the stream inside a monkit task.
// Before sending, it emits a monkit event named after the response status:
// accepted, rejected, or unknown for any other status value.
func monitoredSettlementStreamSend(ctx context.Context, stream pb.Orders_SettlementServer, resp *pb.SettlementResponse) (err error) {
	defer mon.Task()(&ctx)(&err)

	event := "settlement_response_unknown"
	if resp.Status == pb.SettlementResponse_ACCEPTED {
		event = "settlement_response_accepted"
	} else if resp.Status == pb.SettlementResponse_REJECTED {
		event = "settlement_response_rejected"
	}
	mon.Event(event)

	return stream.Send(resp)
}
// Settlement receives and handles orders.
//
// The peer is authenticated from the stream context; a failure ends the call
// with codes.Unauthenticated. Requests are then read from the stream until the
// peer closes it (io.EOF, reported as a nil error). For every request:
//
//   - a missing request, order limit or order ends the call with
//     codes.InvalidArgument;
//   - an order limit addressed to a different storage node than the peer ends
//     the call with codes.Unauthenticated;
//   - a failed signature, serial number or expiration check, or a serial number
//     that cannot be used (ErrUsingSerialNumber), is answered with a REJECTED
//     response and the loop continues with the next request;
//   - otherwise the order amount is added to the settled bandwidth of the bucket
//     and of the storage node for the current UTC hour, and an ACCEPTED
//     response is sent.
//
// Any other database error ends the call with that error, after a best-effort
// undo of the steps already performed for the current order.
func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err error) {
	ctx := stream.Context()
	defer mon.Task()(&ctx)(&err)

	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return status.Error(codes.Unauthenticated, err.Error())
	}

	// formatError converts a stream error into the handler's return value:
	// io.EOF means the peer finished sending, everything else is Unknown.
	formatError := func(err error) error {
		if err == io.EOF {
			return nil
		}
		return status.Error(codes.Unknown, err.Error())
	}

	log := endpoint.log.Named(peer.ID.String())
	log.Debug("Settlement")

	for {
		// TODO: batch these requests so we hit the db in batches
		request, err := monitoredSettlementStreamReceive(ctx, stream)
		if err != nil {
			return formatError(err)
		}

		if request == nil {
			return status.Error(codes.InvalidArgument, "request missing")
		}
		if request.Limit == nil {
			return status.Error(codes.InvalidArgument, "order limit missing")
		}
		if request.Order == nil {
			return status.Error(codes.InvalidArgument, "order missing")
		}

		orderLimit := request.Limit
		order := request.Order

		if orderLimit.StorageNodeId != peer.ID {
			return status.Error(codes.Unauthenticated, "only specified storage node can settle order")
		}

		// reject tells the storage node that the current order was not settled.
		reject := func() error {
			return monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
				SerialNumber: orderLimit.SerialNumber,
				Status:       pb.SettlementResponse_REJECTED,
			})
		}

		rejectErr := func() error {
			// satellite verifies that it signed the order limit
			if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
				return Error.New("unable to verify order limit: %v", err)
			}

			// satellite verifies that the order signature matches pub key in order limit
			if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
				return Error.New("unable to verify order: %v", err)
			}

			// TODO should this reject or just error ??
			if orderLimit.SerialNumber != order.SerialNumber {
				return Error.New("invalid serial number")
			}

			if orderLimit.OrderExpiration.Before(time.Now()) {
				return Error.New("order limit expired")
			}
			return nil
		}()
		// A verification failure only rejects this order; the stream stays open.
		if rejectErr != nil {
			log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr))
			if err := reject(); err != nil {
				return formatError(err)
			}
			continue
		}

		bucketID, err := endpoint.DB.UseSerialNumber(ctx, orderLimit.SerialNumber, orderLimit.StorageNodeId)
		if err != nil {
			log.Warn("unable to use serial number", zap.Error(err))
			if ErrUsingSerialNumber.Has(err) {
				if err := reject(); err != nil {
					return formatError(err)
				}
				continue
			}
			return err
		}

		// Bandwidth is accounted per hour: truncate the current UTC time to the hour.
		now := time.Now().UTC()
		intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())

		projectID, bucketName, err := SplitBucketID(bucketID)
		if err != nil {
			return err
		}

		if err := endpoint.DB.UpdateBucketBandwidthSettle(ctx, *projectID, bucketName, orderLimit.Action, order.Amount, intervalStart); err != nil {
			// TODO: we should use the serial number in the same transaction we settle the bandwidth? that way we don't need this undo in an error case?
			if err := endpoint.DB.UnuseSerialNumber(ctx, orderLimit.SerialNumber, orderLimit.StorageNodeId); err != nil {
				log.Error("unable to unuse serial number", zap.Error(err))
			}
			return err
		}

		// TODO: whoa this should also be in the same transaction
		if err := endpoint.DB.UpdateStoragenodeBandwidthSettle(ctx, orderLimit.StorageNodeId, orderLimit.Action, order.Amount, intervalStart); err != nil {
			// Best-effort undo of both earlier steps: release the serial number
			// and subtract the amount already added to the bucket.
			if err := endpoint.DB.UnuseSerialNumber(ctx, orderLimit.SerialNumber, orderLimit.StorageNodeId); err != nil {
				log.Error("unable to unuse serial number", zap.Error(err))
			}
			if err := endpoint.DB.UpdateBucketBandwidthSettle(ctx, *projectID, bucketName, orderLimit.Action, -order.Amount, intervalStart); err != nil {
				log.Error("unable to rollback bucket bandwidth", zap.Error(err))
			}
			return err
		}

		err = monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
			SerialNumber: orderLimit.SerialNumber,
			Status:       pb.SettlementResponse_ACCEPTED,
		})
		if err != nil {
			return formatError(err)
		}

		// TODO: in fact, why don't we batch these into group transactions as they come in from Recv() ?
	}
}