// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package orders

import (
	"bytes"
	"context"
	"math"
	mathrand "math/rand"
	"sync"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/pb"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/storj/satellite/overlay"
	"storj.io/uplink/private/eestream"
)

// ErrDownloadFailedNotEnoughPieces is returned when a download fails due to missing pieces.
var ErrDownloadFailedNotEnoughPieces = errs.Class("not enough pieces for download")

// Config is a configuration struct for orders Service.
type Config struct {
	Expiration                   time.Duration              `help:"how long until an order expires" default:"48h"` // 2 days
	SettlementBatchSize          int                        `help:"how many orders to batch per transaction" default:"250"`
	FlushBatchSize               int                        `help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"10000"`
	FlushInterval                time.Duration              `help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m"`
	ReportedRollupsReadBatchSize int                        `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"`
	NodeStatusLogging            bool                       `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false"`
	WindowEndpointRolloutPhase   WindowEndpointRolloutPhase `help:"rollout phase for the windowed endpoint" default:"phase1"`
}
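
// The WindowEndpointRolloutPhase above is intended to be rolled out in three steps:
// in phase1 the current orders endpoint keeps working as usual and the windowed
// endpoint shares its backend; in phase2 the current endpoint is disabled while the
// windowed endpoint still uses that backend; in phase3 the windowed endpoint switches
// to the new backend, which requires much less database traffic and state.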

// BucketsDB returns information about buckets.
type BucketsDB interface {
	// GetBucketID returns an existing bucket id.
	GetBucketID(ctx context.Context, bucketName []byte, projectID uuid.UUID) (id uuid.UUID, err error)
}

// Service for creating order limits.
//
// architecture: Service
type Service struct {
	log       *zap.Logger
	satellite signing.Signer
	overlay   *overlay.Service
	orders    DB
	buckets   BucketsDB

	satelliteAddress *pb.NodeAddress
	orderExpiration  time.Duration

	rngMu sync.Mutex
	rng   *mathrand.Rand
}

// NewService creates a new service for creating order limits.
func NewService(
	log *zap.Logger, satellite signing.Signer, overlay *overlay.Service,
	orders DB, buckets BucketsDB,
	orderExpiration time.Duration, satelliteAddress *pb.NodeAddress,
) *Service {
	return &Service{
		log:              log,
		satellite:        satellite,
		overlay:          overlay,
		orders:           orders,
		buckets:          buckets,
		satelliteAddress: satelliteAddress,
		orderExpiration:  orderExpiration,

		rng: mathrand.New(mathrand.NewSource(time.Now().UnixNano())),
	}
}

// VerifyOrderLimitSignature verifies that the signature inside the order limit belongs to the satellite.
func (service *Service) VerifyOrderLimitSignature(ctx context.Context, signed *pb.OrderLimit) (err error) {
	defer mon.Task()(&ctx)(&err)
	return signing.VerifyOrderLimitSignature(ctx, service.satellite, signed)
}
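
// saveSerial persists the serial number together with its bucket and expiration
// time via the orders DB.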
func (service *Service) saveSerial(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, expiresAt time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	return service.orders.CreateSerialInfo(ctx, serialNumber, bucketID, expiresAt)
}
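
// updateBandwidth sums the bandwidth allocated by the given order limits and
// records it as allocated bucket bandwidth for the current hour.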
func (service *Service) updateBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, addressedOrderLimits ...*pb.AddressedOrderLimit) (err error) {
	defer mon.Task()(&ctx)(&err)
	if len(addressedOrderLimits) == 0 {
		return nil
	}

	var action pb.PieceAction
	var bucketAllocation int64

	for _, addressedOrderLimit := range addressedOrderLimits {
		if addressedOrderLimit != nil && addressedOrderLimit.Limit != nil {
			orderLimit := addressedOrderLimit.Limit
			action = orderLimit.Action
			bucketAllocation += orderLimit.Limit
		}
	}

	now := time.Now().UTC()
	intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())

	// TODO: all of this below should be a single db transaction. in fact, this whole function should probably be part of an existing transaction
	if err := service.orders.UpdateBucketBandwidthAllocation(ctx, projectID, bucketName, action, bucketAllocation, intervalStart); err != nil {
		return Error.Wrap(err)
	}

	return nil
}

// CreateGetOrderLimits creates the order limits for downloading the pieces of pointer.
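//
// The returned slice has one entry per reliable piece that was found; only a
// randomly selected subset large enough to perform the download is populated
// and signed, the remaining entries are left nil.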
func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	rootPieceID := pointer.GetRemote().RootPieceId
	pieceExpiration := pointer.ExpirationDate
	orderExpiration := time.Now().Add(service.orderExpiration)

	piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	serialNumber, err := createSerial(orderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)

	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
	for i, piece := range pointer.GetRemote().GetRemotePieces() {
		nodeIDs[i] = piece.NodeId
	}

	nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
	if err != nil {
		service.log.Debug("error getting nodes from overlay", zap.Error(err))
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	var nodeErrors errs.Group
	var limits []*pb.AddressedOrderLimit
	for _, piece := range pointer.GetRemote().GetRemotePieces() {
		node, ok := nodes[piece.NodeId]
		if !ok {
			nodeErrors.Add(errs.New("node %q is not reliable", piece.NodeId))
			continue
		}

		orderLimit := &pb.OrderLimit{
			SerialNumber:     serialNumber,
			SatelliteId:      service.satellite.ID(),
			SatelliteAddress: service.satelliteAddress,
			UplinkPublicKey:  piecePublicKey,
			StorageNodeId:    piece.NodeId,
			PieceId:          rootPieceID.Derive(piece.NodeId, piece.PieceNum),
			Action:           pb.PieceAction_GET,
			Limit:            pieceSize,
			PieceExpiration:  pieceExpiration,
			OrderCreation:    time.Now(),
			OrderExpiration:  orderExpiration,
		}

		// use the lastIP that we have on record to avoid doing extra DNS resolutions
		if node.LastIPPort != "" {
			node.Address.Address = node.LastIPPort
		}
		limits = append(limits, &pb.AddressedOrderLimit{
			Limit:              orderLimit,
			StorageNodeAddress: node.Address,
		})
	}

	if len(limits) < redundancy.RequiredCount() {
		mon.Meter("download_failed_not_enough_pieces_uplink").Mark(1) //locked
		err = Error.New("not enough nodes available: got %d, required %d", len(limits), redundancy.RequiredCount())
		return nil, storj.PiecePrivateKey{}, ErrDownloadFailedNotEnoughPieces.Wrap(errs.Combine(err, nodeErrors.Err()))
	}

	err = service.saveSerial(ctx, serialNumber, bucketID, orderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	neededLimits := pb.NewRedundancySchemeToStorj(pointer.GetRemote().GetRedundancy()).DownloadNodes()
	if int(neededLimits) < redundancy.RequiredCount() {
		err = Error.New("not enough needed node orderlimits: got %d, required %d", neededLimits, redundancy.RequiredCount())
		return nil, storj.PiecePrivateKey{}, ErrDownloadFailedNotEnoughPieces.Wrap(errs.Combine(err, nodeErrors.Err()))
	}
	// an orderLimit was created for each piece, but let's only use
	// the number of orderLimits actually needed to do the download
	limits, err = service.RandomSampleOfOrderLimits(limits, int(neededLimits))
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	for i, limit := range limits {
		if limit == nil {
			continue
		}
		orderLimit, err := signing.SignOrderLimit(ctx, service.satellite, limit.Limit)
		if err != nil {
			return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
		}
		limits[i].Limit = orderLimit
	}

	projectID, bucketName, err := SplitBucketID(bucketID)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if err := service.updateBandwidth(ctx, projectID, bucketName, limits...); err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return limits, piecePrivateKey, nil
}

// RandomSampleOfOrderLimits returns a random sample of the order limits.
func (service *Service) RandomSampleOfOrderLimits(limits []*pb.AddressedOrderLimit, sampleSize int) ([]*pb.AddressedOrderLimit, error) {
	service.rngMu.Lock()
	perm := service.rng.Perm(len(limits))
	service.rngMu.Unlock()

	// the sample slice is the same size as the limits slice since that represents all
	// of the pieces of a pointer in the correct order and we want to maintain the order
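	// (e.g. with four limits and sampleSize two, a possible result is
	// [limits[0], nil, limits[2], nil])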
	var sample = make([]*pb.AddressedOrderLimit, len(limits))
	for _, i := range perm {
		limit := limits[i]
		sample[i] = limit

		sampleSize--
		if sampleSize <= 0 {
			break
		}
	}
	return sample, nil
}

// CreatePutOrderLimits creates the order limits for uploading pieces to nodes.
func (service *Service) CreatePutOrderLimits(ctx context.Context, bucketID []byte, nodes []*overlay.SelectedNode, pieceExpiration time.Time, maxPieceSize int64) (_ storj.PieceID, _ []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	orderCreation := time.Now()
	orderExpiration := orderCreation.Add(service.orderExpiration)

	signer, err := NewSignerPut(service, pieceExpiration, orderCreation, orderExpiration, maxPieceSize)
	if err != nil {
		return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	for pieceNum, node := range nodes {
		address := node.Address.Address
		if node.LastIPPort != "" {
			address = node.LastIPPort
		}
		_, err := signer.Sign(ctx, storj.NodeURL{ID: node.ID, Address: address}, int32(pieceNum))
		if err != nil {
			return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
		}
	}

	err = service.saveSerial(ctx, signer.Serial, bucketID, signer.OrderExpiration)
	if err != nil {
		return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	projectID, bucketName, err := SplitBucketID(bucketID)
	if err != nil {
		return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if err := service.updateBandwidth(ctx, projectID, bucketName, signer.AddressedLimits...); err != nil {
		return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return signer.RootPieceID, signer.AddressedLimits, signer.PrivateKey, nil
}

// CreateDeleteOrderLimits creates the order limits for deleting the pieces of pointer.
func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
	for i, piece := range pointer.GetRemote().GetRemotePieces() {
		nodeIDs[i] = piece.NodeId
	}

	nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
	if err != nil {
		service.log.Debug("error getting nodes from overlay", zap.Error(err))
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	orderCreation := time.Now()
	orderExpiration := orderCreation.Add(service.orderExpiration)

	signer, err := NewSignerDelete(service, pointer.GetRemote().RootPieceId, pointer.ExpirationDate, orderCreation, orderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	var nodeErrors errs.Group
	for _, piece := range pointer.GetRemote().GetRemotePieces() {
		node, ok := nodes[piece.NodeId]
		if !ok {
			nodeErrors.Add(errs.New("node %q is not reliable", piece.NodeId))
			continue
		}

		address := node.Address.Address
		if node.LastIPPort != "" {
			address = node.LastIPPort
		}
		_, err := signer.Sign(ctx, storj.NodeURL{ID: piece.NodeId, Address: address}, piece.PieceNum)
		if err != nil {
			return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
		}
	}

	if len(signer.AddressedLimits) == 0 {
		return nil, storj.PiecePrivateKey{}, Error.New("failed creating order limits: %w", nodeErrors.Err())
	}

	err = service.saveSerial(ctx, signer.Serial, bucketID, signer.OrderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return signer.AddressedLimits, signer.PrivateKey, nil
}

// CreateAuditOrderLimits creates the order limits for auditing the pieces of pointer.
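//
// The returned slice is indexed by piece number; entries for skipped or
// unreliable nodes remain nil.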
func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, skip map[storj.NodeID]bool) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	redundancy := pointer.GetRemote().GetRedundancy()
	shareSize := redundancy.GetErasureShareSize()
	totalPieces := redundancy.GetTotal()

	orderCreation := time.Now()
	orderExpiration := orderCreation.Add(service.orderExpiration)

	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
	for i, piece := range pointer.GetRemote().GetRemotePieces() {
		nodeIDs[i] = piece.NodeId
	}

	nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
	if err != nil {
		service.log.Debug("error getting nodes from overlay", zap.Error(err))
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	signer, err := NewSignerAudit(service, pointer.GetRemote().RootPieceId, orderCreation, orderExpiration, int64(shareSize))
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	var nodeErrors errs.Group
	var limitsCount int32
	limits := make([]*pb.AddressedOrderLimit, totalPieces)
	for _, piece := range pointer.GetRemote().GetRemotePieces() {
		if skip[piece.NodeId] {
			continue
		}
		node, ok := nodes[piece.NodeId]
		if !ok {
			nodeErrors.Add(errs.New("node %q is not reliable", piece.NodeId))
			continue
		}

		limit, err := signer.Sign(ctx, storj.NodeURL{
			ID:      piece.NodeId,
			Address: node.Address.Address,
		}, piece.PieceNum)
		if err != nil {
			return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
		}

		limits[piece.GetPieceNum()] = limit
		limitsCount++
	}

	if limitsCount < redundancy.GetMinReq() {
		err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.GetMinReq())
		return nil, storj.PiecePrivateKey{}, errs.Combine(err, nodeErrors.Err())
	}

	err = service.saveSerial(ctx, signer.Serial, bucketID, signer.OrderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	projectID, bucketName, err := SplitBucketID(bucketID)
	if err != nil {
		return limits, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if err := service.updateBandwidth(ctx, projectID, bucketName, limits...); err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return limits, signer.PrivateKey, nil
}

// CreateAuditOrderLimit creates an order limit for auditing a single piece from a pointer.
func (service *Service) CreateAuditOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
	// TODO reduce number of params ?
	defer mon.Task()(&ctx)(&err)

	orderCreation := time.Now()
	orderExpiration := orderCreation.Add(service.orderExpiration)

	node, err := service.overlay.Get(ctx, nodeID)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if node.Disqualified != nil {
		return nil, storj.PiecePrivateKey{}, overlay.ErrNodeDisqualified.New("%v", nodeID)
	}
	if node.ExitStatus.ExitFinishedAt != nil {
		return nil, storj.PiecePrivateKey{}, overlay.ErrNodeFinishedGE.New("%v", nodeID)
	}
	if !service.overlay.IsOnline(node) {
		return nil, storj.PiecePrivateKey{}, overlay.ErrNodeOffline.New("%v", nodeID)
	}

	signer, err := NewSignerAudit(service, rootPieceID, orderCreation, orderExpiration, int64(shareSize))
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	limit, err = signer.Sign(ctx, storj.NodeURL{
		ID:      nodeID,
		Address: node.Address.Address,
	}, pieceNum)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	err = service.saveSerial(ctx, signer.Serial, bucketID, signer.OrderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	projectID, bucketName, err := SplitBucketID(bucketID)
	if err != nil {
		return limit, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if err := service.updateBandwidth(ctx, projectID, bucketName, limit); err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return limit, signer.PrivateKey, nil
}

// CreateGetRepairOrderLimits creates the order limits for downloading the
// healthy pieces of pointer as the source for repair.
//
// The length of the returned orders slice is the total number of pieces of the
// segment, with nil entries for the ones which don't correspond to a healthy piece.
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, healthy []*pb.RemotePiece) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
	totalPieces := redundancy.TotalCount()
	orderCreation := time.Now()
	orderExpiration := orderCreation.Add(service.orderExpiration)

	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
	for i, piece := range pointer.GetRemote().GetRemotePieces() {
		nodeIDs[i] = piece.NodeId
	}

	nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
	if err != nil {
		service.log.Debug("error getting nodes from overlay", zap.Error(err))
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	signer, err := NewSignerRepairGet(service, pointer.GetRemote().RootPieceId, pointer.ExpirationDate, orderCreation, orderExpiration, pieceSize)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	var nodeErrors errs.Group
	var limitsCount int
	limits := make([]*pb.AddressedOrderLimit, totalPieces)
	for _, piece := range healthy {
		node, ok := nodes[piece.NodeId]
		if !ok {
			nodeErrors.Add(errs.New("node %q is not reliable", piece.NodeId))
			continue
		}

		limit, err := signer.Sign(ctx, storj.NodeURL{
			ID:      piece.NodeId,
			Address: node.Address.Address,
		}, piece.PieceNum)
		if err != nil {
			return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
		}

		limits[piece.GetPieceNum()] = limit
		limitsCount++
	}

	if limitsCount < redundancy.RequiredCount() {
		err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.RequiredCount())
		return nil, storj.PiecePrivateKey{}, errs.Combine(err, nodeErrors.Err())
	}

	err = service.saveSerial(ctx, signer.Serial, bucketID, signer.OrderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	projectID, bucketName, err := SplitBucketID(bucketID)
	if err != nil {
		return limits, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if err := service.updateBandwidth(ctx, projectID, bucketName, limits...); err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return limits, signer.PrivateKey, nil
}

// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of pointer to newNodes.
func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucketID []byte, pointer *pb.Pointer, getOrderLimits []*pb.AddressedOrderLimit, newNodes []*overlay.SelectedNode, optimalThresholdMultiplier float64) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	orderCreation := time.Now()
	orderExpiration := orderCreation.Add(service.orderExpiration)

	// Create the order limits to be used for uploading the repaired pieces
	redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)

	totalPieces := redundancy.TotalCount()
	totalPiecesAfterRepair := int(math.Ceil(float64(redundancy.OptimalThreshold()) * optimalThresholdMultiplier))
	if totalPiecesAfterRepair > totalPieces {
		totalPiecesAfterRepair = totalPieces
	}

	var numCurrentPieces int
	for _, o := range getOrderLimits {
		if o != nil {
			numCurrentPieces++
		}
	}

	totalPiecesToRepair := totalPiecesAfterRepair - numCurrentPieces

	limits := make([]*pb.AddressedOrderLimit, totalPieces)
	signer, err := NewSignerRepairPut(service, pointer.GetRemote().RootPieceId, pointer.ExpirationDate, orderCreation, orderExpiration, pieceSize)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	var pieceNum int32
	for _, node := range newNodes {
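		// advance pieceNum to the next index that is not already covered
		// by a healthy piece in getOrderLimits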
		for int(pieceNum) < totalPieces && getOrderLimits[pieceNum] != nil {
			pieceNum++
		}

		if int(pieceNum) >= totalPieces { // should not happen
			return nil, storj.PiecePrivateKey{}, Error.New("piece num greater than total pieces: %d >= %d", pieceNum, totalPieces)
		}

		limit, err := signer.Sign(ctx, storj.NodeURL{
			ID:      node.ID,
			Address: node.Address.Address,
		}, pieceNum)
		if err != nil {
			return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
		}

		limits[pieceNum] = limit
		pieceNum++
		totalPiecesToRepair--

		if totalPiecesToRepair == 0 {
			break
		}
	}

	err = service.saveSerial(ctx, signer.Serial, bucketID, orderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	projectID, bucketName, err := SplitBucketID(bucketID)
	if err != nil {
		return limits, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if err := service.updateBandwidth(ctx, projectID, bucketName, limits...); err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return limits, signer.PrivateKey, nil
}

// CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers.
func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, bucketID []byte, nodeID storj.NodeID, pieceNum int32, rootPieceID storj.PieceID, shareSize int32) (limit *pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
	defer mon.Task()(&ctx)(&err)

	orderCreation := time.Now().UTC()
	orderExpiration := orderCreation.Add(service.orderExpiration)

	// should this use KnownReliable or similar?
	node, err := service.overlay.Get(ctx, nodeID)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if node.Disqualified != nil {
		return nil, storj.PiecePrivateKey{}, overlay.ErrNodeDisqualified.New("%v", nodeID)
	}
	if !service.overlay.IsOnline(node) {
		return nil, storj.PiecePrivateKey{}, overlay.ErrNodeOffline.New("%v", nodeID)
	}

	signer, err := NewSignerGracefulExit(service, rootPieceID, orderCreation, orderExpiration, shareSize)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	nodeURL := storj.NodeURL{ID: nodeID, Address: node.Address.Address}
	limit, err = signer.Sign(ctx, nodeURL, pieceNum)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	err = service.saveSerial(ctx, signer.Serial, bucketID, signer.OrderExpiration)
	if err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	projectID, bucketName, err := SplitBucketID(bucketID)
	if err != nil {
		return limit, storj.PiecePrivateKey{}, Error.Wrap(err)
	}
	if err := service.updateBandwidth(ctx, projectID, bucketName, limit); err != nil {
		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
	}

	return limit, signer.PrivateKey, nil
}

// UpdateGetInlineOrder updates the amount of inline GET bandwidth for the given bucket.
func (service *Service) UpdateGetInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error) {
	defer mon.Task()(&ctx)(&err)
	now := time.Now().UTC()
	intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())

	return service.orders.UpdateBucketBandwidthInline(ctx, projectID, bucketName, pb.PieceAction_GET, amount, intervalStart)
}

// UpdatePutInlineOrder updates the amount of inline PUT bandwidth for the given bucket.
func (service *Service) UpdatePutInlineOrder(ctx context.Context, projectID uuid.UUID, bucketName []byte, amount int64) (err error) {
	defer mon.Task()(&ctx)(&err)
	now := time.Now().UTC()
	intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())

	return service.orders.UpdateBucketBandwidthInline(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, intervalStart)
}

// SplitBucketID takes a bucketID, splits on /, and returns a projectID and bucketName.
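// For example, a bucketID of the form "<project-uuid>/mybucket" yields the parsed
// project UUID and []byte("mybucket").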
func SplitBucketID(bucketID []byte) (projectID uuid.UUID, bucketName []byte, err error) {
	pathElements := bytes.Split(bucketID, []byte("/"))
	if len(pathElements) > 1 {
		bucketName = pathElements[1]
	}
	projectID, err = uuid.FromString(string(pathElements[0]))
	if err != nil {
		return uuid.UUID{}, nil, err
	}
	return projectID, bucketName, nil
}