2019-03-28 20:09:23 +00:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"context"
2019-07-11 23:44:47 +01:00
"math"
2020-05-14 16:45:35 +01:00
mathrand "math/rand"
2020-01-27 20:01:37 +00:00
"sync"
2019-03-28 20:09:23 +00:00
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
2019-12-27 11:48:47 +00:00
"storj.io/common/pb"
"storj.io/common/signing"
"storj.io/common/storj"
2021-01-08 16:04:46 +00:00
"storj.io/storj/satellite/internalpb"
2021-04-21 13:42:57 +01:00
"storj.io/storj/satellite/metabase"
2019-07-28 06:55:36 +01:00
"storj.io/storj/satellite/overlay"
2019-03-28 20:09:23 +00:00
)
2020-11-18 21:39:13 +00:00
var (
// ErrDownloadFailedNotEnoughPieces is returned when download failed due to missing pieces.
ErrDownloadFailedNotEnoughPieces = errs . Class ( "not enough pieces for download" )
// ErrDecryptOrderMetadata is returned when a step of decrypting metadata fails.
ErrDecryptOrderMetadata = errs . Class ( "decrytping order metadata" )
)
2019-12-04 21:24:36 +00:00
2019-06-21 11:38:40 +01:00
// Config is a configuration struct for orders Service.
type Config struct {
2021-01-22 13:51:29 +00:00
EncryptionKeys EncryptionKeys ` help:"encryption keys to encrypt info in orders" default:"" `
2022-11-17 09:00:52 +00:00
Expiration time . Duration ` help:"how long until an order expires" default:"24h" testDefault:"168h" ` // default is 1 day
testplanet/satellite: reduce the number of places default values need to be configured
Satellites set their configuration values to default values using
cfgstruct, however, it turns out our tests don't test these values
at all! Instead, they have a completely separate definition system
that is easy to forget about.
As is to be expected, these values have drifted, and it appears
in a few cases test planet is testing unreasonable values that we
won't see in production, or perhaps worse, features enabled in
production were missed and weren't enabled in testplanet.
This change makes it so all values are configured the same,
systematic way, so it's easy to see when test values are different
than dev values or release values, and it's less hard to forget
to enable features in testplanet.
In terms of reviewing, this change should be actually fairly
easy to review, considering private/testplanet/satellite.go keeps
the current config system and the new one and confirms that they
result in identical configurations, so you can be certain that
nothing was missed and the config is all correct.
You can also check the config lock to see what actual config
values changed.
Change-Id: I6715d0794887f577e21742afcf56fd2b9d12170e
2021-05-31 22:15:00 +01:00
FlushBatchSize int ` help:"how many items in the rollups write cache before they are flushed to the database" devDefault:"20" releaseDefault:"1000" testDefault:"10" `
FlushInterval time . Duration ` help:"how often to flush the rollups write cache to the database" devDefault:"30s" releaseDefault:"1m" testDefault:"$TESTINTERVAL" `
NodeStatusLogging bool ` hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false" testDefault:"true" `
2021-01-22 13:51:29 +00:00
OrdersSemaphoreSize int ` help:"how many concurrent orders to process at once. zero is unlimited" default:"2" `
2019-06-21 11:38:40 +01:00
}
2023-01-31 10:36:33 +00:00
// Overlay defines the overlay dependency of orders.Service.
// use `go install github.com/golang/mock/mockgen@v1.6.0` if missing
//
//go:generate mockgen -destination mock_test.go -package orders . OverlayForOrders
type Overlay interface {
CachedGetOnlineNodesForGet ( context . Context , [ ] storj . NodeID ) ( map [ storj . NodeID ] * overlay . SelectedNode , error )
GetOnlineNodesForAuditRepair ( context . Context , [ ] storj . NodeID ) ( map [ storj . NodeID ] * overlay . NodeReputation , error )
Get ( ctx context . Context , nodeID storj . NodeID ) ( * overlay . NodeDossier , error )
IsOnline ( node * overlay . NodeDossier ) bool
}
2019-03-28 20:09:23 +00:00
// Service for creating order limits.
2019-09-10 14:24:16 +01:00
//
// architecture: Service
2019-03-28 20:09:23 +00:00
type Service struct {
2020-08-27 15:30:04 +01:00
log * zap . Logger
satellite signing . Signer
2023-01-31 10:36:33 +00:00
overlay Overlay
2020-08-27 15:30:04 +01:00
orders DB
2020-12-18 18:16:20 +00:00
encryptionKeys EncryptionKeys
2020-07-24 18:13:15 +01:00
2021-01-08 17:25:39 +00:00
orderExpiration time . Duration
2020-08-27 15:30:04 +01:00
rngMu sync . Mutex
rng * mathrand . Rand
2019-03-28 20:09:23 +00:00
}
// NewService creates new service for creating order limits.
2019-07-11 23:44:47 +01:00
func NewService (
2023-01-31 10:36:33 +00:00
log * zap . Logger , satellite signing . Signer , overlay Overlay ,
2022-09-28 09:56:11 +01:00
orders DB , config Config ,
2020-07-24 18:13:15 +01:00
) ( * Service , error ) {
2020-12-18 18:16:20 +00:00
if config . EncryptionKeys . Default . IsZero ( ) {
2020-07-24 18:13:15 +01:00
return nil , Error . New ( "encryption keys must be specified to include encrypted metadata" )
}
2019-03-28 20:09:23 +00:00
return & Service {
2020-08-27 15:30:04 +01:00
log : log ,
satellite : satellite ,
overlay : overlay ,
orders : orders ,
2020-12-18 18:16:20 +00:00
encryptionKeys : config . EncryptionKeys ,
2020-07-24 18:13:15 +01:00
2021-01-08 17:25:39 +00:00
orderExpiration : config . Expiration ,
2020-07-24 12:08:58 +01:00
rng : mathrand . New ( mathrand . NewSource ( time . Now ( ) . UnixNano ( ) ) ) ,
2020-07-24 18:13:15 +01:00
} , nil
2019-03-28 20:09:23 +00:00
}
// VerifyOrderLimitSignature verifies that the signature inside order limit belongs to the satellite.
2019-07-01 16:54:11 +01:00
func ( service * Service ) VerifyOrderLimitSignature ( ctx context . Context , signed * pb . OrderLimit ) ( err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-06-05 14:47:01 +01:00
return signing . VerifyOrderLimitSignature ( ctx , service . satellite , signed )
2019-03-28 20:09:23 +00:00
}
2020-08-28 12:56:09 +01:00
func ( service * Service ) updateBandwidth ( ctx context . Context , bucket metabase . BucketLocation , addressedOrderLimits ... * pb . AddressedOrderLimit ) ( err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-04-01 21:14:58 +01:00
if len ( addressedOrderLimits ) == 0 {
return nil
}
2019-06-10 15:58:28 +01:00
2019-04-01 21:14:58 +01:00
var action pb . PieceAction
2019-06-10 15:58:28 +01:00
var bucketAllocation int64
2019-04-01 21:14:58 +01:00
for _ , addressedOrderLimit := range addressedOrderLimits {
2020-01-27 20:01:37 +00:00
if addressedOrderLimit != nil && addressedOrderLimit . Limit != nil {
2019-04-01 21:14:58 +01:00
orderLimit := addressedOrderLimit . Limit
action = orderLimit . Action
2019-06-10 15:58:28 +01:00
bucketAllocation += orderLimit . Limit
2019-04-01 21:14:58 +01:00
}
}
2019-06-10 15:58:28 +01:00
2019-04-09 20:12:58 +01:00
now := time . Now ( ) . UTC ( )
2019-04-04 16:20:59 +01:00
intervalStart := time . Date ( now . Year ( ) , now . Month ( ) , now . Day ( ) , now . Hour ( ) , 0 , 0 , 0 , now . Location ( ) )
2019-04-01 21:14:58 +01:00
2019-06-12 16:00:29 +01:00
// TODO: all of this below should be a single db transaction. in fact, this whole function should probably be part of an existing transaction
2020-08-28 12:56:09 +01:00
if err := service . orders . UpdateBucketBandwidthAllocation ( ctx , bucket . ProjectID , [ ] byte ( bucket . BucketName ) , action , bucketAllocation , intervalStart ) ; err != nil {
2019-06-10 15:58:28 +01:00
return Error . Wrap ( err )
2019-04-01 21:14:58 +01:00
}
2019-06-10 15:58:28 +01:00
2019-04-01 21:14:58 +01:00
return nil
}
2020-12-14 12:54:22 +00:00
// CreateGetOrderLimits creates the order limits for downloading the pieces of a segment.
2023-01-24 10:22:27 +00:00
func ( service * Service ) CreateGetOrderLimits ( ctx context . Context , bucket metabase . BucketLocation , segment metabase . Segment , desiredNodes int32 , overrideLimit int64 ) ( _ [ ] * pb . AddressedOrderLimit , privateKey storj . PiecePrivateKey , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-11 21:51:40 +01:00
2023-02-22 10:32:26 +00:00
orderLimit := segment . PieceSize ( )
2021-05-13 14:31:55 +01:00
if overrideLimit > 0 && overrideLimit < orderLimit {
orderLimit = overrideLimit
}
2019-03-28 20:09:23 +00:00
2020-11-06 11:54:52 +00:00
nodeIDs := make ( [ ] storj . NodeID , len ( segment . Pieces ) )
for i , piece := range segment . Pieces {
nodeIDs [ i ] = piece . StorageNode
}
2022-06-22 21:26:53 +01:00
nodes , err := service . overlay . CachedGetOnlineNodesForGet ( ctx , nodeIDs )
2020-11-06 11:54:52 +00:00
if err != nil {
service . log . Debug ( "error getting nodes from overlay" , zap . Error ( err ) )
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
}
2023-06-05 18:55:56 +01:00
if segment . Placement != storj . EveryCountry {
for id , node := range nodes {
if ! segment . Placement . AllowedCountry ( node . CountryCode ) {
delete ( nodes , id )
}
}
}
2021-05-13 14:31:55 +01:00
signer , err := NewSignerGet ( service , segment . RootPieceID , time . Now ( ) , orderLimit , bucket )
2020-11-06 11:54:52 +00:00
if err != nil {
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
}
neededLimits := segment . Redundancy . DownloadNodes ( )
2023-01-24 10:22:27 +00:00
if desiredNodes > neededLimits {
neededLimits = desiredNodes
}
2023-06-05 18:55:56 +01:00
2020-11-06 11:54:52 +00:00
pieces := segment . Pieces
for _ , pieceIndex := range service . perm ( len ( pieces ) ) {
piece := pieces [ pieceIndex ]
node , ok := nodes [ piece . StorageNode ]
if ! ok {
continue
}
2023-01-02 16:10:47 +00:00
_ , err := signer . Sign ( ctx , resolveStorageNode_Selected ( node , true ) , int32 ( piece . Number ) )
2020-11-06 11:54:52 +00:00
if err != nil {
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
}
if len ( signer . AddressedLimits ) >= int ( neededLimits ) {
break
}
}
2023-02-22 10:32:26 +00:00
if len ( signer . AddressedLimits ) < int ( segment . Redundancy . RequiredShares ) {
2020-11-06 11:54:52 +00:00
mon . Meter ( "download_failed_not_enough_pieces_uplink" ) . Mark ( 1 ) //mon:locked
2023-02-22 10:32:26 +00:00
return nil , storj . PiecePrivateKey { } , ErrDownloadFailedNotEnoughPieces . New ( "not enough orderlimits: got %d, required %d" , len ( signer . AddressedLimits ) , segment . Redundancy . RequiredShares )
2019-03-29 09:53:53 +00:00
}
2020-08-28 12:56:09 +01:00
if err := service . updateBandwidth ( ctx , bucket , signer . AddressedLimits ... ) ; err != nil {
2019-07-11 21:51:40 +01:00
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
2019-04-01 21:14:58 +01:00
}
2021-09-22 14:58:06 +01:00
signer . AddressedLimits , err = sortLimits ( signer . AddressedLimits , segment )
if err != nil {
return nil , storj . PiecePrivateKey { } , err
}
// workaround to avoid sending nil values on top level
for i := range signer . AddressedLimits {
if signer . AddressedLimits [ i ] == nil {
signer . AddressedLimits [ i ] = & pb . AddressedOrderLimit { }
}
}
2020-08-14 15:36:30 +01:00
return signer . AddressedLimits , signer . PrivateKey , nil
2019-03-28 20:09:23 +00:00
}
2020-08-14 15:36:30 +01:00
func ( service * Service ) perm ( n int ) [ ] int {
2020-01-27 20:01:37 +00:00
service . rngMu . Lock ( )
2020-08-14 15:36:30 +01:00
defer service . rngMu . Unlock ( )
return service . rng . Perm ( n )
2020-01-27 20:01:37 +00:00
}
2021-09-22 14:58:06 +01:00
// sortLimits sorts order limits and fill missing ones with nil values.
func sortLimits ( limits [ ] * pb . AddressedOrderLimit , segment metabase . Segment ) ( [ ] * pb . AddressedOrderLimit , error ) {
sorted := make ( [ ] * pb . AddressedOrderLimit , segment . Redundancy . TotalShares )
for _ , piece := range segment . Pieces {
if int16 ( piece . Number ) >= segment . Redundancy . TotalShares {
return nil , Error . New ( "piece number is greater than redundancy total shares: got %d, max %d" ,
piece . Number , ( segment . Redundancy . TotalShares - 1 ) )
}
sorted [ piece . Number ] = getLimitByStorageNodeID ( limits , piece . StorageNode )
}
return sorted , nil
}
func getLimitByStorageNodeID ( limits [ ] * pb . AddressedOrderLimit , storageNodeID storj . NodeID ) * pb . AddressedOrderLimit {
for _ , limit := range limits {
if limit == nil || limit . GetLimit ( ) == nil {
continue
}
if limit . GetLimit ( ) . StorageNodeId == storageNodeID {
return limit
}
}
return nil
}
2019-03-28 20:09:23 +00:00
// CreatePutOrderLimits creates the order limits for uploading pieces to nodes.
2020-08-28 12:56:09 +01:00
func ( service * Service ) CreatePutOrderLimits ( ctx context . Context , bucket metabase . BucketLocation , nodes [ ] * overlay . SelectedNode , pieceExpiration time . Time , maxPieceSize int64 ) ( _ storj . PieceID , _ [ ] * pb . AddressedOrderLimit , privateKey storj . PiecePrivateKey , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-09 22:54:00 +01:00
2020-07-24 18:13:15 +01:00
signer , err := NewSignerPut ( service , pieceExpiration , time . Now ( ) , maxPieceSize , bucket )
2019-03-28 20:09:23 +00:00
if err != nil {
2019-07-11 21:51:40 +01:00
return storj . PieceID { } , nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
2019-03-28 20:09:23 +00:00
}
2020-07-24 19:57:11 +01:00
for pieceNum , node := range nodes {
2023-01-02 16:10:47 +00:00
_ , err := signer . Sign ( ctx , resolveStorageNode_Selected ( node , true ) , int32 ( pieceNum ) )
2020-07-24 19:57:11 +01:00
if err != nil {
return storj . PieceID { } , nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
2019-03-28 20:09:23 +00:00
}
}
2020-07-24 19:57:11 +01:00
return signer . RootPieceID , signer . AddressedLimits , signer . PrivateKey , nil
2019-03-28 20:09:23 +00:00
}
2023-01-11 15:40:17 +00:00
// ReplacePutOrderLimits replaces order limits for uploading pieces to nodes.
func ( service * Service ) ReplacePutOrderLimits ( ctx context . Context , rootPieceID storj . PieceID , addressedLimits [ ] * pb . AddressedOrderLimit , nodes [ ] * overlay . SelectedNode , pieceNumbers [ ] int32 ) ( _ [ ] * pb . AddressedOrderLimit , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
pieceIDDeriver := rootPieceID . Deriver ( )
newAddressedLimits := make ( [ ] * pb . AddressedOrderLimit , len ( addressedLimits ) )
copy ( newAddressedLimits , addressedLimits )
for i , pieceNumber := range pieceNumbers {
if pieceNumber < 0 || int ( pieceNumber ) >= len ( addressedLimits ) {
return nil , Error . New ( "invalid piece number %d" , pieceNumber )
}
// TODO: clone?
newAddressedLimit := * addressedLimits [ pieceNumber ] . Limit
newAddressedLimit . StorageNodeId = nodes [ i ] . ID
newAddressedLimit . PieceId = pieceIDDeriver . Derive ( nodes [ i ] . ID , pieceNumber )
newAddressedLimit . SatelliteSignature = nil
newAddressedLimits [ pieceNumber ] . Limit , err = signing . SignOrderLimit ( ctx , service . satellite , & newAddressedLimit )
if err != nil {
return nil , ErrSigner . Wrap ( err )
}
2023-01-02 16:10:47 +00:00
newAddressedLimits [ pieceNumber ] . StorageNodeAddress = resolveStorageNode_Selected ( nodes [ i ] , true ) . Address
2023-01-11 15:40:17 +00:00
}
return newAddressedLimits , nil
}
2020-12-14 12:54:22 +00:00
// CreateAuditOrderLimits creates the order limits for auditing the pieces of a segment.
2021-11-08 20:51:04 +00:00
func ( service * Service ) CreateAuditOrderLimits ( ctx context . Context , segment metabase . Segment , skip map [ storj . NodeID ] bool ) ( _ [ ] * pb . AddressedOrderLimit , _ storj . PiecePrivateKey , cachedNodesInfo map [ storj . NodeID ] overlay . NodeReputation , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-07-24 19:57:11 +01:00
2020-12-14 12:54:22 +00:00
nodeIDs := make ( [ ] storj . NodeID , len ( segment . Pieces ) )
for i , piece := range segment . Pieces {
nodeIDs [ i ] = piece . StorageNode
2020-03-13 18:01:48 +00:00
}
2021-11-08 20:51:04 +00:00
nodes , err := service . overlay . GetOnlineNodesForAuditRepair ( ctx , nodeIDs )
2020-03-13 18:01:48 +00:00
if err != nil {
service . log . Debug ( "error getting nodes from overlay" , zap . Error ( err ) )
satellite/audit: use LastIPAndPort preferentially
This preserves the last_ip_and_port field from node lookups through
CreateAuditOrderLimits() and CreateAuditOrderLimit(), so that later
calls to (*Verifier).GetShare() can try to use that IP and port. If a
connection to the given IP and port cannot be made, or the connection
cannot be verified and secured with the target node identity, an
attempt is made to connect to the original node address instead.
A similar change is not necessary to the other Create*OrderLimits
functions, because they already replace node addresses with the cached
IP and port as appropriate. We might want to consider making a similar
change to CreateGetRepairOrderLimits(), though.
The audit situation is unique because the ramifications are especially
powerful when we get the address wrong. Failing a single audit can have
a heavy cost to a storage node. We need to make extra effort in order
to avoid imposing that cost unfairly.
Situation 1: If an audit fails because the repair worker failed to make
a DNS query (which might well be the fault on the satellite side), and
we have last_ip_and_port information available for the target node, it
would be unfair not to try connecting to that last_ip_and_port address.
Situation 2: If a node has changed addresses recently and the operator
correctly changed its DNS entry, but we don't bother querying DNS, it
would be unfair to penalize the node for our failure to connect to it.
So the audit worker must try both last_ip_and_port _and_ the node
address as supplied by the SNO.
We elect here to try last_ip_and_port first, on the grounds that (a) it
is expected to work in the large majority of cases, and (b) there
should not be any security concerns with connecting to an out-or-date
address, and (c) avoiding DNS queries on the satellite side helps
alleviate satellite operational load.
Change-Id: I9bf6c6c79866d879adecac6144a6c346f4f61200
2020-09-30 05:53:43 +01:00
return nil , storj . PiecePrivateKey { } , nil , Error . Wrap ( err )
2020-03-13 18:01:48 +00:00
}
2021-06-14 16:40:46 +01:00
bucket := metabase . BucketLocation { }
2020-12-14 12:54:22 +00:00
signer , err := NewSignerAudit ( service , segment . RootPieceID , time . Now ( ) , int64 ( segment . Redundancy . ShareSize ) , bucket )
2020-07-24 19:57:11 +01:00
if err != nil {
satellite/audit: use LastIPAndPort preferentially
This preserves the last_ip_and_port field from node lookups through
CreateAuditOrderLimits() and CreateAuditOrderLimit(), so that later
calls to (*Verifier).GetShare() can try to use that IP and port. If a
connection to the given IP and port cannot be made, or the connection
cannot be verified and secured with the target node identity, an
attempt is made to connect to the original node address instead.
A similar change is not necessary to the other Create*OrderLimits
functions, because they already replace node addresses with the cached
IP and port as appropriate. We might want to consider making a similar
change to CreateGetRepairOrderLimits(), though.
The audit situation is unique because the ramifications are especially
powerful when we get the address wrong. Failing a single audit can have
a heavy cost to a storage node. We need to make extra effort in order
to avoid imposing that cost unfairly.
Situation 1: If an audit fails because the repair worker failed to make
a DNS query (which might well be the fault on the satellite side), and
we have last_ip_and_port information available for the target node, it
would be unfair not to try connecting to that last_ip_and_port address.
Situation 2: If a node has changed addresses recently and the operator
correctly changed its DNS entry, but we don't bother querying DNS, it
would be unfair to penalize the node for our failure to connect to it.
So the audit worker must try both last_ip_and_port _and_ the node
address as supplied by the SNO.
We elect here to try last_ip_and_port first, on the grounds that (a) it
is expected to work in the large majority of cases, and (b) there
should not be any security concerns with connecting to an out-or-date
address, and (c) avoiding DNS queries on the satellite side helps
alleviate satellite operational load.
Change-Id: I9bf6c6c79866d879adecac6144a6c346f4f61200
2020-09-30 05:53:43 +01:00
return nil , storj . PiecePrivateKey { } , nil , Error . Wrap ( err )
2020-07-24 19:57:11 +01:00
}
2021-11-08 20:51:04 +00:00
cachedNodesInfo = make ( map [ storj . NodeID ] overlay . NodeReputation )
2020-03-30 14:32:02 +01:00
var nodeErrors errs . Group
2020-12-14 12:54:22 +00:00
var limitsCount int16
limits := make ( [ ] * pb . AddressedOrderLimit , segment . Redundancy . TotalShares )
for _ , piece := range segment . Pieces {
if skip [ piece . StorageNode ] {
2019-05-27 12:13:47 +01:00
continue
}
2020-12-14 12:54:22 +00:00
node , ok := nodes [ piece . StorageNode ]
2020-03-13 18:01:48 +00:00
if ! ok {
2020-12-14 12:54:22 +00:00
nodeErrors . Add ( errs . New ( "node %q is not reliable" , piece . StorageNode ) )
2019-03-29 08:53:43 +00:00
continue
}
2021-11-08 20:51:04 +00:00
cachedNodesInfo [ piece . StorageNode ] = * node
2023-01-02 16:10:47 +00:00
limit , err := signer . Sign ( ctx , resolveStorageNode_Reputation ( node ) , int32 ( piece . Number ) )
2019-03-28 20:09:23 +00:00
if err != nil {
satellite/audit: use LastIPAndPort preferentially
This preserves the last_ip_and_port field from node lookups through
CreateAuditOrderLimits() and CreateAuditOrderLimit(), so that later
calls to (*Verifier).GetShare() can try to use that IP and port. If a
connection to the given IP and port cannot be made, or the connection
cannot be verified and secured with the target node identity, an
attempt is made to connect to the original node address instead.
A similar change is not necessary to the other Create*OrderLimits
functions, because they already replace node addresses with the cached
IP and port as appropriate. We might want to consider making a similar
change to CreateGetRepairOrderLimits(), though.
The audit situation is unique because the ramifications are especially
powerful when we get the address wrong. Failing a single audit can have
a heavy cost to a storage node. We need to make extra effort in order
to avoid imposing that cost unfairly.
Situation 1: If an audit fails because the repair worker failed to make
a DNS query (which might well be the fault on the satellite side), and
we have last_ip_and_port information available for the target node, it
would be unfair not to try connecting to that last_ip_and_port address.
Situation 2: If a node has changed addresses recently and the operator
correctly changed its DNS entry, but we don't bother querying DNS, it
would be unfair to penalize the node for our failure to connect to it.
So the audit worker must try both last_ip_and_port _and_ the node
address as supplied by the SNO.
We elect here to try last_ip_and_port first, on the grounds that (a) it
is expected to work in the large majority of cases, and (b) there
should not be any security concerns with connecting to an out-or-date
address, and (c) avoiding DNS queries on the satellite side helps
alleviate satellite operational load.
Change-Id: I9bf6c6c79866d879adecac6144a6c346f4f61200
2020-09-30 05:53:43 +01:00
return nil , storj . PiecePrivateKey { } , nil , Error . Wrap ( err )
2019-03-28 20:09:23 +00:00
}
2020-12-14 12:54:22 +00:00
limits [ piece . Number ] = limit
2019-03-28 20:09:23 +00:00
limitsCount ++
}
2019-03-29 08:53:43 +00:00
2020-12-14 12:54:22 +00:00
if limitsCount < segment . Redundancy . RequiredShares {
2021-08-17 17:33:22 +01:00
err = ErrDownloadFailedNotEnoughPieces . New ( "not enough nodes available: got %d, required %d" , limitsCount , segment . Redundancy . RequiredShares )
satellite/audit: use LastIPAndPort preferentially
This preserves the last_ip_and_port field from node lookups through
CreateAuditOrderLimits() and CreateAuditOrderLimit(), so that later
calls to (*Verifier).GetShare() can try to use that IP and port. If a
connection to the given IP and port cannot be made, or the connection
cannot be verified and secured with the target node identity, an
attempt is made to connect to the original node address instead.
A similar change is not necessary to the other Create*OrderLimits
functions, because they already replace node addresses with the cached
IP and port as appropriate. We might want to consider making a similar
change to CreateGetRepairOrderLimits(), though.
The audit situation is unique because the ramifications are especially
powerful when we get the address wrong. Failing a single audit can have
a heavy cost to a storage node. We need to make extra effort in order
to avoid imposing that cost unfairly.
Situation 1: If an audit fails because the repair worker failed to make
a DNS query (which might well be the fault on the satellite side), and
we have last_ip_and_port information available for the target node, it
would be unfair not to try connecting to that last_ip_and_port address.
Situation 2: If a node has changed addresses recently and the operator
correctly changed its DNS entry, but we don't bother querying DNS, it
would be unfair to penalize the node for our failure to connect to it.
So the audit worker must try both last_ip_and_port _and_ the node
address as supplied by the SNO.
We elect here to try last_ip_and_port first, on the grounds that (a) it
is expected to work in the large majority of cases, and (b) there
should not be any security concerns with connecting to an out-or-date
address, and (c) avoiding DNS queries on the satellite side helps
alleviate satellite operational load.
Change-Id: I9bf6c6c79866d879adecac6144a6c346f4f61200
2020-09-30 05:53:43 +01:00
return nil , storj . PiecePrivateKey { } , nil , errs . Combine ( err , nodeErrors . Err ( ) )
2019-03-28 20:09:23 +00:00
}
2019-04-01 21:14:58 +01:00
2021-11-08 20:51:04 +00:00
return limits , signer . PrivateKey , cachedNodesInfo , nil
2019-03-28 20:09:23 +00:00
}
2022-10-11 20:38:40 +01:00
// CreateAuditOrderLimit creates an order limit for auditing a single piece from a segment.
2021-11-08 20:51:04 +00:00
func ( service * Service ) CreateAuditOrderLimit ( ctx context . Context , nodeID storj . NodeID , pieceNum uint16 , rootPieceID storj . PieceID , shareSize int32 ) ( limit * pb . AddressedOrderLimit , _ storj . PiecePrivateKey , nodeInfo * overlay . NodeReputation , err error ) {
2019-07-03 17:53:15 +01:00
// TODO reduce number of params ?
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-09 22:54:00 +01:00
2022-10-11 20:38:40 +01:00
signer , err := NewSignerAudit ( service , rootPieceID , time . Now ( ) , int64 ( shareSize ) , metabase . BucketLocation { } )
if err != nil {
return nil , storj . PiecePrivateKey { } , nodeInfo , Error . Wrap ( err )
}
return service . createAuditOrderLimitWithSigner ( ctx , nodeID , pieceNum , signer )
}
// CreateAuditPieceOrderLimit creates an order limit for auditing a single
// piece from a segment, requesting that the original order limit and piece
// hash be included.
//
// Unfortunately, because of the way the protocol works historically, we
// must use GET_REPAIR for this operation instead of GET_AUDIT.
func ( service * Service ) CreateAuditPieceOrderLimit ( ctx context . Context , nodeID storj . NodeID , pieceNum uint16 , rootPieceID storj . PieceID , pieceSize int32 ) ( limit * pb . AddressedOrderLimit , _ storj . PiecePrivateKey , nodeInfo * overlay . NodeReputation , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
signer , err := NewSignerRepairGet ( service , rootPieceID , time . Now ( ) , int64 ( pieceSize ) , metabase . BucketLocation { } )
if err != nil {
return nil , storj . PiecePrivateKey { } , nodeInfo , Error . Wrap ( err )
}
return service . createAuditOrderLimitWithSigner ( ctx , nodeID , pieceNum , signer )
}
func ( service * Service ) createAuditOrderLimitWithSigner ( ctx context . Context , nodeID storj . NodeID , pieceNum uint16 , signer * Signer ) ( limit * pb . AddressedOrderLimit , _ storj . PiecePrivateKey , nodeInfo * overlay . NodeReputation , err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-08-06 17:35:59 +01:00
node , err := service . overlay . Get ( ctx , nodeID )
2019-05-27 12:13:47 +01:00
if err != nil {
2021-11-08 20:51:04 +00:00
return nil , storj . PiecePrivateKey { } , nil , Error . Wrap ( err )
2019-05-27 12:13:47 +01:00
}
2021-11-08 20:51:04 +00:00
nodeInfo = & overlay . NodeReputation {
ID : nodeID ,
Address : node . Address ,
LastNet : node . LastNet ,
LastIPPort : node . LastIPPort ,
Reputation : node . Reputation . Status ,
}
2019-06-24 15:46:10 +01:00
if node . Disqualified != nil {
2021-11-08 20:51:04 +00:00
return nil , storj . PiecePrivateKey { } , nodeInfo , overlay . ErrNodeDisqualified . New ( "%v" , nodeID )
2019-06-24 15:46:10 +01:00
}
2020-08-13 13:00:56 +01:00
if node . ExitStatus . ExitFinishedAt != nil {
2021-11-08 20:51:04 +00:00
return nil , storj . PiecePrivateKey { } , nodeInfo , overlay . ErrNodeFinishedGE . New ( "%v" , nodeID )
2020-08-13 13:00:56 +01:00
}
2019-08-06 17:35:59 +01:00
if ! service . overlay . IsOnline ( node ) {
2021-11-08 20:51:04 +00:00
return nil , storj . PiecePrivateKey { } , nodeInfo , overlay . ErrNodeOffline . New ( "%v" , nodeID )
2019-05-27 12:13:47 +01:00
}
2023-01-02 16:10:47 +00:00
orderLimit , err := signer . Sign ( ctx , resolveStorageNode ( & node . Node , node . LastIPPort , false ) , int32 ( pieceNum ) )
2020-07-24 19:57:11 +01:00
if err != nil {
2021-11-08 20:51:04 +00:00
return nil , storj . PiecePrivateKey { } , nodeInfo , Error . Wrap ( err )
2019-05-27 12:13:47 +01:00
}
2021-11-08 20:51:04 +00:00
return orderLimit , signer . PrivateKey , nodeInfo , nil
2019-05-27 12:13:47 +01:00
}
2019-07-11 23:44:47 +01:00
// CreateGetRepairOrderLimits creates the order limits for downloading the
2020-12-14 14:29:48 +00:00
// healthy pieces of segment as the source for repair.
2019-07-11 23:44:47 +01:00
//
// The length of the returned orders slice is the total number of pieces of the
// segment, setting to null the ones which don't correspond to a healthy piece.
2023-05-22 13:35:23 +01:00
func ( service * Service ) CreateGetRepairOrderLimits ( ctx context . Context , segment metabase . Segment , healthy metabase . Pieces ) ( _ [ ] * pb . AddressedOrderLimit , _ storj . PiecePrivateKey , cachedNodesInfo map [ storj . NodeID ] overlay . NodeReputation , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-07-11 21:51:40 +01:00
2023-02-22 10:32:26 +00:00
pieceSize := segment . PieceSize ( )
totalPieces := segment . Redundancy . TotalShares
2019-03-28 20:09:23 +00:00
2020-12-14 14:29:48 +00:00
nodeIDs := make ( [ ] storj . NodeID , len ( segment . Pieces ) )
for i , piece := range segment . Pieces {
nodeIDs [ i ] = piece . StorageNode
2020-03-13 18:01:48 +00:00
}
2021-11-08 20:51:04 +00:00
nodes , err := service . overlay . GetOnlineNodesForAuditRepair ( ctx , nodeIDs )
2020-03-13 18:01:48 +00:00
if err != nil {
service . log . Debug ( "error getting nodes from overlay" , zap . Error ( err ) )
2021-07-13 14:52:37 +01:00
return nil , storj . PiecePrivateKey { } , nil , Error . Wrap ( err )
2020-03-13 18:01:48 +00:00
}
2023-05-22 13:35:23 +01:00
signer , err := NewSignerRepairGet ( service , segment . RootPieceID , time . Now ( ) , pieceSize , metabase . BucketLocation { } )
2020-07-24 19:57:11 +01:00
if err != nil {
2021-07-13 14:52:37 +01:00
return nil , storj . PiecePrivateKey { } , nil , Error . Wrap ( err )
2020-07-24 19:57:11 +01:00
}
2021-11-08 20:51:04 +00:00
cachedNodesInfo = make ( map [ storj . NodeID ] overlay . NodeReputation , len ( healthy ) )
2020-03-30 14:32:02 +01:00
var nodeErrors errs . Group
2019-04-03 14:17:32 +01:00
var limitsCount int
2019-03-28 20:09:23 +00:00
limits := make ( [ ] * pb . AddressedOrderLimit , totalPieces )
for _ , piece := range healthy {
2020-12-14 14:29:48 +00:00
node , ok := nodes [ piece . StorageNode ]
2020-03-13 18:01:48 +00:00
if ! ok {
2020-12-14 14:29:48 +00:00
nodeErrors . Add ( errs . New ( "node %q is not reliable" , piece . StorageNode ) )
2019-03-29 08:53:43 +00:00
continue
}
2021-11-08 20:51:04 +00:00
cachedNodesInfo [ piece . StorageNode ] = * node
2021-07-13 14:52:37 +01:00
2023-01-02 16:10:47 +00:00
limit , err := signer . Sign ( ctx , resolveStorageNode_Reputation ( node ) , int32 ( piece . Number ) )
2019-03-28 20:09:23 +00:00
if err != nil {
2021-07-13 14:52:37 +01:00
return nil , storj . PiecePrivateKey { } , nil , Error . Wrap ( err )
2019-03-28 20:09:23 +00:00
}
2020-12-14 14:29:48 +00:00
limits [ piece . Number ] = limit
2019-04-03 14:17:32 +01:00
limitsCount ++
2019-03-28 20:09:23 +00:00
}
2023-02-22 10:32:26 +00:00
if limitsCount < int ( segment . Redundancy . RequiredShares ) {
err = ErrDownloadFailedNotEnoughPieces . New ( "not enough nodes available: got %d, required %d" , limitsCount , segment . Redundancy . RequiredShares )
2021-07-13 14:52:37 +01:00
return nil , storj . PiecePrivateKey { } , nil , errs . Combine ( err , nodeErrors . Err ( ) )
2019-03-29 08:53:43 +00:00
}
2021-11-08 20:51:04 +00:00
return limits , signer . PrivateKey , cachedNodesInfo , nil
2019-03-28 20:09:23 +00:00
}
2020-12-14 14:29:48 +00:00
// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.
2023-05-22 13:35:23 +01:00
func ( service * Service ) CreatePutRepairOrderLimits ( ctx context . Context , segment metabase . Segment , getOrderLimits [ ] * pb . AddressedOrderLimit , healthySet map [ int32 ] struct { } , newNodes [ ] * overlay . SelectedNode , optimalThresholdMultiplier float64 , numPiecesInExcludedCountries int ) ( _ [ ] * pb . AddressedOrderLimit , _ storj . PiecePrivateKey , err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-03-28 20:09:23 +00:00
2020-07-24 19:57:11 +01:00
// Create the order limits for being used to upload the repaired pieces
2023-02-22 10:32:26 +00:00
pieceSize := segment . PieceSize ( )
2020-07-24 19:57:11 +01:00
2023-02-22 10:32:26 +00:00
totalPieces := int ( segment . Redundancy . TotalShares )
totalPiecesAfterRepair := int ( math . Ceil ( float64 ( segment . Redundancy . OptimalShares ) * optimalThresholdMultiplier ) ) + numPiecesInExcludedCountries
2022-03-03 00:23:11 +00:00
2020-07-24 19:57:11 +01:00
if totalPiecesAfterRepair > totalPieces {
totalPiecesAfterRepair = totalPieces
}
2022-12-13 20:40:15 +00:00
var numRetrievablePieces int
2020-07-24 19:57:11 +01:00
for _ , o := range getOrderLimits {
if o != nil {
2022-12-13 20:40:15 +00:00
numRetrievablePieces ++
2020-07-24 19:57:11 +01:00
}
}
2022-12-13 20:40:15 +00:00
totalPiecesToRepair := totalPiecesAfterRepair - len ( healthySet )
2019-07-11 21:51:40 +01:00
2020-07-24 19:57:11 +01:00
limits := make ( [ ] * pb . AddressedOrderLimit , totalPieces )
2020-12-14 14:29:48 +00:00
2021-07-26 18:39:56 +01:00
expirationDate := time . Time { }
if segment . ExpiresAt != nil {
expirationDate = * segment . ExpiresAt
}
2023-05-22 13:35:23 +01:00
signer , err := NewSignerRepairPut ( service , segment . RootPieceID , expirationDate , time . Now ( ) , pieceSize , metabase . BucketLocation { } )
2019-03-28 20:09:23 +00:00
if err != nil {
2019-07-11 21:51:40 +01:00
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
2019-03-28 20:09:23 +00:00
}
2020-07-24 19:57:11 +01:00
var pieceNum int32
for _ , node := range newNodes {
2022-12-13 20:40:15 +00:00
for int ( pieceNum ) < totalPieces {
_ , isHealthy := healthySet [ pieceNum ]
if ! isHealthy {
break
}
2020-07-24 19:57:11 +01:00
pieceNum ++
2019-03-28 20:09:23 +00:00
}
2020-07-24 19:57:11 +01:00
if int ( pieceNum ) >= totalPieces { // should not happen
return nil , storj . PiecePrivateKey { } , Error . New ( "piece num greater than total pieces: %d >= %d" , pieceNum , totalPieces )
2019-03-28 20:09:23 +00:00
}
2023-01-02 16:10:47 +00:00
limit , err := signer . Sign ( ctx , resolveStorageNode_Selected ( node , false ) , pieceNum )
2020-07-24 19:57:11 +01:00
if err != nil {
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
2019-03-28 20:09:23 +00:00
}
2020-07-24 19:57:11 +01:00
limits [ pieceNum ] = limit
pieceNum ++
totalPiecesToRepair --
2019-07-11 23:44:47 +01:00
2020-07-24 19:57:11 +01:00
if totalPiecesToRepair == 0 {
break
2019-03-28 20:09:23 +00:00
}
}
2020-07-24 19:57:11 +01:00
return limits , signer . PrivateKey , nil
2019-03-28 20:09:23 +00:00
}
2019-04-05 08:42:56 +01:00
2019-10-11 22:18:05 +01:00
// CreateGracefulExitPutOrderLimit creates an order limit for graceful exit put transfers.
2020-08-28 12:56:09 +01:00
func ( service * Service ) CreateGracefulExitPutOrderLimit ( ctx context . Context , bucket metabase . BucketLocation , nodeID storj . NodeID , pieceNum int32 , rootPieceID storj . PieceID , shareSize int32 ) ( limit * pb . AddressedOrderLimit , _ storj . PiecePrivateKey , err error ) {
2019-10-11 22:18:05 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2020-07-24 19:57:11 +01:00
// should this use KnownReliable or similar?
2019-10-11 22:18:05 +01:00
node , err := service . overlay . Get ( ctx , nodeID )
if err != nil {
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
}
if node . Disqualified != nil {
return nil , storj . PiecePrivateKey { } , overlay . ErrNodeDisqualified . New ( "%v" , nodeID )
}
if ! service . overlay . IsOnline ( node ) {
return nil , storj . PiecePrivateKey { } , overlay . ErrNodeOffline . New ( "%v" , nodeID )
}
2020-07-24 18:13:15 +01:00
signer , err := NewSignerGracefulExit ( service , rootPieceID , time . Now ( ) , shareSize , bucket )
2019-10-11 22:18:05 +01:00
if err != nil {
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
}
2023-01-02 16:10:47 +00:00
limit , err = signer . Sign ( ctx , resolveStorageNode ( & node . Node , node . LastIPPort , true ) , pieceNum )
2020-07-24 19:57:11 +01:00
if err != nil {
return nil , storj . PiecePrivateKey { } , Error . Wrap ( err )
2019-10-11 22:18:05 +01:00
}
2020-07-24 19:57:11 +01:00
return limit , signer . PrivateKey , nil
2019-10-11 22:18:05 +01:00
}
2020-07-16 15:18:02 +01:00
// UpdateGetInlineOrder updates amount of inline GET bandwidth for given bucket.
2020-08-28 12:56:09 +01:00
func ( service * Service ) UpdateGetInlineOrder ( ctx context . Context , bucket metabase . BucketLocation , amount int64 ) ( err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-04-09 20:12:58 +01:00
now := time . Now ( ) . UTC ( )
2019-04-05 08:42:56 +01:00
intervalStart := time . Date ( now . Year ( ) , now . Month ( ) , now . Day ( ) , now . Hour ( ) , 0 , 0 , 0 , now . Location ( ) )
2020-08-28 12:56:09 +01:00
return service . orders . UpdateBucketBandwidthInline ( ctx , bucket . ProjectID , [ ] byte ( bucket . BucketName ) , pb . PieceAction_GET , amount , intervalStart )
2019-04-05 08:42:56 +01:00
}
2020-07-16 15:18:02 +01:00
// UpdatePutInlineOrder updates amount of inline PUT bandwidth for given bucket.
2020-08-28 12:56:09 +01:00
func ( service * Service ) UpdatePutInlineOrder ( ctx context . Context , bucket metabase . BucketLocation , amount int64 ) ( err error ) {
2019-06-04 12:55:38 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-04-09 20:12:58 +01:00
now := time . Now ( ) . UTC ( )
2019-04-05 08:42:56 +01:00
intervalStart := time . Date ( now . Year ( ) , now . Month ( ) , now . Day ( ) , now . Hour ( ) , 0 , 0 , 0 , now . Location ( ) )
2020-08-28 12:56:09 +01:00
return service . orders . UpdateBucketBandwidthInline ( ctx , bucket . ProjectID , [ ] byte ( bucket . BucketName ) , pb . PieceAction_PUT , amount , intervalStart )
2019-04-05 08:42:56 +01:00
}
2020-11-18 21:39:13 +00:00
// DecryptOrderMetadata decrypts the order metadata.
2021-01-08 16:04:46 +00:00
func ( service * Service ) DecryptOrderMetadata ( ctx context . Context , order * pb . OrderLimit ) ( _ * internalpb . OrderLimitMetadata , err error ) {
2020-11-18 21:39:13 +00:00
defer mon . Task ( ) ( & ctx ) ( & err )
var orderKeyID EncryptionKeyID
copy ( orderKeyID [ : ] , order . EncryptedMetadataKeyId )
2021-01-22 13:51:29 +00:00
key := service . encryptionKeys . Default
2020-11-18 21:39:13 +00:00
if key . ID != orderKeyID {
val , ok := service . encryptionKeys . KeyByID [ orderKeyID ]
if ! ok {
return nil , ErrDecryptOrderMetadata . New ( "no encryption key found that matches the order.EncryptedMetadataKeyId" )
}
key = EncryptionKey {
ID : orderKeyID ,
Key : val ,
}
}
return key . DecryptMetadata ( order . SerialNumber , order . EncryptedMetadata )
}
2023-01-11 15:40:17 +00:00
2023-01-02 16:10:47 +00:00
func resolveStorageNode_Selected ( node * overlay . SelectedNode , resolveDNS bool ) * pb . Node {
return resolveStorageNode ( & pb . Node {
Id : node . ID ,
Address : node . Address ,
} , node . LastIPPort , resolveDNS )
}
func resolveStorageNode_Reputation ( node * overlay . NodeReputation ) * pb . Node {
return resolveStorageNode ( & pb . Node {
Id : node . ID ,
Address : node . Address ,
} , node . LastIPPort , false )
}
func resolveStorageNode ( node * pb . Node , lastIPPort string , resolveDNS bool ) * pb . Node {
if resolveDNS && lastIPPort != "" {
node = pb . CopyNode ( node ) // we mutate
node . Address . Address = lastIPPort
2023-01-11 15:40:17 +00:00
}
2023-01-02 16:10:47 +00:00
return node
2023-01-11 15:40:17 +00:00
}