2019-10-11 22:18:05 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
2019-10-24 20:38:40 +01:00
"bytes"
2019-10-11 22:18:05 +01:00
"context"
"io"
"sync"
"time"
2019-10-24 20:38:40 +01:00
"github.com/zeebo/errs"
2019-10-11 22:18:05 +01:00
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
2019-10-29 18:40:42 +00:00
"storj.io/storj/internal/sync2"
2019-10-11 22:18:05 +01:00
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc/rpcstatus"
2019-10-24 20:38:40 +01:00
"storj.io/storj/pkg/signing"
2019-10-11 22:18:05 +01:00
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/uplink/eestream"
)
// buildQueueMillis is the period, in milliseconds, of the transfer queue
// building ticker. NewEndpoint multiplies it by time.Millisecond to obtain
// the Endpoint's interval.
const buildQueueMillis = 100
2019-10-24 20:38:40 +01:00
var (
// ErrInvalidArgument is an error class for invalid argument errors used to check which rpc code to use.
// handleSucceeded returns errors of this class when the data supplied by the
// storage node fails validation; doProcess tests for it with
// ErrInvalidArgument.Has to decide whether to fail the node's graceful exit.
ErrInvalidArgument = errs . Class ( "graceful exit" )
)
2019-10-11 22:18:05 +01:00
// drpcEndpoint wraps streaming methods so that they can be used with drpc.
// It embeds *Endpoint and is what Endpoint.DRPC returns; its Process method
// forwards the DRPC stream to the shared doProcess implementation.
type drpcEndpoint struct { * Endpoint }
// processStream is the minimum interface required to process requests.
// Both the gRPC and the DRPC Process streams are passed to doProcess through
// this interface.
type processStream interface {
// Context returns the context associated with the stream.
Context ( ) context . Context
// Send sends a satellite message to the storage node.
Send ( * pb . SatelliteMessage ) error
// Recv receives the next storage node message.
Recv ( ) ( * pb . StorageNodeMessage , error )
}
// Endpoint for handling the transfer of pieces for Graceful Exit.
type Endpoint struct {
2019-10-24 20:38:40 +01:00
log * zap . Logger
interval time . Duration
2019-10-25 21:36:26 +01:00
signer signing . Signer
2019-10-24 20:38:40 +01:00
db DB
overlaydb overlay . DB
overlay * overlay . Service
metainfo * metainfo . Service
orders * orders . Service
2019-10-29 17:23:17 +00:00
connections * connectionsTracker
2019-10-24 20:38:40 +01:00
peerIdentities overlay . PeerIdentities
config Config
2019-10-30 17:40:57 +00:00
recvTimeout time . Duration
2019-10-11 22:18:05 +01:00
}
type pendingTransfer struct {
path [ ] byte
pieceSize int64
satelliteMessage * pb . SatelliteMessage
2019-10-28 18:43:46 +00:00
originalPointer * pb . Pointer
2019-10-24 20:38:40 +01:00
pieceNum int32
2019-10-11 22:18:05 +01:00
}
// pendingMap for managing concurrent access to the pending transfer map.
// Entries are keyed by the original piece ID; all access to data goes through
// the methods below, which take mu.
type pendingMap struct {
// mu guards data.
mu sync . RWMutex
// data maps an original piece ID to its in-flight transfer.
data map [ storj . PieceID ] * pendingTransfer
}
// newPendingMap returns an empty pendingMap whose underlying map is ready
// for use.
func newPendingMap() *pendingMap {
	pm := new(pendingMap)
	pm.data = make(map[storj.PieceID]*pendingTransfer)
	return pm
}
// put stores pendingTransfer under pieceID, replacing any existing entry.
// It takes the write lock for the duration of the update.
func (pm *pendingMap) put(pieceID storj.PieceID, pendingTransfer *pendingTransfer) {
	guard := &pm.mu
	guard.Lock()
	defer guard.Unlock()

	pm.data[pieceID] = pendingTransfer
}
// get looks up the pending transfer stored under pieceID. ok reports whether
// an entry was present. The lookup is done under the read lock.
func (pm *pendingMap) get(pieceID storj.PieceID) (pendingTransfer *pendingTransfer, ok bool) {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	entry, found := pm.data[pieceID]
	return entry, found
}
// length reports how many transfers are currently pending. The count is read
// under the read lock.
func (pm *pendingMap) length() int {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	count := len(pm.data)
	return count
}
// delete drops the entry stored under pieceID, if any. It takes the write
// lock for the duration of the removal.
func (pm *pendingMap) delete(pieceID storj.PieceID) {
	guard := &pm.mu
	guard.Lock()
	defer guard.Unlock()

	delete(pm.data, pieceID)
}
2019-10-29 17:23:17 +00:00
// connectionsTracker for tracking ongoing connections on this api server.
// It is a mutex-guarded set of node IDs; doProcess uses it to allow only one
// Process stream per node at a time.
type connectionsTracker struct {
// mu guards data.
mu sync . RWMutex
// data is the set of node IDs that currently have a connection.
data map [ storj . NodeID ] struct { }
}
// newConnectionsTracker returns an empty connectionsTracker whose underlying
// set is ready for use.
func newConnectionsTracker() *connectionsTracker {
	tracker := new(connectionsTracker)
	tracker.data = make(map[storj.NodeID]struct{})
	return tracker
}
// tryAdd registers nodeID unless it is already registered.
// It returns true when the ID was added by this call and false when it was
// already present. The check and the insert happen under one write lock.
func (pm *connectionsTracker) tryAdd(nodeID storj.NodeID) bool {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	_, alreadyTracked := pm.data[nodeID]
	if !alreadyTracked {
		pm.data[nodeID] = struct{}{}
	}
	return !alreadyTracked
}
// delete unregisters nodeID; it is a no-op when the ID is not registered.
// The removal happens under the write lock.
func (pm *connectionsTracker) delete(nodeID storj.NodeID) {
	guard := &pm.mu
	guard.Lock()
	defer guard.Unlock()

	delete(pm.data, nodeID)
}
2019-10-11 22:18:05 +01:00
// DRPC returns a DRPC form of the endpoint: a drpcEndpoint wrapping this
// Endpoint.
func (endpoint *Endpoint) DRPC() pb.DRPCSatelliteGracefulExitServer {
	wrapped := drpcEndpoint{Endpoint: endpoint}
	return &wrapped
}
// NewEndpoint creates a new graceful exit endpoint.
2019-10-25 21:36:26 +01:00
func NewEndpoint ( log * zap . Logger , signer signing . Signer , db DB , overlaydb overlay . DB , overlay * overlay . Service , metainfo * metainfo . Service , orders * orders . Service ,
2019-10-24 20:38:40 +01:00
peerIdentities overlay . PeerIdentities , config Config ) * Endpoint {
2019-10-11 22:18:05 +01:00
return & Endpoint {
2019-10-24 20:38:40 +01:00
log : log ,
interval : time . Millisecond * buildQueueMillis ,
2019-10-25 21:36:26 +01:00
signer : signer ,
2019-10-24 20:38:40 +01:00
db : db ,
overlaydb : overlaydb ,
overlay : overlay ,
metainfo : metainfo ,
orders : orders ,
2019-10-29 17:23:17 +00:00
connections : newConnectionsTracker ( ) ,
2019-10-24 20:38:40 +01:00
peerIdentities : peerIdentities ,
config : config ,
2019-10-30 17:40:57 +00:00
recvTimeout : config . RecvTimeout ,
2019-10-11 22:18:05 +01:00
}
}
// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
2019-10-29 17:23:17 +00:00
func ( endpoint * Endpoint ) Process ( stream pb . SatelliteGracefulExit_ProcessServer ) ( err error ) {
2019-10-11 22:18:05 +01:00
return endpoint . doProcess ( stream )
}
// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
// This is the DRPC entry point; the stream is handed to the embedded
// Endpoint's doProcess.
func (endpoint *drpcEndpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) error {
	return endpoint.Endpoint.doProcess(stream)
}
func ( endpoint * Endpoint ) doProcess ( stream processStream ) ( err error ) {
ctx := stream . Context ( )
defer mon . Task ( ) ( & ctx ) ( & err )
peer , err := identity . PeerIdentityFromContext ( ctx )
if err != nil {
2019-10-24 20:38:40 +01:00
return rpcstatus . Error ( rpcstatus . Unauthenticated , Error . Wrap ( err ) . Error ( ) )
2019-10-11 22:18:05 +01:00
}
// TODO should we error if the node is DQ'd?
nodeID := peer . ID
2019-10-17 15:29:35 +01:00
endpoint . log . Debug ( "graceful exit process" , zap . Stringer ( "node ID" , nodeID ) )
2019-10-11 22:18:05 +01:00
2019-10-29 17:23:17 +00:00
// ensure that only one connection can be opened for a single node at a time
if ! endpoint . connections . tryAdd ( nodeID ) {
return rpcstatus . Error ( rpcstatus . PermissionDenied , "Only one concurrent connection allowed for graceful exit" )
}
defer func ( ) {
endpoint . connections . delete ( nodeID )
} ( )
2019-10-11 22:18:05 +01:00
eofHandler := func ( err error ) error {
if err == io . EOF {
2019-10-17 15:29:35 +01:00
endpoint . log . Debug ( "received EOF when trying to receive messages from storage node" , zap . Stringer ( "node ID" , nodeID ) )
2019-10-11 22:18:05 +01:00
return nil
}
2019-10-24 20:38:40 +01:00
if err != nil {
return rpcstatus . Error ( rpcstatus . Unknown , Error . Wrap ( err ) . Error ( ) )
}
return nil
2019-10-11 22:18:05 +01:00
}
exitStatus , err := endpoint . overlaydb . GetExitStatus ( ctx , nodeID )
if err != nil {
2019-10-24 20:38:40 +01:00
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
2019-10-11 22:18:05 +01:00
}
if exitStatus . ExitFinishedAt != nil {
2019-10-25 21:36:26 +01:00
// TODO maybe we should store the reason in the DB so we know how it originally failed.
finishedMsg , err := endpoint . getFinishedMessage ( ctx , endpoint . signer , nodeID , * exitStatus . ExitFinishedAt , exitStatus . ExitSuccess , - 1 )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
err = stream . Send ( finishedMsg )
2019-10-24 20:38:40 +01:00
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
}
return nil
2019-10-11 22:18:05 +01:00
}
if exitStatus . ExitInitiatedAt == nil {
request := & overlay . ExitStatusRequest { NodeID : nodeID , ExitInitiatedAt : time . Now ( ) . UTC ( ) }
2019-10-29 20:22:20 +00:00
node , err := endpoint . overlaydb . UpdateExitStatus ( ctx , request )
2019-10-11 22:18:05 +01:00
if err != nil {
2019-10-24 20:38:40 +01:00
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
}
err = endpoint . db . IncrementProgress ( ctx , nodeID , 0 , 0 , 0 )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
2019-10-11 22:18:05 +01:00
}
2019-10-29 20:22:20 +00:00
// graceful exit initiation metrics
age := time . Now ( ) . UTC ( ) . Sub ( node . CreatedAt . UTC ( ) )
mon . FloatVal ( "graceful_exit_init_node_age_seconds" ) . Observe ( age . Seconds ( ) )
mon . IntVal ( "graceful_exit_init_node_audit_success_count" ) . Observe ( node . Reputation . AuditSuccessCount )
mon . IntVal ( "graceful_exit_init_node_audit_total_count" ) . Observe ( node . Reputation . AuditCount )
mon . IntVal ( "graceful_exit_init_node_piece_count" ) . Observe ( node . PieceCount )
2019-10-11 22:18:05 +01:00
err = stream . Send ( & pb . SatelliteMessage { Message : & pb . SatelliteMessage_NotReady { NotReady : & pb . NotReady { } } } )
2019-10-24 20:38:40 +01:00
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
}
return nil
2019-10-11 22:18:05 +01:00
}
if exitStatus . ExitLoopCompletedAt == nil {
err = stream . Send ( & pb . SatelliteMessage { Message : & pb . SatelliteMessage_NotReady { NotReady : & pb . NotReady { } } } )
2019-10-24 20:38:40 +01:00
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
}
return nil
2019-10-11 22:18:05 +01:00
}
pending := newPendingMap ( )
2019-10-29 18:40:42 +00:00
// these are used to synchronize the "incomplete transfer loop" with the main thread (storagenode receive loop)
morePiecesFlag := true
loopRunningFlag := true
2019-10-22 21:42:21 +01:00
errChan := make ( chan error , 1 )
2019-10-29 18:40:42 +00:00
processMu := & sync . Mutex { }
processCond := sync . NewCond ( processMu )
2019-10-25 21:36:26 +01:00
2019-10-22 21:42:21 +01:00
handleError := func ( err error ) error {
errChan <- err
close ( errChan )
2019-10-24 20:38:40 +01:00
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
2019-10-22 21:42:21 +01:00
}
2019-10-11 22:18:05 +01:00
var group errgroup . Group
group . Go ( func ( ) error {
2019-10-29 18:40:42 +00:00
incompleteLoop := sync2 . NewCycle ( endpoint . interval )
2019-10-25 21:36:26 +01:00
defer func ( ) {
2019-10-29 18:40:42 +00:00
processMu . Lock ( )
loopRunningFlag = false
processCond . Broadcast ( )
processMu . Unlock ( )
2019-10-25 21:36:26 +01:00
} ( )
2019-10-11 22:18:05 +01:00
2019-10-29 18:40:42 +00:00
ctx , cancel := context . WithCancel ( ctx )
return incompleteLoop . Run ( ctx , func ( ctx context . Context ) error {
2019-10-11 22:18:05 +01:00
if pending . length ( ) == 0 {
incomplete , err := endpoint . db . GetIncompleteNotFailed ( ctx , nodeID , endpoint . config . EndpointBatchSize , 0 )
if err != nil {
2019-10-22 21:42:21 +01:00
return handleError ( err )
2019-10-11 22:18:05 +01:00
}
if len ( incomplete ) == 0 {
2019-10-24 17:24:42 +01:00
incomplete , err = endpoint . db . GetIncompleteFailed ( ctx , nodeID , endpoint . config . MaxFailuresPerPiece , endpoint . config . EndpointBatchSize , 0 )
2019-10-11 22:18:05 +01:00
if err != nil {
2019-10-22 21:42:21 +01:00
return handleError ( err )
2019-10-11 22:18:05 +01:00
}
}
if len ( incomplete ) == 0 {
2019-10-17 15:29:35 +01:00
endpoint . log . Debug ( "no more pieces to transfer for node" , zap . Stringer ( "node ID" , nodeID ) )
2019-10-29 18:40:42 +00:00
processMu . Lock ( )
morePiecesFlag = false
processMu . Unlock ( )
cancel ( )
return nil
2019-10-11 22:18:05 +01:00
}
for _ , inc := range incomplete {
err = endpoint . processIncomplete ( ctx , stream , pending , inc )
if err != nil {
2019-10-22 21:42:21 +01:00
return handleError ( err )
2019-10-11 22:18:05 +01:00
}
}
2019-10-29 18:40:42 +00:00
if pending . length ( ) > 0 {
processCond . Broadcast ( )
}
2019-10-11 22:18:05 +01:00
}
2019-10-29 18:40:42 +00:00
return nil
} )
2019-10-11 22:18:05 +01:00
} )
for {
2019-10-22 21:42:21 +01:00
select {
case <- errChan :
return group . Wait ( )
default :
}
2019-10-11 22:18:05 +01:00
pendingCount := pending . length ( )
2019-10-24 17:24:42 +01:00
2019-10-29 18:40:42 +00:00
processMu . Lock ( )
2019-10-11 22:18:05 +01:00
// if there are no more transfers and the pending queue is empty, send complete
2019-10-29 18:40:42 +00:00
if ! morePiecesFlag && pendingCount == 0 {
processMu . Unlock ( )
2019-10-24 17:24:42 +01:00
exitStatusRequest := & overlay . ExitStatusRequest {
NodeID : nodeID ,
ExitFinishedAt : time . Now ( ) . UTC ( ) ,
}
progress , err := endpoint . db . GetProgress ( ctx , nodeID )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
var transferMsg * pb . SatelliteMessage
2019-10-29 20:22:20 +00:00
mon . IntVal ( "graceful_exit_final_pieces_failed" ) . Observe ( progress . PiecesFailed )
mon . IntVal ( "graceful_exit_final_pieces_succeess" ) . Observe ( progress . PiecesTransferred )
mon . IntVal ( "graceful_exit_final_bytes_transferred" ) . Observe ( progress . BytesTransferred )
2019-10-24 17:24:42 +01:00
processed := progress . PiecesFailed + progress . PiecesTransferred
2019-10-29 20:22:20 +00:00
if processed > 0 {
mon . IntVal ( "graceful_exit_successful_pieces_transfer_ratio" ) . Observe ( progress . PiecesTransferred / processed )
}
2019-10-24 17:24:42 +01:00
// check node's exiting progress to see if it has failed passed max failure threshold
if processed > 0 && float64 ( progress . PiecesFailed ) / float64 ( processed ) * 100 >= float64 ( endpoint . config . OverallMaxFailuresPercentage ) {
exitStatusRequest . ExitSuccess = false
2019-10-25 21:36:26 +01:00
transferMsg , err = endpoint . getFinishedMessage ( ctx , endpoint . signer , nodeID , exitStatusRequest . ExitFinishedAt , exitStatusRequest . ExitSuccess , pb . ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-10-24 17:24:42 +01:00
}
} else {
exitStatusRequest . ExitSuccess = true
2019-10-25 21:36:26 +01:00
transferMsg , err = endpoint . getFinishedMessage ( ctx , endpoint . signer , nodeID , exitStatusRequest . ExitFinishedAt , exitStatusRequest . ExitSuccess , - 1 )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-10-24 17:24:42 +01:00
}
}
2019-10-29 20:22:20 +00:00
if exitStatusRequest . ExitSuccess {
mon . Meter ( "graceful_exit_success" ) . Mark ( 1 )
} else {
mon . Meter ( "graceful_exit_fail_max_failures_percentage" ) . Mark ( 1 )
}
2019-10-24 17:24:42 +01:00
_ , err = endpoint . overlaydb . UpdateExitStatus ( ctx , exitStatusRequest )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-10-11 22:18:05 +01:00
}
2019-10-24 17:24:42 +01:00
2019-10-11 22:18:05 +01:00
err = stream . Send ( transferMsg )
if err != nil {
2019-10-24 20:38:40 +01:00
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
2019-10-11 22:18:05 +01:00
}
2019-10-24 17:24:42 +01:00
// remove remaining items from the queue after notifying nodes about their exit status
err = endpoint . db . DeleteTransferQueueItems ( ctx , nodeID )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2019-10-11 22:18:05 +01:00
break
2019-10-29 18:40:42 +00:00
} else if pendingCount == 0 {
// otherwise, wait for incomplete loop
processCond . Wait ( )
select {
case <- ctx . Done ( ) :
processMu . Unlock ( )
return ctx . Err ( )
default :
}
pendingCount = pending . length ( )
// if pending count is still 0 and the loop has exited, return
if pendingCount == 0 && ! loopRunningFlag {
processMu . Unlock ( )
continue
}
2019-10-11 22:18:05 +01:00
}
2019-10-29 18:40:42 +00:00
processMu . Unlock ( )
2019-10-11 22:18:05 +01:00
2019-10-30 17:40:57 +00:00
done := make ( chan struct { } )
var request * pb . StorageNodeMessage
var recvErr error
go func ( ) {
request , recvErr = stream . Recv ( )
close ( done )
} ( )
timer := time . NewTimer ( endpoint . recvTimeout )
select {
case <- ctx . Done ( ) :
return rpcstatus . Error ( rpcstatus . Internal , Error . New ( "context canceled while waiting to receive message from storagenode" ) . Error ( ) )
case <- timer . C :
return rpcstatus . Error ( rpcstatus . DeadlineExceeded , Error . New ( "timeout while waiting to receive message from storagenode" ) . Error ( ) )
case <- done :
}
if recvErr != nil {
return eofHandler ( recvErr )
2019-10-11 22:18:05 +01:00
}
switch m := request . GetMessage ( ) . ( type ) {
case * pb . StorageNodeMessage_Succeeded :
2019-10-25 21:36:26 +01:00
err = endpoint . handleSucceeded ( ctx , stream , pending , nodeID , m )
2019-10-11 22:18:05 +01:00
if err != nil {
2019-10-24 20:38:40 +01:00
if ErrInvalidArgument . Has ( err ) {
2019-10-25 18:16:20 +01:00
// immediately fail and complete graceful exit for nodes that fail satellite validation
exitStatusRequest := & overlay . ExitStatusRequest {
NodeID : nodeID ,
ExitFinishedAt : time . Now ( ) . UTC ( ) ,
ExitSuccess : false ,
}
2019-10-25 21:36:26 +01:00
finishedMsg , err := endpoint . getFinishedMessage ( ctx , endpoint . signer , nodeID , exitStatusRequest . ExitFinishedAt , exitStatusRequest . ExitSuccess , pb . ExitFailed_VERIFICATION_FAILED )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-10-25 18:16:20 +01:00
}
2019-10-29 20:22:20 +00:00
mon . Meter ( "graceful_exit_fail_validation" ) . Mark ( 1 )
2019-10-25 18:16:20 +01:00
_ , err = endpoint . overlaydb . UpdateExitStatus ( ctx , exitStatusRequest )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2019-10-25 21:36:26 +01:00
err = stream . Send ( finishedMsg )
2019-10-25 18:16:20 +01:00
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
}
// remove remaining items from the queue after notifying nodes about their exit status
err = endpoint . db . DeleteTransferQueueItems ( ctx , nodeID )
if err != nil {
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
break
2019-10-24 20:38:40 +01:00
}
return rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
2019-10-11 22:18:05 +01:00
}
case * pb . StorageNodeMessage_Failed :
err = endpoint . handleFailed ( ctx , pending , nodeID , m )
if err != nil {
2019-10-24 20:38:40 +01:00
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
2019-10-11 22:18:05 +01:00
}
default :
2019-10-24 20:38:40 +01:00
return rpcstatus . Error ( rpcstatus . Unknown , Error . New ( "unknown storage node message: %v" , m ) . Error ( ) )
2019-10-11 22:18:05 +01:00
}
}
if err := group . Wait ( ) ; err != nil {
2019-10-29 18:40:42 +00:00
if ! errs . Is ( err , context . Canceled ) {
return rpcstatus . Error ( rpcstatus . Internal , Error . Wrap ( err ) . Error ( ) )
}
2019-10-11 22:18:05 +01:00
}
return nil
}
func ( endpoint * Endpoint ) processIncomplete ( ctx context . Context , stream processStream , pending * pendingMap , incomplete * TransferQueueItem ) error {
nodeID := incomplete . NodeID
pointer , err := endpoint . metainfo . Get ( ctx , string ( incomplete . Path ) )
if err != nil {
return Error . Wrap ( err )
}
remote := pointer . GetRemote ( )
pieces := remote . GetRemotePieces ( )
var nodePiece * pb . RemotePiece
excludedNodeIDs := make ( [ ] storj . NodeID , len ( pieces ) )
for i , piece := range pieces {
if piece . NodeId == nodeID && piece . PieceNum == incomplete . PieceNum {
nodePiece = piece
}
excludedNodeIDs [ i ] = piece . NodeId
}
if nodePiece == nil {
2019-10-17 15:29:35 +01:00
endpoint . log . Debug ( "piece no longer held by node" , zap . Stringer ( "node ID" , nodeID ) , zap . ByteString ( "path" , incomplete . Path ) , zap . Int32 ( "piece num" , incomplete . PieceNum ) )
2019-10-11 22:18:05 +01:00
2019-10-28 15:08:33 +00:00
err = endpoint . db . DeleteTransferQueueItem ( ctx , nodeID , incomplete . Path , incomplete . PieceNum )
2019-10-11 22:18:05 +01:00
if err != nil {
return Error . Wrap ( err )
}
return nil
}
redundancy , err := eestream . NewRedundancyStrategyFromProto ( pointer . GetRemote ( ) . GetRedundancy ( ) )
if err != nil {
return Error . Wrap ( err )
}
if len ( remote . GetRemotePieces ( ) ) > redundancy . OptimalThreshold ( ) {
2019-10-17 15:29:35 +01:00
endpoint . log . Debug ( "pointer has more pieces than required. removing node from pointer." , zap . Stringer ( "node ID" , nodeID ) , zap . ByteString ( "path" , incomplete . Path ) , zap . Int32 ( "piece num" , incomplete . PieceNum ) )
2019-10-11 22:18:05 +01:00
_ , err = endpoint . metainfo . UpdatePieces ( ctx , string ( incomplete . Path ) , pointer , nil , [ ] * pb . RemotePiece { nodePiece } )
if err != nil {
return Error . Wrap ( err )
}
2019-10-28 15:08:33 +00:00
err = endpoint . db . DeleteTransferQueueItem ( ctx , nodeID , incomplete . Path , incomplete . PieceNum )
2019-10-11 22:18:05 +01:00
if err != nil {
return Error . Wrap ( err )
}
return nil
}
pieceSize := eestream . CalcPieceSize ( pointer . GetSegmentSize ( ) , redundancy )
request := overlay . FindStorageNodesRequest {
RequestedCount : 1 ,
FreeBandwidth : pieceSize ,
FreeDisk : pieceSize ,
ExcludedNodes : excludedNodeIDs ,
}
newNodes , err := endpoint . overlay . FindStorageNodes ( ctx , request )
if err != nil {
return Error . Wrap ( err )
}
if len ( newNodes ) == 0 {
2019-10-17 15:29:35 +01:00
return Error . New ( "could not find a node to receive piece transfer: node ID %v, path %v, piece num %v" , nodeID , incomplete . Path , incomplete . PieceNum )
2019-10-11 22:18:05 +01:00
}
newNode := newNodes [ 0 ]
2019-10-17 15:29:35 +01:00
endpoint . log . Debug ( "found new node for piece transfer" , zap . Stringer ( "original node ID" , nodeID ) , zap . Stringer ( "replacement node ID" , newNode . Id ) ,
2019-10-11 22:18:05 +01:00
zap . ByteString ( "path" , incomplete . Path ) , zap . Int32 ( "piece num" , incomplete . PieceNum ) )
pieceID := remote . RootPieceId . Derive ( nodeID , incomplete . PieceNum )
parts := storj . SplitPath ( storj . Path ( incomplete . Path ) )
if len ( parts ) < 2 {
2019-10-17 15:29:35 +01:00
return Error . New ( "invalid path for node ID %v, piece ID %v" , incomplete . NodeID , pieceID )
2019-10-11 22:18:05 +01:00
}
bucketID := [ ] byte ( storj . JoinPaths ( parts [ 0 ] , parts [ 1 ] ) )
2019-10-22 21:42:21 +01:00
limit , privateKey , err := endpoint . orders . CreateGracefulExitPutOrderLimit ( ctx , bucketID , newNode . Id , incomplete . PieceNum , remote . RootPieceId , int32 ( pieceSize ) )
2019-10-11 22:18:05 +01:00
if err != nil {
return Error . Wrap ( err )
}
transferMsg := & pb . SatelliteMessage {
Message : & pb . SatelliteMessage_TransferPiece {
TransferPiece : & pb . TransferPiece {
2019-10-15 20:59:12 +01:00
OriginalPieceId : pieceID ,
2019-10-11 22:18:05 +01:00
AddressedOrderLimit : limit ,
PrivateKey : privateKey ,
} ,
} ,
}
err = stream . Send ( transferMsg )
if err != nil {
return Error . Wrap ( err )
}
pending . put ( pieceID , & pendingTransfer {
path : incomplete . Path ,
pieceSize : pieceSize ,
satelliteMessage : transferMsg ,
2019-10-28 18:43:46 +00:00
originalPointer : pointer ,
2019-10-24 20:38:40 +01:00
pieceNum : incomplete . PieceNum ,
2019-10-11 22:18:05 +01:00
} )
return nil
}
2019-10-25 21:36:26 +01:00
func ( endpoint * Endpoint ) handleSucceeded ( ctx context . Context , stream processStream , pending * pendingMap , exitingNodeID storj . NodeID , message * pb . StorageNodeMessage_Succeeded ) ( err error ) {
2019-10-11 22:18:05 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
2019-10-24 20:38:40 +01:00
originalPieceID := message . Succeeded . OriginalPieceId
transfer , ok := pending . get ( originalPieceID )
if ! ok {
endpoint . log . Error ( "Could not find transfer item in pending queue" , zap . Stringer ( "Piece ID" , originalPieceID ) )
return Error . New ( "Could not find transfer item in pending queue" )
2019-10-11 22:18:05 +01:00
}
2019-10-24 20:38:40 +01:00
if transfer . satelliteMessage == nil {
return Error . New ( "Satellite message cannot be nil" )
}
if transfer . satelliteMessage . GetTransferPiece ( ) == nil {
return Error . New ( "Satellite message transfer piece cannot be nil" )
}
if transfer . satelliteMessage . GetTransferPiece ( ) . GetAddressedOrderLimit ( ) == nil {
return Error . New ( "Addressed order limit on transfer piece cannot be nil" )
}
if transfer . satelliteMessage . GetTransferPiece ( ) . GetAddressedOrderLimit ( ) . GetLimit ( ) == nil {
return Error . New ( "Addressed order limit on transfer piece cannot be nil" )
}
if transfer . path == nil {
return Error . New ( "Transfer path cannot be nil" )
2019-10-11 22:18:05 +01:00
}
2019-10-24 20:38:40 +01:00
originalOrderLimit := message . Succeeded . GetOriginalOrderLimit ( )
if originalOrderLimit == nil {
return ErrInvalidArgument . New ( "Original order limit cannot be nil" )
}
originalPieceHash := message . Succeeded . GetOriginalPieceHash ( )
if originalPieceHash == nil {
return ErrInvalidArgument . New ( "Original piece hash cannot be nil" )
}
replacementPieceHash := message . Succeeded . GetReplacementPieceHash ( )
if replacementPieceHash == nil {
return ErrInvalidArgument . New ( "Replacement piece hash cannot be nil" )
}
2019-10-11 22:18:05 +01:00
2019-10-24 20:38:40 +01:00
// verify that the original piece hash and replacement piece hash match
if ! bytes . Equal ( originalPieceHash . Hash , replacementPieceHash . Hash ) {
return ErrInvalidArgument . New ( "Piece hashes for transferred piece don't match" )
}
2019-10-11 22:18:05 +01:00
2019-10-24 20:38:40 +01:00
// verify that the satellite signed the original order limit
err = endpoint . orders . VerifyOrderLimitSignature ( ctx , originalOrderLimit )
if err != nil {
return ErrInvalidArgument . Wrap ( err )
}
2019-10-11 22:18:05 +01:00
2019-10-24 20:38:40 +01:00
// verify that the public key on the order limit signed the original piece hash
err = signing . VerifyUplinkPieceHashSignature ( ctx , originalOrderLimit . UplinkPublicKey , originalPieceHash )
if err != nil {
return ErrInvalidArgument . Wrap ( err )
}
if originalOrderLimit . PieceId != message . Succeeded . OriginalPieceId {
return ErrInvalidArgument . New ( "Invalid original piece ID" )
}
receivingNodeID := transfer . satelliteMessage . GetTransferPiece ( ) . GetAddressedOrderLimit ( ) . GetLimit ( ) . StorageNodeId
2019-10-28 18:43:46 +00:00
if transfer . originalPointer == nil || transfer . originalPointer . GetRemote ( ) == nil {
return Error . New ( "could not get remote pointer from transfer item" )
}
calculatedNewPieceID := transfer . originalPointer . GetRemote ( ) . RootPieceId . Derive ( receivingNodeID , transfer . pieceNum )
2019-10-24 20:38:40 +01:00
if calculatedNewPieceID != replacementPieceHash . PieceId {
return ErrInvalidArgument . New ( "Invalid replacement piece ID" )
}
// get peerID and signee for new storage node
peerID , err := endpoint . peerIdentities . Get ( ctx , receivingNodeID )
if err != nil {
return Error . Wrap ( err )
}
signee := signing . SigneeFromPeerIdentity ( peerID )
// verify that the new node signed the replacement piece hash
err = signing . VerifyPieceHashSignature ( ctx , signee , replacementPieceHash )
if err != nil {
return ErrInvalidArgument . Wrap ( err )
2019-10-11 22:18:05 +01:00
}
2019-10-28 15:08:33 +00:00
transferQueueItem , err := endpoint . db . GetTransferQueueItem ( ctx , exitingNodeID , transfer . path , transfer . pieceNum )
2019-10-11 22:18:05 +01:00
if err != nil {
return Error . Wrap ( err )
}
2019-10-28 18:43:46 +00:00
err = endpoint . updatePointer ( ctx , transfer . originalPointer , exitingNodeID , receivingNodeID , transfer . path , transfer . pieceNum )
2019-10-25 16:14:22 +01:00
if err != nil {
return Error . Wrap ( err )
}
2019-10-11 22:18:05 +01:00
var failed int64
2019-10-24 17:24:42 +01:00
if transferQueueItem . FailedCount != nil && * transferQueueItem . FailedCount >= endpoint . config . MaxFailuresPerPiece {
2019-10-11 22:18:05 +01:00
failed = - 1
}
2019-10-24 20:38:40 +01:00
err = endpoint . db . IncrementProgress ( ctx , exitingNodeID , transfer . pieceSize , 1 , failed )
2019-10-11 22:18:05 +01:00
if err != nil {
return Error . Wrap ( err )
}
2019-10-28 15:08:33 +00:00
err = endpoint . db . DeleteTransferQueueItem ( ctx , exitingNodeID , transfer . path , transfer . pieceNum )
2019-10-11 22:18:05 +01:00
if err != nil {
return Error . Wrap ( err )
}
2019-10-24 20:38:40 +01:00
pending . delete ( originalPieceID )
2019-10-11 22:18:05 +01:00
2019-10-25 21:36:26 +01:00
deleteMsg := & pb . SatelliteMessage {
Message : & pb . SatelliteMessage_DeletePiece {
DeletePiece : & pb . DeletePiece {
OriginalPieceId : originalPieceID ,
} ,
} ,
}
err = stream . Send ( deleteMsg )
if err != nil {
return Error . Wrap ( err )
}
2019-10-29 20:22:20 +00:00
mon . Meter ( "graceful_exit_transfer_piece_success" ) . Mark ( 1 )
2019-10-11 22:18:05 +01:00
return nil
}
func ( endpoint * Endpoint ) handleFailed ( ctx context . Context , pending * pendingMap , nodeID storj . NodeID , message * pb . StorageNodeMessage_Failed ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
2019-10-17 15:29:35 +01:00
endpoint . log . Warn ( "transfer failed" , zap . Stringer ( "piece ID" , message . Failed . OriginalPieceId ) , zap . Stringer ( "transfer error" , message . Failed . GetError ( ) ) )
2019-10-29 20:22:20 +00:00
mon . Meter ( "graceful_exit_transfer_piece_fail" ) . Mark ( 1 )
2019-10-15 20:59:12 +01:00
pieceID := message . Failed . OriginalPieceId
2019-10-11 22:18:05 +01:00
transfer , ok := pending . get ( pieceID )
if ! ok {
2019-10-17 15:29:35 +01:00
endpoint . log . Debug ( "could not find transfer message in pending queue. skipping." , zap . Stringer ( "piece ID" , pieceID ) )
2019-10-11 22:18:05 +01:00
// TODO we should probably error out here so we don't get stuck in a loop with a SN that is not behaving properl
}
2019-10-28 15:08:33 +00:00
transferQueueItem , err := endpoint . db . GetTransferQueueItem ( ctx , nodeID , transfer . path , transfer . pieceNum )
2019-10-11 22:18:05 +01:00
if err != nil {
return Error . Wrap ( err )
}
now := time . Now ( ) . UTC ( )
failedCount := 1
if transferQueueItem . FailedCount != nil {
failedCount = * transferQueueItem . FailedCount + 1
}
errorCode := int ( pb . TransferFailed_Error_value [ message . Failed . Error . String ( ) ] )
// TODO if error code is NOT_FOUND, the node no longer has the piece. remove the queue item and the pointer
transferQueueItem . LastFailedAt = & now
transferQueueItem . FailedCount = & failedCount
transferQueueItem . LastFailedCode = & errorCode
err = endpoint . db . UpdateTransferQueueItem ( ctx , * transferQueueItem )
if err != nil {
return Error . Wrap ( err )
}
2019-10-24 17:24:42 +01:00
// only increment overall failed count if piece failures has reached the threshold
if failedCount == endpoint . config . MaxFailuresPerPiece {
2019-10-11 22:18:05 +01:00
err = endpoint . db . IncrementProgress ( ctx , nodeID , 0 , 0 , 1 )
if err != nil {
return Error . Wrap ( err )
}
}
pending . delete ( pieceID )
return nil
}
2019-10-25 16:14:22 +01:00
2019-10-25 21:36:26 +01:00
// getFinishedMessage builds the signed message that tells a node its graceful
// exit is over.
//
// When success is true the result carries a signed ExitCompleted; otherwise
// it carries a signed ExitFailed whose Reason is set only when reason is
// non-negative (callers pass -1 when no reason applies). finishedAt is used
// as the completion / failure time.
//
// NOTE(review): the signer parameter is not used; the endpoint's own signer
// signs the message and supplies the satellite ID.
//
// Signing errors are returned in the package Error class with a nil message.
func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, signer signing.Signer, nodeID storj.NodeID, finishedAt time.Time, success bool, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
	satelliteID := endpoint.signer.ID()

	if !success {
		exitFailed := &pb.ExitFailed{
			SatelliteId: satelliteID,
			NodeId:      nodeID,
			Failed:      finishedAt,
		}
		if reason >= 0 {
			exitFailed.Reason = reason
		}

		signedFailed, signErr := signing.SignExitFailed(ctx, endpoint.signer, exitFailed)
		if signErr != nil {
			return nil, Error.Wrap(signErr)
		}
		return &pb.SatelliteMessage{
			Message: &pb.SatelliteMessage_ExitFailed{ExitFailed: signedFailed},
		}, nil
	}

	exitCompleted := &pb.ExitCompleted{
		SatelliteId: satelliteID,
		NodeId:      nodeID,
		Completed:   finishedAt,
	}
	signedCompleted, signErr := signing.SignExitCompleted(ctx, endpoint.signer, exitCompleted)
	if signErr != nil {
		return nil, Error.Wrap(signErr)
	}
	return &pb.SatelliteMessage{
		Message: &pb.SatelliteMessage_ExitCompleted{ExitCompleted: signedCompleted},
	}, nil
}
2019-10-28 18:43:46 +00:00
func ( endpoint * Endpoint ) updatePointer ( ctx context . Context , originalPointer * pb . Pointer , exitingNodeID storj . NodeID , receivingNodeID storj . NodeID , path [ ] byte , pieceNum int32 ) ( err error ) {
2019-10-25 16:14:22 +01:00
defer mon . Task ( ) ( & ctx ) ( & err )
// remove the node from the pointer
pointer , err := endpoint . metainfo . Get ( ctx , string ( path ) )
if err != nil {
return Error . Wrap ( err )
}
remote := pointer . GetRemote ( )
// nothing to do here
if remote == nil {
return nil
}
2019-10-27 18:20:22 +00:00
pieceMap := make ( map [ storj . NodeID ] * pb . RemotePiece )
2019-10-25 16:14:22 +01:00
for _ , piece := range remote . GetRemotePieces ( ) {
2019-10-27 18:20:22 +00:00
pieceMap [ piece . NodeId ] = piece
}
var toRemove [ ] * pb . RemotePiece
existingPiece , ok := pieceMap [ exitingNodeID ]
if ! ok {
return Error . New ( "node no longer has the piece. Node ID: %s" , exitingNodeID . String ( ) )
}
if existingPiece != nil && existingPiece . PieceNum != pieceNum {
return Error . New ( "invalid existing piece info. Exiting Node ID: %s, PieceNum: %d" , exitingNodeID . String ( ) , pieceNum )
2019-10-25 16:14:22 +01:00
}
2019-10-27 18:20:22 +00:00
toRemove = [ ] * pb . RemotePiece { existingPiece }
delete ( pieceMap , exitingNodeID )
2019-10-25 16:14:22 +01:00
var toAdd [ ] * pb . RemotePiece
2019-10-27 18:20:22 +00:00
// check receiving node id is not already in the pointer
_ , ok = pieceMap [ receivingNodeID ]
if ok {
return Error . New ( "node id already exists in piece. Path: %s, NodeID: %s" , path , receivingNodeID . String ( ) )
}
2019-10-25 16:14:22 +01:00
if ! receivingNodeID . IsZero ( ) {
toAdd = [ ] * pb . RemotePiece { {
PieceNum : pieceNum ,
NodeId : receivingNodeID ,
} }
}
2019-10-28 18:43:46 +00:00
_ , err = endpoint . metainfo . UpdatePieces ( ctx , string ( path ) , originalPointer , toAdd , toRemove )
2019-10-25 16:14:22 +01:00
if err != nil {
return Error . Wrap ( err )
}
return nil
}