// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package ecclient

import (
	"context"
	"io"
	"io/ioutil"
	"sort"
	"sync/atomic"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"
	monkit "gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/storj/internal/sync2"
	"storj.io/storj/pkg/auth/signing"
	"storj.io/storj/pkg/eestream"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/ranger"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/pkg/transport"
	"storj.io/storj/uplink/piecestore"
)

var mon = monkit.Package()

// Client defines an interface for storing erasure-coded data to piece store nodes.
type Client interface {
	Put(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
	Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
	Get(ctx context.Context, limits []*pb.AddressedOrderLimit, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
	Delete(ctx context.Context, limits []*pb.AddressedOrderLimit) error
	WithForceErrorDetection(force bool) Client
}

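// dialPiecestoreFunc is the signature used for dialing a piecestore client for a node.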
type dialPiecestoreFunc func(context.Context, *pb.Node) (*piecestore.Client, error)

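// ecClient implements Client by uploading, downloading, and deleting pieces
// over piecestore connections.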
type ecClient struct {
	log                 *zap.Logger
	transport           transport.Client
	memoryLimit         int
	forceErrorDetection bool
}

// NewClient creates an erasure coding client from the given transport client and maximum buffer memory.
func NewClient(log *zap.Logger, tc transport.Client, memoryLimit int) Client {
	return &ecClient{
		log:         log,
		transport:   tc,
		memoryLimit: memoryLimit,
	}
}

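// WithForceErrorDetection sets whether erasure decoding should always run
// error detection and returns the client.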
func (ec *ecClient) WithForceErrorDetection(force bool) Client {
	ec.forceErrorDetection = force
	return ec
}

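// dialPiecestore dials a piecestore client for the given node, using a logger
// named after the node ID and an order signer derived from the transport identity.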
func (ec *ecClient) dialPiecestore(ctx context.Context, n *pb.Node) (*piecestore.Client, error) {
	logger := ec.log.Named(n.Id.String())
	signer := signing.SignerFromFullIdentity(ec.transport.Identity())
	return piecestore.Dial(ctx, ec.transport, n, logger, signer, piecestore.DefaultConfig)
}

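// Put uploads the erasure-coded pieces of data to the nodes referenced by the
// order limits. It cancels the slowest uploads (the long tail) once the
// optimal threshold is reached and returns the nodes and piece hashes of the
// successful uploads, indexed by their position in limits.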
func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(limits) != rs.TotalCount() {
		return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), rs.TotalCount())
	}

	nonNilLimits := nonNilCount(limits)
	if nonNilLimits <= rs.RepairThreshold() && nonNilLimits < rs.OptimalThreshold() {
		return nil, nil, Error.New("number of non-nil limits (%d) is less than or equal to the repair threshold (%d) of erasure scheme", nonNilLimits, rs.RepairThreshold())
	}

	if !unique(limits) {
		return nil, nil, Error.New("duplicated nodes are not allowed")
	}

	ec.log.Sugar().Debugf("Uploading to storage nodes using ErasureShareSize: %d StripeSize: %d RepairThreshold: %d OptimalThreshold: %d",
		rs.ErasureShareSize(), rs.StripeSize(), rs.RepairThreshold(), rs.OptimalThreshold())

	padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
	readers, err := eestream.EncodeReader(ctx, padded, rs)
	if err != nil {
		return nil, nil, err
	}

	type info struct {
		i    int
		err  error
		hash *pb.PieceHash
	}
	infos := make(chan info, len(limits))

	psCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	start := time.Now()

	for i, addressedLimit := range limits {
		go func(i int, addressedLimit *pb.AddressedOrderLimit) {
			hash, err := ec.putPiece(psCtx, ctx, addressedLimit, readers[i], expiration)
			infos <- info{i: i, err: err, hash: hash}
		}(i, addressedLimit)
	}

	successfulNodes = make([]*pb.Node, len(limits))
	successfulHashes = make([]*pb.PieceHash, len(limits))
	var successfulCount int32
	var timer *time.Timer
	var lastSuccess time.Time
	var waitStart time.Time

	for range limits {
		info := <-infos

		if limits[info.i] == nil {
			continue
		}

		if info.err != nil {
			ec.log.Sugar().Debugf("Upload to storage node %s failed: %v", limits[info.i].GetLimit().StorageNodeId, info.err)
			continue
		}

		successfulNodes[info.i] = &pb.Node{
			Id:      limits[info.i].GetLimit().StorageNodeId,
			Address: limits[info.i].GetStorageNodeAddress(),
		}
		successfulHashes[info.i] = info.hash
		lastSuccess = time.Now()

		switch int(atomic.AddInt32(&successfulCount, 1)) {
		case rs.OptimalThreshold():
			ec.log.Sugar().Infof("Success threshold (%d nodes) reached. Canceling the long tail...", rs.OptimalThreshold())
			if timer != nil {
				timer.Stop()
			}
			cancel()
		case rs.RepairThreshold() + 1:
			waitStart = time.Now()
			elapsed := waitStart.Sub(start)
			more := elapsed * 3 / 2

			ec.log.Sugar().Debugf("Repair threshold (%d nodes) passed in %.2f s. Starting a timer for %.2f s for reaching the success threshold (%d nodes)...",
				rs.RepairThreshold(), elapsed.Seconds(), more.Seconds(), rs.OptimalThreshold())

			timer = time.AfterFunc(more, func() {
				if ctx.Err() != context.Canceled {
					ec.log.Sugar().Debugf("Timer expired. Successfully uploaded to %d nodes. Canceling the long tail...", atomic.LoadInt32(&successfulCount))
					cancel()
				}
			})
		}
	}

	// Ensure the timer is stopped in case the repair threshold is reached but
	// the success threshold is not, due to errors instead of slowness.
	if timer != nil {
		timer.Stop()
	}

	defer func() {
		select {
		case <-ctx.Done():
			err = errs.Combine(
				Error.New("upload cancelled by user"),
				// TODO: clean up the partially uploaded segment's pieces
				// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId),
			)
		default:
		}
	}()

	successes := int(atomic.LoadInt32(&successfulCount))
	if successes <= rs.RepairThreshold() && successes < rs.OptimalThreshold() {
		return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successes, rs.RepairThreshold())
	}

	// Monitor what the best fraction would have been for this upload.
	if !lastSuccess.IsZero() && !waitStart.IsZero() {
		repairThreshold := waitStart.Sub(start).Seconds()
		extraDuration := lastSuccess.Sub(waitStart).Seconds()
		if extraDuration != 0 {
			mon.FloatVal("repair_threshold").Observe(repairThreshold)
			mon.FloatVal("extra_duration").Observe(extraDuration)
			mon.FloatVal("optimal_fraction").Observe(extraDuration / repairThreshold)
		}
	}

	return successfulNodes, successfulHashes, nil
}

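// Repair uploads the erasure-coded pieces of data to the nodes referenced by
// the order limits, canceling the long tail of slow uploads after the given
// timeout. It returns the nodes and piece hashes of the successful uploads,
// indexed by their position in limits.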
func (ec *ecClient) Repair(ctx context.Context, limits []*pb.AddressedOrderLimit, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time, timeout time.Duration, path storj.Path) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(limits) != rs.TotalCount() {
		return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), rs.TotalCount())
	}

	if !unique(limits) {
		return nil, nil, Error.New("duplicated nodes are not allowed")
	}

	padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
	readers, err := eestream.EncodeReader(ctx, padded, rs)
	if err != nil {
		return nil, nil, err
	}

	type info struct {
		i    int
		err  error
		hash *pb.PieceHash
	}
	infos := make(chan info, len(limits))

	psCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i, addressedLimit := range limits {
		go func(i int, addressedLimit *pb.AddressedOrderLimit) {
			hash, err := ec.putPiece(psCtx, ctx, addressedLimit, readers[i], expiration)
			infos <- info{i: i, err: err, hash: hash}
		}(i, addressedLimit)
	}

	successfulNodes = make([]*pb.Node, len(limits))
	successfulHashes = make([]*pb.PieceHash, len(limits))
	var successfulCount int32

	// How many nodes must be repaired to reach the success threshold:
	// optimal threshold - (total count - number of non-nil limits).
	optimalCount := rs.OptimalThreshold() - (rs.TotalCount() - nonNilCount(limits))

	ec.log.Sugar().Infof("Starting a timer for %s for repairing %s to %d nodes to reach the success threshold (%d nodes)...",
		timeout, path, optimalCount, rs.OptimalThreshold())

	timer := time.AfterFunc(timeout, func() {
		if ctx.Err() != context.Canceled {
			ec.log.Sugar().Infof("Timer expired. Successfully repaired %s to %d nodes. Canceling the long tail...", path, atomic.LoadInt32(&successfulCount))
			cancel()
		}
	})

	for range limits {
		info := <-infos

		if limits[info.i] == nil {
			continue
		}

		if info.err != nil {
			ec.log.Sugar().Debugf("Repair %s to storage node %s failed: %v", path, limits[info.i].GetLimit().StorageNodeId, info.err)
			continue
		}

		successfulNodes[info.i] = &pb.Node{
			Id:      limits[info.i].GetLimit().StorageNodeId,
			Address: limits[info.i].GetStorageNodeAddress(),
		}
		successfulHashes[info.i] = info.hash
		atomic.AddInt32(&successfulCount, 1)
	}

	// Ensure the timer is stopped in case the success threshold is not reached
	// due to errors instead of slowness.
	if timer != nil {
		timer.Stop()
	}

	// TODO: clean up the partially uploaded segment's pieces
	defer func() {
		select {
		case <-ctx.Done():
			err = errs.Combine(
				Error.New("repair cancelled"),
				// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId), //TODO
			)
		default:
		}
	}()

	if atomic.LoadInt32(&successfulCount) == 0 {
		return nil, nil, Error.New("repair %v to all nodes failed", path)
	}

	ec.log.Sugar().Infof("Successfully repaired %s to %d nodes.", path, atomic.LoadInt32(&successfulCount))

	return successfulNodes, successfulHashes, nil
}

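// putPiece uploads a single piece to the node referenced by the order limit.
// A nil limit drains and discards the piece data. The parent context is used
// to distinguish a user cancellation from a long-tail cancellation when logging.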
func (ec *ecClient) putPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, data io.ReadCloser, expiration time.Time) (hash *pb.PieceHash, err error) {
	nodeName := "nil"
	if limit != nil {
		nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
	}
	defer mon.Task()(&ctx, "node: "+nodeName)(&err)
	defer func() { err = errs.Combine(err, data.Close()) }()

	if limit == nil {
		_, _ = io.Copy(ioutil.Discard, data)
		return nil, nil
	}

	storageNodeID := limit.GetLimit().StorageNodeId
	pieceID := limit.GetLimit().PieceId
	ps, err := ec.dialPiecestore(ctx, &pb.Node{
		Id:      storageNodeID,
		Address: limit.GetStorageNodeAddress(),
	})
	if err != nil {
		ec.log.Sugar().Debugf("Failed dialing for putting piece %s to node %s: %v", pieceID, storageNodeID, err)
		return nil, err
	}
	defer func() { err = errs.Combine(err, ps.Close()) }()

	upload, err := ps.Upload(ctx, limit.GetLimit())
	if err != nil {
		ec.log.Sugar().Debugf("Failed requesting upload of piece %s to node %s: %v", pieceID, storageNodeID, err)
		return nil, err
	}
	defer func() {
		if ctx.Err() != nil || err != nil {
			hash = nil
			err = errs.Combine(err, upload.Cancel(ctx))
			return
		}
		h, closeErr := upload.Commit(ctx)
		hash = h
		err = errs.Combine(err, closeErr)
	}()

	_, err = sync2.Copy(ctx, upload, data)
	// A canceled context means the piece upload was interrupted by the user or
	// due to a slow connection. No error logging for this case.
	if ctx.Err() == context.Canceled {
		if parent.Err() == context.Canceled {
			ec.log.Sugar().Infof("Upload to node %s canceled by user.", storageNodeID)
		} else {
			ec.log.Sugar().Debugf("Node %s cut from upload due to slow connection.", storageNodeID)
		}
		err = context.Canceled
	} else if err != nil {
		nodeAddress := "nil"
		if limit.GetStorageNodeAddress() != nil {
			nodeAddress = limit.GetStorageNodeAddress().GetAddress()
		}

		ec.log.Sugar().Debugf("Failed uploading piece %s to node %s (%+v): %v", pieceID, storageNodeID, nodeAddress, err)
	}

	return hash, err
}

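// Get returns a ranger that reconstructs the original data by downloading and
// erasure-decoding pieces from the nodes referenced by the order limits.
// Connections are dialed lazily, only when a range is requested.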
func (ec *ecClient) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, es eestream.ErasureScheme, size int64) (rr ranger.Ranger, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(limits) != es.TotalCount() {
		return nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
	}

	if nonNilCount(limits) < es.RequiredCount() {
		return nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
	}

	paddedSize := calcPadded(size, es.StripeSize())
	pieceSize := paddedSize / int64(es.RequiredCount())

	rrs := map[int]ranger.Ranger{}
	for i, addressedLimit := range limits {
		if addressedLimit == nil {
			continue
		}

		rrs[i] = &lazyPieceRanger{
			dialPiecestore: ec.dialPiecestore,
			limit:          addressedLimit,
			size:           pieceSize,
		}
	}

	rr, err = eestream.Decode(rrs, es, ec.memoryLimit, ec.forceErrorDetection)
	if err != nil {
		return nil, err
	}

	return eestream.Unpad(rr, int(paddedSize-size))
}

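// Delete concurrently deletes the pieces referenced by the order limits from
// their storage nodes. It returns an error only when deletion failed for every
// limit.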
func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit) (err error) {
	defer mon.Task()(&ctx)(&err)

	errch := make(chan error, len(limits))
	for _, addressedLimit := range limits {
		if addressedLimit == nil {
			errch <- nil
			continue
		}

		go func(addressedLimit *pb.AddressedOrderLimit) {
			limit := addressedLimit.GetLimit()
			ps, err := ec.dialPiecestore(ctx, &pb.Node{
				Id:      limit.StorageNodeId,
				Address: addressedLimit.GetStorageNodeAddress(),
			})
			if err != nil {
				ec.log.Sugar().Errorf("Failed dialing for deleting piece %s from node %s: %v", limit.PieceId, limit.StorageNodeId, err)
				errch <- err
				return
			}

			err = ps.Delete(ctx, limit)
			err = errs.Combine(err, ps.Close())
			if err != nil {
				ec.log.Sugar().Errorf("Failed deleting piece %s from node %s: %v", limit.PieceId, limit.StorageNodeId, err)
			}
			errch <- err
		}(addressedLimit)
	}

	allerrs := collectErrors(errch, len(limits))
	if len(allerrs) > 0 && len(allerrs) == len(limits) {
		return allerrs[0]
	}

	return nil
}

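// collectErrors reads size results from the channel and returns only the
// non-nil errors.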
func collectErrors(errs <-chan error, size int) []error {
	var result []error
	for i := 0; i < size; i++ {
		err := <-errs
		if err != nil {
			result = append(result, err)
		}
	}
	return result
}

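// unique reports whether all non-nil limits in the slice reference distinct
// storage nodes.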
func unique(limits []*pb.AddressedOrderLimit) bool {
	if len(limits) < 2 {
		return true
	}

	ids := make(storj.NodeIDList, len(limits))
	for i, addressedLimit := range limits {
		if addressedLimit != nil {
			ids[i] = addressedLimit.GetLimit().StorageNodeId
		}
	}

	// sort the ids and check for identical neighbors
	sort.Sort(ids)
	// sort.Slice(ids, func(i, k int) bool { return ids[i].Less(ids[k]) })
	for i := 1; i < len(ids); i++ {
		if ids[i] != (storj.NodeID{}) && ids[i] == ids[i-1] {
			return false
		}
	}

	return true
}

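// calcPadded rounds size up to the next multiple of blockSize.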
func calcPadded(size int64, blockSize int) int64 {
	mod := size % int64(blockSize)
	if mod == 0 {
		return size
	}
	return size + int64(blockSize) - mod
}

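// lazyPieceRanger is a ranger.Ranger that defers dialing the storage node
// until a range of the piece is actually requested.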
type lazyPieceRanger struct {
	dialPiecestore dialPiecestoreFunc
	limit          *pb.AddressedOrderLimit
	size           int64
}

// Size implements Ranger.Size
func (lr *lazyPieceRanger) Size() int64 {
	return lr.size
}

// Range implements Ranger.Range, connecting to the storage node lazily.
func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
	defer mon.Task()(&ctx)(&err)

	ps, err := lr.dialPiecestore(ctx, &pb.Node{
		Id:      lr.limit.GetLimit().StorageNodeId,
		Address: lr.limit.GetStorageNodeAddress(),
	})
	if err != nil {
		return nil, err
	}

	download, err := ps.Download(ctx, lr.limit.GetLimit(), offset, length)
	if err != nil {
		return nil, errs.Combine(err, ps.Close())
	}
	return &clientCloser{download, ps}, nil
}

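// clientCloser wraps a piecestore download so that closing the returned
// reader also closes the underlying piecestore client.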
type clientCloser struct {
	piecestore.Downloader
	client *piecestore.Client
}

func (client *clientCloser) Close() error {
	return errs.Combine(
		client.Downloader.Close(),
		client.client.Close(),
	)
}

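// nonNilCount returns the number of non-nil limits in the slice.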
func nonNilCount(limits []*pb.AddressedOrderLimit) int {
	total := 0
	for _, limit := range limits {
		if limit != nil {
			total++
		}
	}
	return total
}