// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package ecclient

import (
	"context"
	"io"
	"io/ioutil"
	"sort"
	"sync"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"
	monkit "gopkg.in/spacemonkeygo/monkit.v2"

	"storj.io/storj/internal/errs2"
	"storj.io/storj/internal/groupcancel"
	"storj.io/storj/internal/sync2"
	"storj.io/storj/pkg/identity"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/pkg/ranger"
	"storj.io/storj/pkg/rpc"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/uplink/eestream"
	"storj.io/storj/uplink/piecestore"
)

var mon = monkit.Package()

// Client defines an interface for storing erasure coded data to piece store nodes
type Client interface {
	Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)
	Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (ranger.Ranger, error)
	Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) error
	WithForceErrorDetection(force bool) Client
	// PutPiece is not intended to be used by normal uplinks directly, but is exported to support storagenode graceful exit transfers.
	PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser) (hash *pb.PieceHash, id *identity.PeerIdentity, err error)
}
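
// dialPiecestoreFunc matches the signature of (*ecClient).dialPiecestore so
// that lazyPieceRanger can hold just the dial function instead of the whole
// client.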
type dialPiecestoreFunc func(context.Context, *pb.Node) (*piecestore.Client, error)

type ecClient struct {
	log                 *zap.Logger
	dialer              rpc.Dialer
	memoryLimit         int
	forceErrorDetection bool
}

// NewClient creates an erasure coding client from the given log, dialer, and
// maximum buffer memory.
func NewClient(log *zap.Logger, dialer rpc.Dialer, memoryLimit int) Client {
	return &ecClient{
		log:         log,
		dialer:      dialer,
		memoryLimit: memoryLimit,
	}
}
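
// WithForceErrorDetection configures whether erasure-share error detection is
// always performed during downloads (the flag is passed to eestream.Decode).
// Note that it mutates the client in place and returns the same instance.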
func (ec *ecClient) WithForceErrorDetection(force bool) Client {
	ec.forceErrorDetection = force
	return ec
}

func (ec *ecClient) dialPiecestore(ctx context.Context, n *pb.Node) (*piecestore.Client, error) {
	logger := ec.log.Named(n.Id.String())
	return piecestore.Dial(ctx, ec.dialer, n, logger, piecestore.DefaultConfig)
}
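
// Put uploads the erasure shares of data to the nodes listed in limits,
// following the redundancy strategy rs. Once the optimal threshold of pieces
// is stored, the remaining long-tail uploads are canceled. successfulNodes
// and successfulHashes are indexed by piece number, with nil entries for
// pieces that were not stored.
//
// A minimal usage sketch (hypothetical values; limits, privateKey, and rs
// normally come from the satellite and the redundancy configuration):
//
//	nodes, hashes, err := ec.Put(ctx, limits, privateKey, rs, segmentReader, expiration)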
func (ec *ecClient) Put(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error) {
	defer mon.Task()(&ctx)(&err)

	pieceCount := len(limits)
	if pieceCount != rs.TotalCount() {
		return nil, nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", pieceCount, rs.TotalCount())
	}

	nonNilLimits := nonNilCount(limits)
	if nonNilLimits <= rs.RepairThreshold() && nonNilLimits < rs.OptimalThreshold() {
		return nil, nil, Error.New("number of non-nil limits (%d) is less than or equal to the repair threshold (%d) of erasure scheme", nonNilLimits, rs.RepairThreshold())
	}

	if !unique(limits) {
		return nil, nil, Error.New("duplicated nodes are not allowed")
	}

	ec.log.Debug("Uploading to storage nodes",
		zap.Int("Erasure Share Size", rs.ErasureShareSize()),
		zap.Int("Stripe Size", rs.StripeSize()),
		zap.Int("Repair Threshold", rs.RepairThreshold()),
		zap.Int("Optimal Threshold", rs.OptimalThreshold()),
	)

	padded := eestream.PadReader(ioutil.NopCloser(data), rs.StripeSize())
	readers, err := eestream.EncodeReader(ctx, ec.log, padded, rs)
	if err != nil {
		return nil, nil, err
	}
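
	// info carries the result of a single piece upload; exactly one is sent
	// on the infos channel for every entry in limits.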
	type info struct {
		i    int
		err  error
		hash *pb.PieceHash
	}
	infos := make(chan info, pieceCount)

	psCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i, addressedLimit := range limits {
		go func(i int, addressedLimit *pb.AddressedOrderLimit) {
			hash, _, err := ec.PutPiece(psCtx, ctx, addressedLimit, privateKey, readers[i])
			infos <- info{i: i, err: err, hash: hash}
		}(i, addressedLimit)
	}
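
	// Collect the result of every upload, tallying successes, failures, and
	// cancellations. Once the optimal threshold is reached, cancel() cuts the
	// remaining long-tail uploads via psCtx.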
	successfulNodes = make([]*pb.Node, pieceCount)
	successfulHashes = make([]*pb.PieceHash, pieceCount)
	var successfulCount, failureCount, cancellationCount int32
	for range limits {
		info := <-infos
		if limits[info.i] == nil {
			continue
		}
		if info.err != nil {
			if !errs2.IsCanceled(info.err) {
				failureCount++
			} else {
				cancellationCount++
			}
			ec.log.Debug("Upload to storage node failed",
				zap.String("NodeID", limits[info.i].GetLimit().StorageNodeId.String()),
				zap.Error(info.err),
			)
			continue
		}
		successfulNodes[info.i] = &pb.Node{
			Id:      limits[info.i].GetLimit().StorageNodeId,
			Address: limits[info.i].GetStorageNodeAddress(),
		}
		successfulHashes[info.i] = info.hash

		successfulCount++
		if int(successfulCount) >= rs.OptimalThreshold() {
			ec.log.Debug("Success threshold reached. Cancelling remaining uploads.",
				zap.Int("Optimal Threshold", rs.OptimalThreshold()),
			)
			cancel()
		}
	}

	defer func() {
		select {
		case <-ctx.Done():
			err = Error.New("upload cancelled by user")
			// TODO: clean up the partially uploaded segment's pieces
			// ec.Delete(context.Background(), nodes, pieceID, pba.SatelliteId),
		default:
		}
	}()

	mon.IntVal("put_segment_pieces_total").Observe(int64(pieceCount))
	mon.IntVal("put_segment_pieces_optimal").Observe(int64(rs.OptimalThreshold()))
	mon.IntVal("put_segment_pieces_successful").Observe(int64(successfulCount))
	mon.IntVal("put_segment_pieces_failed").Observe(int64(failureCount))
	mon.IntVal("put_segment_pieces_canceled").Observe(int64(cancellationCount))

	if int(successfulCount) <= rs.RepairThreshold() && int(successfulCount) < rs.OptimalThreshold() {
		return nil, nil, Error.New("successful puts (%d) less than or equal to repair threshold (%d)", successfulCount, rs.RepairThreshold())
	}

	if int(successfulCount) < rs.OptimalThreshold() {
		return nil, nil, Error.New("successful puts (%d) less than success threshold (%d)", successfulCount, rs.OptimalThreshold())
	}

	return successfulNodes, successfulHashes, nil
}
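
// PutPiece uploads one erasure share to a single storage node. ctx governs
// the upload itself, while parent is consulted only to distinguish a
// user-initiated cancellation from a long-tail cancellation issued after the
// optimal threshold was reached.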
func (ec *ecClient) PutPiece(ctx, parent context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, data io.ReadCloser) (hash *pb.PieceHash, peerID *identity.PeerIdentity, err error) {
	nodeName := "nil"
	if limit != nil {
		nodeName = limit.GetLimit().StorageNodeId.String()[0:8]
	}
	defer mon.Task()(&ctx, "node: "+nodeName)(&err)
	defer func() { err = errs.Combine(err, data.Close()) }()

	if limit == nil {
		// A nil limit marks a piece that should be skipped; drain the reader
		// so the upstream erasure encoder does not stall.
		_, _ = io.Copy(ioutil.Discard, data)
		return nil, nil, nil
	}

	storageNodeID := limit.GetLimit().StorageNodeId
	pieceID := limit.GetLimit().PieceId
	ps, err := ec.dialPiecestore(ctx, &pb.Node{
		Id:      storageNodeID,
		Address: limit.GetStorageNodeAddress(),
	})
	if err != nil {
		ec.log.Debug("Failed dialing for putting piece to node",
			zap.String("PieceID", pieceID.String()),
			zap.String("NodeID", storageNodeID.String()),
			zap.Error(err),
		)
		return nil, nil, err
	}
	defer func() { err = errs.Combine(err, ps.Close()) }()

	peerID, err = ps.GetPeerIdentity()
	if err != nil {
		ec.log.Debug("Failed getting peer identity from node connection",
			zap.String("NodeID", storageNodeID.String()),
			zap.Error(err),
		)
		return nil, nil, err
	}

	upload, err := ps.Upload(ctx, limit.GetLimit(), privateKey)
	if err != nil {
		ec.log.Debug("Failed requesting upload of piece to node",
			zap.String("PieceID", pieceID.String()),
			zap.String("NodeID", storageNodeID.String()),
			zap.Error(err),
		)
		return nil, nil, err
	}

	// On success, commit the upload and capture the piece hash into the named
	// return value; on failure, make sure the partial upload is canceled on
	// the node.
	defer func() {
		if err != nil {
			err = errs.Combine(err, upload.Cancel(ctx))
			return
		}
		hash, err = upload.Commit(ctx)
	}()

	_, err = sync2.Copy(ctx, upload, data)
	// Canceled context means the piece upload was interrupted by user or due
	// to slow connection. No error logging for this case.
	if err != nil {
		if errs2.IsCanceled(err) {
			if parent.Err() == context.Canceled {
				ec.log.Info("Upload to node canceled by user", zap.Stringer("NodeID", storageNodeID))
			} else {
				ec.log.Debug("Node cut from upload due to slow connection", zap.Stringer("NodeID", storageNodeID))
			}
		} else {
			nodeAddress := ""
			if limit.GetStorageNodeAddress() != nil {
				nodeAddress = limit.GetStorageNodeAddress().GetAddress()
			}
			ec.log.Debug("Failed uploading piece to node",
				zap.Stringer("PieceID", pieceID),
				zap.Stringer("NodeID", storageNodeID),
				zap.String("Node Address", nodeAddress),
				zap.Error(err),
			)
		}
		return nil, nil, err
	}

	return nil, peerID, nil
}
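
// Get returns a Ranger over the segment described by limits. Each non-nil
// limit becomes a lazyPieceRanger, so a storage node is dialed only when its
// piece is actually needed during decoding. size is the unpadded segment
// size; the padding added during upload is stripped before returning.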
func (ec *ecClient) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, size int64) (rr ranger.Ranger, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(limits) != es.TotalCount() {
		return nil, Error.New("size of limits slice (%d) does not match total count (%d) of erasure scheme", len(limits), es.TotalCount())
	}

	if nonNilCount(limits) < es.RequiredCount() {
		return nil, Error.New("number of non-nil limits (%d) is less than required count (%d) of erasure scheme", nonNilCount(limits), es.RequiredCount())
	}

	paddedSize := calcPadded(size, es.StripeSize())
	pieceSize := paddedSize / int64(es.RequiredCount())

	rrs := map[int]ranger.Ranger{}
	for i, addressedLimit := range limits {
		if addressedLimit == nil {
			continue
		}
		rrs[i] = &lazyPieceRanger{
			dialPiecestore: ec.dialPiecestore,
			limit:          addressedLimit,
			privateKey:     privateKey,
			size:           pieceSize,
		}
	}

	rr, err = eestream.Decode(ec.log, rrs, es, ec.memoryLimit, ec.forceErrorDetection)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	// Strip the padding that was added during upload to align the data to
	// the stripe size.
	unpadded, err := eestream.Unpad(rr, int(paddedSize-size))
	return unpadded, Error.Wrap(err)
}
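
// Delete removes the pieces referenced by limits from their storage nodes,
// fanning out one delete per non-nil limit. It returns nil if at least one
// delete succeeded; otherwise it returns the last error seen.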
func (ec *ecClient) Delete(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey) (err error) {
	defer mon.Task()(&ctx)(&err)

	setLimits := 0
	for _, addressedLimit := range limits {
		if addressedLimit != nil {
			setLimits++
		}
	}
	gctx, cancel := groupcancel.NewContext(ctx, setLimits, .75, 2)
	defer cancel()
	errch := make(chan error, setLimits)

	for _, addressedLimit := range limits {
		if addressedLimit == nil {
			continue
		}
		go func(addressedLimit *pb.AddressedOrderLimit) {
			limit := addressedLimit.GetLimit()
			ps, err := ec.dialPiecestore(gctx, &pb.Node{
				Id:      limit.StorageNodeId,
				Address: addressedLimit.GetStorageNodeAddress(),
			})
			if err != nil {
				ec.log.Debug("Failed dialing for deleting piece from node",
					zap.String("PieceID", limit.PieceId.String()),
					zap.String("NodeID", limit.StorageNodeId.String()),
					zap.Error(err),
				)
				errch <- err
				return
			}
			err = ps.Delete(gctx, limit, privateKey)
			err = errs.Combine(err, ps.Close())
			if err != nil {
				ec.log.Debug("Failed deleting piece from node",
					zap.String("PieceID", limit.PieceId.String()),
					zap.String("NodeID", limit.StorageNodeId.String()),
					zap.Error(err),
				)
			}
			errch <- err
		}(addressedLimit)
	}
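
	// Report each outcome to the group-cancel context (see
	// internal/groupcancel) and remember whether any delete succeeded.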
	var anySuccess bool
	var lastErr error
	for i := 0; i < setLimits; i++ {
		if err := <-errch; err == nil {
			gctx.Success()
			anySuccess = true
		} else {
			gctx.Failure()
			lastErr = err
		}
	}

	if anySuccess {
		return nil
	}
	return lastErr
}
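
// unique reports whether all non-nil limits reference distinct storage nodes.
// Nil limits leave zero-value IDs in the list, which the duplicate check
// ignores.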
func unique(limits []*pb.AddressedOrderLimit) bool {
	if len(limits) < 2 {
		return true
	}

	ids := make(storj.NodeIDList, len(limits))
	for i, addressedLimit := range limits {
		if addressedLimit != nil {
			ids[i] = addressedLimit.GetLimit().StorageNodeId
		}
	}

	// sort the IDs and check for identical neighbors
	sort.Sort(ids)
	// sort.Slice(ids, func(i, k int) bool { return ids[i].Less(ids[k]) })
	for i := 1; i < len(ids); i++ {
		if ids[i] != (storj.NodeID{}) && ids[i] == ids[i-1] {
			return false
		}
	}

	return true
}
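
// calcPadded rounds size up to the next multiple of blockSize.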
func calcPadded(size int64, blockSize int) int64 {
	mod := size % int64(blockSize)
	if mod == 0 {
		return size
	}
	return size + int64(blockSize) - mod
}
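
// lazyPieceRanger is a ranger.Ranger that defers dialing the storage node
// until a range of the piece is actually requested.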
type lazyPieceRanger struct {
	dialPiecestore dialPiecestoreFunc
	limit          *pb.AddressedOrderLimit
	privateKey     storj.PiecePrivateKey
	size           int64
}

// Size implements Ranger.Size
func (lr *lazyPieceRanger) Size() int64 {
	return lr.size
}

// Range implements Ranger.Range to be lazily connected
func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
	defer mon.Task()(&ctx)(&err)

	return &lazyPieceReader{
		ranger: lr,
		ctx:    ctx,
		offset: offset,
		length: length,
	}, nil
}
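
// lazyPieceReader postpones the dial and the download request until the first
// Read call. mu guards isClosed and the lazily initialized Downloader and
// client.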
type lazyPieceReader struct {
	ranger *lazyPieceRanger
	ctx    context.Context
	offset int64
	length int64

	mu       sync.Mutex
	isClosed bool
	piecestore.Downloader
	client *piecestore.Client
}

func (lr *lazyPieceReader) Read(data []byte) (_ int, err error) {
	lr.mu.Lock()
	defer lr.mu.Unlock()

	if lr.isClosed {
		return 0, io.EOF
	}
	if lr.Downloader == nil {
		client, downloader, err := lr.ranger.dial(lr.ctx, lr.offset, lr.length)
		if err != nil {
			return 0, err
		}
		lr.Downloader = downloader
		lr.client = client
	}

	return lr.Downloader.Read(data)
}

func (lr *lazyPieceRanger) dial(ctx context.Context, offset, length int64) (_ *piecestore.Client, _ piecestore.Downloader, err error) {
	defer mon.Task()(&ctx)(&err)

	ps, err := lr.dialPiecestore(ctx, &pb.Node{
		Id:      lr.limit.GetLimit().StorageNodeId,
		Address: lr.limit.GetStorageNodeAddress(),
	})
	if err != nil {
		return nil, nil, err
	}

	download, err := ps.Download(ctx, lr.limit.GetLimit(), lr.privateKey, offset, length)
	if err != nil {
		return nil, nil, errs.Combine(err, ps.Close())
	}

	return ps, download, nil
}

func (lr *lazyPieceReader) Close() (err error) {
	lr.mu.Lock()
	defer lr.mu.Unlock()

	if lr.isClosed {
		return nil
	}
	lr.isClosed = true

	if lr.Downloader != nil {
		err = errs.Combine(err, lr.Downloader.Close())
	}
	if lr.client != nil {
		err = errs.Combine(err, lr.client.Close())
	}
	return err
}
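
// nonNilCount returns the number of non-nil limits in the slice.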
func nonNilCount(limits []*pb.AddressedOrderLimit) int {
	total := 0
	for _, limit := range limits {
		if limit != nil {
			total++
		}
	}
	return total
}