2019-10-11 22:18:05 +01:00
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"context"
2021-06-30 09:05:09 +01:00
"database/sql"
2019-10-11 22:18:05 +01:00
"io"
2020-12-21 13:48:29 +00:00
"sort"
2019-10-11 22:18:05 +01:00
"sync"
"time"
2019-10-24 20:38:40 +01:00
"github.com/zeebo/errs"
2019-10-11 22:18:05 +01:00
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
2019-12-27 11:48:47 +00:00
"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/sync2"
2021-06-25 08:23:33 +01:00
"storj.io/common/uuid"
2021-04-21 13:42:57 +01:00
"storj.io/storj/satellite/metabase"
2019-10-11 22:18:05 +01:00
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
2020-02-21 14:07:29 +00:00
"storj.io/uplink/private/eestream"
2019-10-11 22:18:05 +01:00
)
// buildQueueMillis is the interval, in milliseconds, of the cycle that builds
// the transfer queue for an exiting node (see Endpoint.interval).
const buildQueueMillis = 100
var (
	// ErrInvalidArgument is an error class for invalid argument errors used to check which rpc code to use.
	ErrInvalidArgument = errs.Class("graceful exit")
	// ErrIneligibleNodeAge is an error class for when a node has not been on the network long enough to graceful exit.
	ErrIneligibleNodeAge = errs.Class("node is not yet eligible for graceful exit")
)
// Endpoint for handling the transfer of pieces for Graceful Exit.
type Endpoint struct {
	pb.DRPCSatelliteGracefulExitUnimplementedServer

	log      *zap.Logger
	interval time.Duration // cycle interval for building the transfer queue (buildQueueMillis)

	signer signing.Signer // signs the ExitCompleted/ExitFailed messages sent to exiting nodes

	db        DB
	overlaydb overlay.DB
	overlay   *overlay.Service
	metabase  *metabase.DB
	orders    *orders.Service

	connections *connectionsTracker // ensures at most one open Process stream per node

	peerIdentities overlay.PeerIdentities
	config         Config

	recvTimeout time.Duration // max time to wait for a message from the storage node
}
2020-07-16 15:18:02 +01:00
// connectionsTracker for tracking ongoing connections on this api server.
2019-10-29 17:23:17 +00:00
type connectionsTracker struct {
mu sync . RWMutex
data map [ storj . NodeID ] struct { }
}
// newConnectionsTracker creates a new connectionsTracker and instantiates the map.
func newConnectionsTracker ( ) * connectionsTracker {
return & connectionsTracker {
data : make ( map [ storj . NodeID ] struct { } ) ,
}
}
// tryAdd adds to the map if the node ID is not already added
// it returns true if succeeded and false if already added.
func ( pm * connectionsTracker ) tryAdd ( nodeID storj . NodeID ) bool {
pm . mu . Lock ( )
defer pm . mu . Unlock ( )
if _ , ok := pm . data [ nodeID ] ; ok {
return false
}
pm . data [ nodeID ] = struct { } { }
return true
}
// delete deletes a node ID from the map.
func ( pm * connectionsTracker ) delete ( nodeID storj . NodeID ) {
pm . mu . Lock ( )
defer pm . mu . Unlock ( )
delete ( pm . data , nodeID )
}
2019-10-11 22:18:05 +01:00
// NewEndpoint creates a new graceful exit endpoint.
2021-05-13 09:14:18 +01:00
func NewEndpoint ( log * zap . Logger , signer signing . Signer , db DB , overlaydb overlay . DB , overlay * overlay . Service , metabase * metabase . DB , orders * orders . Service ,
2019-10-24 20:38:40 +01:00
peerIdentities overlay . PeerIdentities , config Config ) * Endpoint {
2019-10-11 22:18:05 +01:00
return & Endpoint {
2019-10-24 20:38:40 +01:00
log : log ,
interval : time . Millisecond * buildQueueMillis ,
2019-10-25 21:36:26 +01:00
signer : signer ,
2019-10-24 20:38:40 +01:00
db : db ,
overlaydb : overlaydb ,
overlay : overlay ,
2020-12-16 16:47:31 +00:00
metabase : metabase ,
2019-10-24 20:38:40 +01:00
orders : orders ,
2019-10-29 17:23:17 +00:00
connections : newConnectionsTracker ( ) ,
2019-10-24 20:38:40 +01:00
peerIdentities : peerIdentities ,
config : config ,
2019-10-30 17:40:57 +00:00
recvTimeout : config . RecvTimeout ,
2019-10-11 22:18:05 +01:00
}
}
// Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
//
// It drives the whole exchange over a single bidirectional stream: a background
// cycle keeps refilling a pending map with incomplete transfer queue items and
// sends a transfer request for each, while this method's receive loop handles
// the node's success/failure responses until no work remains, then sends the
// final (signed) exit status.
func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) (err error) {
	ctx := stream.Context()
	defer mon.Task()(&ctx)(&err)

	// The exiting node is identified by the peer identity of the connection.
	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
	}
	nodeID := peer.ID
	endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", nodeID))

	// ensure that only one connection can be opened for a single node at a time
	if !endpoint.connections.tryAdd(nodeID) {
		return rpcstatus.Error(rpcstatus.Aborted, "Only one concurrent connection allowed for graceful exit")
	}
	defer func() {
		endpoint.connections.delete(nodeID)
	}()

	// TODO: remove this once the graceful_exit_transfer_queue is removed.
	// The progress record (when one exists) says which transfer queue this
	// node's exit uses; exits without a record default to the segment queue.
	usesSegmentTransferQueue := true
	progress, err := endpoint.db.GetProgress(ctx, nodeID)
	if err != nil && !errs.Is(err, sql.ErrNoRows) {
		return rpcstatus.Error(rpcstatus.Internal, err.Error())
	}
	if progress != nil {
		usesSegmentTransferQueue = progress.UsesSegmentTransferQueue
	}

	isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue)
	if err != nil {
		return rpcstatus.Error(rpcstatus.Internal, err.Error())
	}
	if isDisqualified {
		return rpcstatus.Error(rpcstatus.FailedPrecondition, "Disqualified nodes cannot graceful exit")
	}

	// A non-nil msg means the exit status is already decided (or the node is
	// not eligible yet); send it and stop.
	msg, err := endpoint.checkExitStatus(ctx, nodeID)
	if err != nil {
		if ErrIneligibleNodeAge.Has(err) {
			return rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
		}
		return rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	if msg != nil {
		err = stream.Send(msg)
		if err != nil {
			return rpcstatus.Error(rpcstatus.Internal, err.Error())
		}
		return nil
	}

	// maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests
	// and handles concurrency between sending logic and receiving logic
	pending := NewPendingMap()

	var group errgroup.Group
	defer func() {
		err2 := errs2.IgnoreCanceled(group.Wait())
		if err2 != nil {
			endpoint.log.Error("incompleteLoop gave error", zap.Error(err2))
		}
	}()

	// we cancel this context in all situations where we want to exit the loop
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Sender goroutine: whenever the pending map drains, fetch the next batch
	// of incomplete queue items and send a transfer request for each.
	group.Go(func() error {
		incompleteLoop := sync2.NewCycle(endpoint.interval)

		loopErr := incompleteLoop.Run(ctx, func(ctx context.Context) error {
			if pending.Length() == 0 {
				// Prefer items that have not failed yet; fall back to retryable
				// failed items only when none remain.
				incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue)
				if err != nil {
					cancel()
					return pending.DoneSending(err)
				}

				if len(incomplete) == 0 {
					incomplete, err = endpoint.db.GetIncompleteFailed(ctx, nodeID, endpoint.config.MaxFailuresPerPiece, endpoint.config.EndpointBatchSize, 0, usesSegmentTransferQueue)
					if err != nil {
						cancel()
						return pending.DoneSending(err)
					}
				}

				if len(incomplete) == 0 {
					endpoint.log.Debug("no more pieces to transfer for node", zap.Stringer("Node ID", nodeID))
					cancel()
					return pending.DoneSending(nil)
				}

				for _, inc := range incomplete {
					err = endpoint.processIncomplete(ctx, stream, pending, inc)
					if err != nil {
						cancel()
						return pending.DoneSending(err)
					}
				}
			}
			return nil
		})
		return errs2.IgnoreCanceled(loopErr)
	})

	// Receiver loop: process the node's responses until all transfers finish.
	for {
		finishedPromise := pending.IsFinishedPromise()
		finished, err := finishedPromise.Wait(ctx)
		err = errs2.IgnoreCanceled(err)
		if err != nil {
			return rpcstatus.Error(rpcstatus.Internal, err.Error())
		}

		// if there is no more work to receive send complete
		if finished {
			// Re-check: the node may have been disqualified while transferring.
			isDisqualified, err := endpoint.handleDisqualifiedNode(ctx, nodeID, usesSegmentTransferQueue)
			if err != nil {
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}
			if isDisqualified {
				return rpcstatus.Error(rpcstatus.FailedPrecondition, "Disqualified nodes cannot graceful exit")
			}

			// update exit status
			exitStatusRequest, exitFailedReason, err := endpoint.generateExitStatusRequest(ctx, nodeID)
			if err != nil {
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}

			err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason, usesSegmentTransferQueue)
			if err != nil {
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}
			break
		}

		// Recv blocks, so it runs in its own goroutine to let us time out via
		// the select below.
		done := make(chan struct{})
		var request *pb.StorageNodeMessage
		var recvErr error
		go func() {
			request, recvErr = stream.Recv()
			close(done)
		}()

		timer := time.NewTimer(endpoint.recvTimeout)

		select {
		case <-ctx.Done():
			return rpcstatus.Error(rpcstatus.Internal, Error.New("context canceled while waiting to receive message from storagenode").Error())
		case <-timer.C:
			return rpcstatus.Error(rpcstatus.DeadlineExceeded, Error.New("timeout while waiting to receive message from storagenode").Error())
		case <-done:
		}

		if recvErr != nil {
			// EOF means the node hung up; treat as a clean end of the stream.
			if errs.Is(recvErr, io.EOF) {
				endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("node ID", nodeID))
				return nil
			}
			return rpcstatus.Error(rpcstatus.Unknown, Error.Wrap(recvErr).Error())
		}

		switch m := request.GetMessage().(type) {
		case *pb.StorageNodeMessage_Succeeded:
			err = endpoint.handleSucceeded(ctx, stream, pending, nodeID, m)
			if err != nil {
				if metainfo.ErrNodeAlreadyExists.Has(err) {
					// this will get retried
					endpoint.log.Warn("node already exists in segment.", zap.Error(err))

					continue
				}
				if ErrInvalidArgument.Has(err) {
					messageBytes, marshalErr := pb.Marshal(request)
					if marshalErr != nil {
						return rpcstatus.Error(rpcstatus.Internal, marshalErr.Error())
					}
					endpoint.log.Warn("storagenode failed validation for piece transfer", zap.Stringer("node ID", nodeID), zap.Binary("original message from storagenode", messageBytes), zap.Error(err))

					// immediately fail and complete graceful exit for nodes that fail satellite validation
					err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
					if err != nil {
						return rpcstatus.Error(rpcstatus.Internal, err.Error())
					}

					mon.Meter("graceful_exit_fail_validation").Mark(1) //mon:locked

					exitStatusRequest := &overlay.ExitStatusRequest{
						NodeID:         nodeID,
						ExitFinishedAt: time.Now().UTC(),
						ExitSuccess:    false,
					}

					err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED, usesSegmentTransferQueue)
					if err != nil {
						return rpcstatus.Error(rpcstatus.Internal, err.Error())
					}
					// NOTE(review): this break exits only the switch, not the
					// for loop; the loop then keeps waiting for messages until
					// the node hangs up (EOF) — confirm this is intended.
					break
				}
				return rpcstatus.Error(rpcstatus.Internal, err.Error())
			}
		case *pb.StorageNodeMessage_Failed:
			err = endpoint.handleFailed(ctx, pending, nodeID, m)
			if err != nil {
				return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
			}
		default:
			return rpcstatus.Error(rpcstatus.Unknown, Error.New("unknown storage node message: %v", m).Error())
		}
	}

	return nil
}
// processIncomplete turns one incomplete transfer queue item into a transfer
// request sent to the exiting node and records it in the pending map. Items
// that are stale, invalid, or no longer needed are removed from the queue
// instead of being transferred.
func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, pending *PendingMap, incomplete *TransferQueueItem) error {
	nodeID := incomplete.NodeID

	// Give up on items whose order limit was already re-sent too many times:
	// count a failure and drop the item from the queue.
	if incomplete.OrderLimitSendCount >= endpoint.config.MaxOrderLimitSendCount {
		err := endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
		if err != nil {
			return Error.Wrap(err)
		}

		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
		if err != nil {
			return Error.Wrap(err)
		}
		return nil
	}

	segment, err := endpoint.getValidSegment(ctx, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.RootPieceID)
	if err != nil {
		// The segment no longer matches this queue item (e.g. it was deleted
		// or changed); drop the item rather than transferring a stale piece.
		endpoint.log.Warn("invalid segment", zap.Error(err))

		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
		if err != nil {
			return Error.Wrap(err)
		}
		return nil
	}

	nodePiece, err := endpoint.getNodePiece(ctx, segment, incomplete)
	if err != nil {
		deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
		if deleteErr != nil {
			return Error.Wrap(deleteErr)
		}
		return Error.Wrap(err)
	}

	pieceSize, err := endpoint.calculatePieceSize(ctx, segment, incomplete)
	if ErrAboveOptimalThreshold.Has(err) {
		// The segment already has more pieces than needed: no transfer is
		// required, just remove this node's piece from the segment and drop
		// the queue item.
		err = endpoint.UpdatePiecesCheckDuplicates(ctx, segment, metabase.Pieces{}, metabase.Pieces{nodePiece}, false)
		if err != nil {
			return Error.Wrap(err)
		}

		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
		if err != nil {
			return Error.Wrap(err)
		}
		return nil
	}
	if err != nil {
		return Error.Wrap(err)
	}

	// populate excluded node IDs so the replacement cannot be a node that
	// already holds a piece of this segment
	pieces := segment.Pieces
	excludedIDs := make([]storj.NodeID, len(pieces))
	for i, piece := range pieces {
		excludedIDs[i] = piece.StorageNode
	}

	// get replacement node
	request := &overlay.FindStorageNodesRequest{
		RequestedCount: 1,
		ExcludedIDs:    excludedIDs,
	}

	newNodes, err := endpoint.overlay.FindStorageNodesForGracefulExit(ctx, *request)
	if err != nil {
		return Error.Wrap(err)
	}

	if len(newNodes) == 0 {
		return Error.New("could not find a node to receive piece transfer: node ID %v, key %v, piece num %v", nodeID, incomplete.Key, incomplete.PieceNum)
	}

	newNode := newNodes[0]
	endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.ID),
		zap.ByteString("key", incomplete.Key),
		zap.ByteString("streamID", incomplete.StreamID[:]), zap.Uint32("Part", incomplete.Position.Part), zap.Uint32("Index", incomplete.Position.Index),
		zap.Int32("piece num", incomplete.PieceNum))

	// The piece ID held by the exiting node is derived from the segment's root piece ID.
	pieceID := segment.RootPieceID.Derive(nodeID, incomplete.PieceNum)

	limit, privateKey, err := endpoint.orders.CreateGracefulExitPutOrderLimit(ctx, metabase.BucketLocation{}, newNode.ID, incomplete.PieceNum, segment.RootPieceID, int32(pieceSize))
	if err != nil {
		return Error.Wrap(err)
	}

	transferMsg := &pb.SatelliteMessage{
		Message: &pb.SatelliteMessage_TransferPiece{
			TransferPiece: &pb.TransferPiece{
				OriginalPieceId:     pieceID,
				AddressedOrderLimit: limit,
				PrivateKey:          privateKey,
			},
		},
	}
	err = stream.Send(transferMsg)
	if err != nil {
		return Error.Wrap(err)
	}

	err = endpoint.db.IncrementOrderLimitSendCount(ctx, nodeID, incomplete.Key, incomplete.StreamID, incomplete.Position, incomplete.PieceNum)
	if err != nil {
		return Error.Wrap(err)
	}

	// update pending queue with the transfer item
	err = pending.Put(pieceID, &PendingTransfer{
		Key:                 incomplete.Key,
		StreamID:            incomplete.StreamID,
		Position:            incomplete.Position,
		PieceSize:           pieceSize,
		SatelliteMessage:    transferMsg,
		OriginalRootPieceID: segment.RootPieceID,
		PieceNum:            uint16(incomplete.PieceNum), // TODO
	})

	return err
}
// handleSucceeded processes a successful piece transfer reported by the exiting
// node: it verifies the transferred piece, swaps the piece holder in the
// segment from the exiting node to the receiving node, updates exit progress,
// removes the queue/pending entries, and tells the node to delete its copy.
func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream pb.DRPCSatelliteGracefulExit_ProcessStream, pending *PendingMap, exitingNodeID storj.NodeID, message *pb.StorageNodeMessage_Succeeded) (err error) {
	defer mon.Task()(&ctx)(&err)

	originalPieceID := message.Succeeded.OriginalPieceId

	transfer, ok := pending.Get(originalPieceID)
	if !ok {
		endpoint.log.Error("Could not find transfer item in pending queue", zap.Stringer("Piece ID", originalPieceID))
		return Error.New("Could not find transfer item in pending queue")
	}

	err = endpoint.validatePendingTransfer(ctx, transfer)
	if err != nil {
		return Error.Wrap(err)
	}

	receivingNodeID := transfer.SatelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId
	// get peerID and signee for new storage node
	peerID, err := endpoint.peerIdentities.Get(ctx, receivingNodeID)
	if err != nil {
		return Error.Wrap(err)
	}
	// verify transferred piece
	err = endpoint.verifyPieceTransferred(ctx, message, transfer, peerID)
	if err != nil {
		return Error.Wrap(err)
	}

	transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
	if err != nil {
		return Error.Wrap(err)
	}

	err = endpoint.updateSegment(ctx, exitingNodeID, receivingNodeID, transfer.Key, transfer.StreamID, transfer.Position, transfer.PieceNum, transferQueueItem.RootPieceID)
	if err != nil {
		// remove the piece from the pending queue so it gets retried
		deleteErr := pending.Delete(originalPieceID)

		return Error.Wrap(errs.Combine(err, deleteErr))
	}

	// A piece that had reached the failure threshold succeeded after all;
	// pass -1 as the failed delta — presumably so IncrementProgress can undo
	// the earlier overall-failure count (TODO confirm against DB impl).
	var failed int64
	if transferQueueItem.FailedCount != nil && *transferQueueItem.FailedCount >= endpoint.config.MaxFailuresPerPiece {
		failed = -1
	}

	err = endpoint.db.IncrementProgress(ctx, exitingNodeID, transfer.PieceSize, 1, failed)
	if err != nil {
		return Error.Wrap(err)
	}

	err = endpoint.db.DeleteTransferQueueItem(ctx, exitingNodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
	if err != nil {
		return Error.Wrap(err)
	}

	err = pending.Delete(originalPieceID)
	if err != nil {
		return err
	}

	// The transfer is fully recorded; the exiting node may now drop its copy.
	deleteMsg := &pb.SatelliteMessage{
		Message: &pb.SatelliteMessage_DeletePiece{
			DeletePiece: &pb.DeletePiece{
				OriginalPieceId: originalPieceID,
			},
		},
	}
	err = stream.Send(deleteMsg)
	if err != nil {
		return Error.Wrap(err)
	}

	mon.Meter("graceful_exit_transfer_piece_success").Mark(1) //mon:locked
	return nil
}
// handleFailed processes a failed piece transfer reported by the exiting node.
// A NOT_FOUND failure removes the piece from the segment and the queue; any
// other failure bumps the item's failure count so it can be retried later.
func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *PendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) {
	defer mon.Task()(&ctx)(&err)

	endpoint.log.Warn("transfer failed",
		zap.Stringer("Piece ID", message.Failed.OriginalPieceId),
		zap.Stringer("nodeID", nodeID),
		zap.Stringer("transfer error", message.Failed.GetError()),
	)
	mon.Meter("graceful_exit_transfer_piece_fail").Mark(1) //mon:locked

	pieceID := message.Failed.OriginalPieceId
	transfer, ok := pending.Get(pieceID)
	if !ok {
		endpoint.log.Warn("could not find transfer message in pending queue. skipping.", zap.Stringer("Piece ID", pieceID), zap.Stringer("Node ID", nodeID))

		// this should be rare and it is not likely this is someone trying to do something malicious since it is a "failure"
		return nil
	}

	transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
	if err != nil {
		return Error.Wrap(err)
	}

	now := time.Now().UTC()
	failedCount := 1
	if transferQueueItem.FailedCount != nil {
		failedCount = *transferQueueItem.FailedCount + 1
	}

	errorCode := int(pb.TransferFailed_Error_value[message.Failed.Error.String()])

	// If the error code is NOT_FOUND, the node no longer has the piece.
	// Remove the queue item and remove the node from the pointer.
	// If the pointer is not piece hash verified, do not count this as a failure.
	if pb.TransferFailed_Error(errorCode) == pb.TransferFailed_NOT_FOUND {
		endpoint.log.Debug("piece not found on node", zap.Stringer("node ID", nodeID), zap.ByteString("key", transfer.Key),
			zap.ByteString("streamID", transfer.StreamID[:]), zap.Uint32("Part", transfer.Position.Part), zap.Uint32("Index", transfer.Position.Index),
			zap.Uint16("piece num", transfer.PieceNum))

		segment, err := endpoint.getValidSegment(ctx, transfer.Key, transfer.StreamID, transfer.Position, storj.PieceID{})
		if err != nil {
			return Error.Wrap(err)
		}

		// Locate this node's piece (matching node AND piece number) in the segment.
		pieces := segment.Pieces
		var nodePiece metabase.Piece
		for _, piece := range pieces {
			if piece.StorageNode == nodeID && piece.Number == transfer.PieceNum {
				nodePiece = piece
			}
		}

		// The node is no longer listed in the segment: nothing to remove from
		// it, just drop the queue item and the pending entry.
		if nodePiece == (metabase.Piece{}) {
			err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
			if err != nil {
				return Error.Wrap(err)
			}
			return pending.Delete(pieceID)
		}

		err = endpoint.UpdatePiecesCheckDuplicates(ctx, segment, metabase.Pieces{}, metabase.Pieces{nodePiece}, false)
		if err != nil {
			return Error.Wrap(err)
		}

		// NOTE(review): this counts the NOT_FOUND as a failure unconditionally,
		// while the comment above says unverified pointers should not count —
		// confirm whether that distinction still applies here.
		err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
		if err != nil {
			return Error.Wrap(err)
		}

		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, transfer.Key, transfer.StreamID, transfer.Position, int32(transfer.PieceNum))
		if err != nil {
			return Error.Wrap(err)
		}

		return pending.Delete(pieceID)
	}

	transferQueueItem.LastFailedAt = &now
	transferQueueItem.FailedCount = &failedCount
	transferQueueItem.LastFailedCode = &errorCode
	// transfer.Key == nil presumably marks items from the segment transfer
	// queue rather than the legacy one — confirm against UpdateTransferQueueItem.
	err = endpoint.db.UpdateTransferQueueItem(ctx, *transferQueueItem, transfer.Key == nil)
	if err != nil {
		return Error.Wrap(err)
	}

	// only increment overall failed count if piece failures has reached the threshold
	if failedCount == endpoint.config.MaxFailuresPerPiece {
		err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
		if err != nil {
			return Error.Wrap(err)
		}
	}

	return pending.Delete(pieceID)
}
2019-10-25 16:14:22 +01:00
2021-06-30 09:05:09 +01:00
func ( endpoint * Endpoint ) handleDisqualifiedNode ( ctx context . Context , nodeID storj . NodeID , usesSegmentTransferQueue bool ) ( isDisqualified bool , err error ) {
2019-11-08 18:57:51 +00:00
// check if node is disqualified
nodeInfo , err := endpoint . overlay . Get ( ctx , nodeID )
if err != nil {
return false , Error . Wrap ( err )
}
if nodeInfo . Disqualified != nil {
// update graceful exit status to be failed
exitStatusRequest := & overlay . ExitStatusRequest {
NodeID : nodeID ,
ExitFinishedAt : time . Now ( ) . UTC ( ) ,
ExitSuccess : false ,
}
_ , err = endpoint . overlaydb . UpdateExitStatus ( ctx , exitStatusRequest )
if err != nil {
return true , Error . Wrap ( err )
}
// remove remaining items from the queue
2021-06-30 09:05:09 +01:00
err = endpoint . db . DeleteTransferQueueItems ( ctx , nodeID , usesSegmentTransferQueue )
2019-11-08 18:57:51 +00:00
if err != nil {
return true , Error . Wrap ( err )
}
return true , nil
}
return false , nil
}
2021-06-30 09:05:09 +01:00
func ( endpoint * Endpoint ) handleFinished ( ctx context . Context , stream pb . DRPCSatelliteGracefulExit_ProcessStream , exitStatusRequest * overlay . ExitStatusRequest , failedReason pb . ExitFailed_Reason , usesSegmentTransferQueue bool ) error {
2019-11-08 18:57:51 +00:00
finishedMsg , err := endpoint . getFinishedMessage ( ctx , exitStatusRequest . NodeID , exitStatusRequest . ExitFinishedAt , exitStatusRequest . ExitSuccess , failedReason )
if err != nil {
return Error . Wrap ( err )
}
_ , err = endpoint . overlaydb . UpdateExitStatus ( ctx , exitStatusRequest )
if err != nil {
return Error . Wrap ( err )
}
err = stream . Send ( finishedMsg )
if err != nil {
return Error . Wrap ( err )
}
// remove remaining items from the queue after notifying nodes about their exit status
2021-06-30 09:05:09 +01:00
err = endpoint . db . DeleteTransferQueueItems ( ctx , exitStatusRequest . NodeID , usesSegmentTransferQueue )
2019-11-08 18:57:51 +00:00
if err != nil {
return Error . Wrap ( err )
}
return nil
}
func ( endpoint * Endpoint ) getFinishedMessage ( ctx context . Context , nodeID storj . NodeID , finishedAt time . Time , success bool , reason pb . ExitFailed_Reason ) ( message * pb . SatelliteMessage , err error ) {
2019-10-25 21:36:26 +01:00
if success {
unsigned := & pb . ExitCompleted {
SatelliteId : endpoint . signer . ID ( ) ,
NodeId : nodeID ,
Completed : finishedAt ,
}
signed , err := signing . SignExitCompleted ( ctx , endpoint . signer , unsigned )
if err != nil {
return nil , Error . Wrap ( err )
}
message = & pb . SatelliteMessage { Message : & pb . SatelliteMessage_ExitCompleted {
ExitCompleted : signed ,
} }
} else {
unsigned := & pb . ExitFailed {
SatelliteId : endpoint . signer . ID ( ) ,
NodeId : nodeID ,
Failed : finishedAt ,
}
if reason >= 0 {
unsigned . Reason = reason
}
signed , err := signing . SignExitFailed ( ctx , endpoint . signer , unsigned )
if err != nil {
return nil , Error . Wrap ( err )
}
message = & pb . SatelliteMessage { Message : & pb . SatelliteMessage_ExitFailed {
ExitFailed : signed ,
} }
2020-01-03 19:11:47 +00:00
err = endpoint . overlay . DisqualifyNode ( ctx , nodeID )
if err != nil {
return nil , Error . Wrap ( err )
}
2019-10-25 21:36:26 +01:00
}
return message , nil
}
// updateSegment removes the exiting node's piece from the segment and, when a
// non-zero receivingNodeID is given, adds that node as the new holder of the
// same piece number.
func (endpoint *Endpoint) updateSegment(ctx context.Context, exitingNodeID storj.NodeID, receivingNodeID storj.NodeID, key metabase.SegmentKey, streamID uuid.UUID, position metabase.SegmentPosition, pieceNumber uint16, originalRootPieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx)(&err)

	// remove the node from the segment
	segment, err := endpoint.getValidSegment(ctx, key, streamID, position, originalRootPieceID)
	if err != nil {
		return Error.Wrap(err)
	}

	// Index the segment's pieces by their storage node.
	pieceMap := make(map[storj.NodeID]metabase.Piece)
	for _, piece := range segment.Pieces {
		pieceMap[piece.StorageNode] = piece
	}

	var toRemove metabase.Pieces
	existingPiece, ok := pieceMap[exitingNodeID]
	if !ok {
		return Error.New("node no longer has the piece. Node ID: %s", exitingNodeID.String())
	}
	// Sanity check: the exiting node's piece must carry the piece number that
	// the transfer was issued for.
	if existingPiece != (metabase.Piece{}) && existingPiece.Number != pieceNumber {
		return Error.New("invalid existing piece info. Exiting Node ID: %s, PieceNum: %d", exitingNodeID.String(), pieceNumber)
	}
	toRemove = metabase.Pieces{existingPiece}
	delete(pieceMap, exitingNodeID)

	// A zero receivingNodeID means the piece is only removed, not replaced.
	var toAdd metabase.Pieces
	if !receivingNodeID.IsZero() {
		toAdd = metabase.Pieces{{
			Number:      pieceNumber,
			StorageNode: receivingNodeID,
		}}
	}
	err = endpoint.UpdatePiecesCheckDuplicates(ctx, segment, toAdd, toRemove, true)
	if err != nil {
		return Error.Wrap(err)
	}

	return nil
}
2019-11-07 16:13:05 +00:00
2019-11-08 18:57:51 +00:00
// checkExitStatus returns a satellite message based on a node's current graceful exit status:
//   - if a node hasn't started graceful exit, it will initialize the process and return a not ready message;
//   - if a node has finished graceful exit, it will return the signed finished message;
//   - if a node has started graceful exit, but no transfer item is available yet, it will return a not ready message;
//   - otherwise, the returned message will be nil, meaning piece transfers may proceed.
func (endpoint *Endpoint) checkExitStatus(ctx context.Context, nodeID storj.NodeID) (*pb.SatelliteMessage, error) {
	exitStatus, err := endpoint.overlaydb.GetExitStatus(ctx, nodeID)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	if exitStatus.ExitFinishedAt != nil {
		// TODO maybe we should store the reason in the DB so we know how it originally failed.
		// -1 signals "reason unknown" to getFinishedMessage.
		return endpoint.getFinishedMessage(ctx, nodeID, *exitStatus.ExitFinishedAt, exitStatus.ExitSuccess, -1)
	}

	if exitStatus.ExitInitiatedAt == nil {
		// the node is asking to start graceful exit; first verify it has been
		// on the network long enough to be eligible.
		nodeDossier, err := endpoint.overlaydb.Get(ctx, nodeID)
		if err != nil {
			endpoint.log.Error("unable to retrieve node dossier for attempted exiting node", zap.Stringer("node ID", nodeID))
			return nil, Error.Wrap(err)
		}
		geEligibilityDate := nodeDossier.CreatedAt.AddDate(0, endpoint.config.NodeMinAgeInMonths, 0)
		if time.Now().Before(geEligibilityDate) {
			return nil, ErrIneligibleNodeAge.New("will be eligible after %s", geEligibilityDate.String())
		}

		// record the exit as initiated now.
		request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: time.Now().UTC()}
		node, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
		if err != nil {
			return nil, Error.Wrap(err)
		}
		// create a zeroed progress row so later transfer increments have a row to update.
		err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 0)
		if err != nil {
			return nil, Error.Wrap(err)
		}

		// graceful exit initiation metrics
		age := time.Now().UTC().Sub(node.CreatedAt.UTC())
		mon.FloatVal("graceful_exit_init_node_age_seconds").Observe(age.Seconds())                           //mon:locked
		mon.IntVal("graceful_exit_init_node_audit_success_count").Observe(node.Reputation.AuditSuccessCount) //mon:locked
		mon.IntVal("graceful_exit_init_node_audit_total_count").Observe(node.Reputation.AuditCount)          //mon:locked
		mon.IntVal("graceful_exit_init_node_piece_count").Observe(node.PieceCount)                           //mon:locked

		// the transfer queue is built asynchronously; tell the node to retry later.
		return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}, nil
	}

	if exitStatus.ExitLoopCompletedAt == nil {
		// the loop that populates this node's transfer queue hasn't finished yet.
		return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}, nil
	}

	// exit is in progress and the queue is ready; nil tells the caller to serve transfers.
	return nil, nil
}
func ( endpoint * Endpoint ) generateExitStatusRequest ( ctx context . Context , nodeID storj . NodeID ) ( * overlay . ExitStatusRequest , pb . ExitFailed_Reason , error ) {
var exitFailedReason pb . ExitFailed_Reason = - 1
progress , err := endpoint . db . GetProgress ( ctx , nodeID )
if err != nil {
return nil , exitFailedReason , rpcstatus . Error ( rpcstatus . Internal , err . Error ( ) )
}
2020-10-13 13:13:41 +01:00
mon . IntVal ( "graceful_exit_final_pieces_failed" ) . Observe ( progress . PiecesFailed ) //mon:locked
mon . IntVal ( "graceful_exit_final_pieces_succeess" ) . Observe ( progress . PiecesTransferred ) //mon:locked
mon . IntVal ( "graceful_exit_final_bytes_transferred" ) . Observe ( progress . BytesTransferred ) //mon:locked
2019-11-08 18:57:51 +00:00
processed := progress . PiecesFailed + progress . PiecesTransferred
if processed > 0 {
2020-10-13 13:13:41 +01:00
mon . IntVal ( "graceful_exit_successful_pieces_transfer_ratio" ) . Observe ( progress . PiecesTransferred / processed ) //mon:locked
2019-11-08 18:57:51 +00:00
}
exitStatusRequest := & overlay . ExitStatusRequest {
NodeID : progress . NodeID ,
ExitFinishedAt : time . Now ( ) . UTC ( ) ,
}
// check node's exiting progress to see if it has failed passed max failure threshold
if processed > 0 && float64 ( progress . PiecesFailed ) / float64 ( processed ) * 100 >= float64 ( endpoint . config . OverallMaxFailuresPercentage ) {
exitStatusRequest . ExitSuccess = false
exitFailedReason = pb . ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED
} else {
exitStatusRequest . ExitSuccess = true
}
if exitStatusRequest . ExitSuccess {
2020-10-13 13:13:41 +01:00
mon . Meter ( "graceful_exit_success" ) . Mark ( 1 ) //mon:locked
2019-11-08 18:57:51 +00:00
} else {
2020-10-13 13:13:41 +01:00
mon . Meter ( "graceful_exit_fail_max_failures_percentage" ) . Mark ( 1 ) //mon:locked
2019-11-08 18:57:51 +00:00
}
return exitStatusRequest , exitFailedReason , nil
}
2020-12-16 16:47:31 +00:00
func ( endpoint * Endpoint ) calculatePieceSize ( ctx context . Context , segment metabase . Segment , incomplete * TransferQueueItem ) ( int64 , error ) {
2019-11-08 18:57:51 +00:00
nodeID := incomplete . NodeID
// calculate piece size
2020-12-16 16:47:31 +00:00
redundancy , err := eestream . NewRedundancyStrategyFromStorj ( segment . Redundancy )
2019-11-08 18:57:51 +00:00
if err != nil {
return 0 , Error . Wrap ( err )
}
2020-12-16 16:47:31 +00:00
if len ( segment . Pieces ) > redundancy . OptimalThreshold ( ) {
endpoint . log . Debug ( "segment has more pieces than required. removing node from segment." , zap . Stringer ( "node ID" , nodeID ) , zap . ByteString ( "key" , incomplete . Key ) , zap . Int32 ( "piece num" , incomplete . PieceNum ) )
2019-11-08 18:57:51 +00:00
return 0 , ErrAboveOptimalThreshold . New ( "" )
}
2020-12-16 16:47:31 +00:00
return eestream . CalcPieceSize ( int64 ( segment . EncryptedSize ) , redundancy ) , nil
2019-11-07 17:19:34 +00:00
}
2021-06-30 09:05:09 +01:00
func ( endpoint * Endpoint ) getValidSegment ( ctx context . Context , key metabase . SegmentKey , streamID uuid . UUID , position metabase . SegmentPosition , originalRootPieceID storj . PieceID ) ( metabase . Segment , error ) {
// TODO: remove when table graceful_exit_transfer_queue is dropped
if key != nil {
location , err := metabase . ParseSegmentKey ( key )
if err != nil {
return metabase . Segment { } , Error . Wrap ( err )
}
segment , err := endpoint . metabase . GetSegmentByLocation ( ctx , metabase . GetSegmentByLocation {
SegmentLocation : location ,
} )
if err != nil {
return metabase . Segment { } , Error . Wrap ( err )
}
if ! originalRootPieceID . IsZero ( ) && originalRootPieceID != segment . RootPieceID {
return metabase . Segment { } , Error . New ( "segment has changed" )
}
return segment , nil
2020-12-16 16:47:31 +00:00
}
2021-06-30 09:05:09 +01:00
segment , err := endpoint . metabase . GetSegmentByPosition ( ctx , metabase . GetSegmentByPosition {
StreamID : streamID ,
Position : position ,
2020-12-16 16:47:31 +00:00
} )
2019-11-07 16:13:05 +00:00
if err != nil {
2020-12-16 16:47:31 +00:00
return metabase . Segment { } , Error . Wrap ( err )
2019-11-07 16:13:05 +00:00
}
2020-12-16 16:47:31 +00:00
if ! originalRootPieceID . IsZero ( ) && originalRootPieceID != segment . RootPieceID {
2021-02-10 10:15:19 +00:00
return metabase . Segment { } , Error . New ( "segment has changed" )
2019-11-07 16:13:05 +00:00
}
2020-12-16 16:47:31 +00:00
return segment , nil
2019-11-07 16:13:05 +00:00
}
2019-11-08 18:57:51 +00:00
2020-12-16 16:47:31 +00:00
func ( endpoint * Endpoint ) getNodePiece ( ctx context . Context , segment metabase . Segment , incomplete * TransferQueueItem ) ( metabase . Piece , error ) {
2019-11-08 18:57:51 +00:00
nodeID := incomplete . NodeID
2020-12-16 16:47:31 +00:00
var nodePiece metabase . Piece
for _ , piece := range segment . Pieces {
if piece . StorageNode == nodeID && int32 ( piece . Number ) == incomplete . PieceNum {
2019-11-08 18:57:51 +00:00
nodePiece = piece
}
}
2020-12-16 16:47:31 +00:00
if nodePiece == ( metabase . Piece { } ) {
2020-09-03 10:38:54 +01:00
endpoint . log . Debug ( "piece no longer held by node" , zap . Stringer ( "node ID" , nodeID ) , zap . ByteString ( "key" , incomplete . Key ) , zap . Int32 ( "piece num" , incomplete . PieceNum ) )
2020-12-16 16:47:31 +00:00
return metabase . Piece { } , Error . New ( "piece no longer held by node" )
2019-11-08 18:57:51 +00:00
}
return nodePiece , nil
}
2020-06-05 21:11:46 +01:00
// GracefulExitFeasibility returns node's joined at date, nodeMinAge and if graceful exit available.
func (endpoint *Endpoint) GracefulExitFeasibility(ctx context.Context, req *pb.GracefulExitFeasibilityRequest) (_ *pb.GracefulExitFeasibilityResponse, err error) {
	defer mon.Task()(&ctx)(&err)

	peer, err := identity.PeerIdentityFromContext(ctx)
	if err != nil {
		return nil, rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
	}

	endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", peer.ID))

	nodeDossier, err := endpoint.overlaydb.Get(ctx, peer.ID)
	if err != nil {
		endpoint.log.Error("unable to retrieve node dossier for attempted exiting node", zap.Stringer("node ID", peer.ID))
		return nil, Error.Wrap(err)
	}

	// the node becomes eligible NodeMinAgeInMonths months after joining.
	eligibilityDate := nodeDossier.CreatedAt.AddDate(0, endpoint.config.NodeMinAgeInMonths, 0)

	return &pb.GracefulExitFeasibilityResponse{
		IsAllowed:      !time.Now().Before(eligibilityDate),
		JoinedAt:       nodeDossier.CreatedAt,
		MonthsRequired: int32(endpoint.config.NodeMinAgeInMonths),
	}, nil
}
2020-12-16 16:47:31 +00:00
// UpdatePiecesCheckDuplicates atomically adds toAdd pieces and removes toRemove pieces from
// the segment.
//
// If checkDuplicates is true it will return an error if the nodes to be
// added are already in the segment.
// Then it will remove the toRemove pieces and then it will add the toAdd pieces.
func ( endpoint * Endpoint ) UpdatePiecesCheckDuplicates ( ctx context . Context , segment metabase . Segment , toAdd , toRemove metabase . Pieces , checkDuplicates bool ) ( err error ) {
defer mon . Task ( ) ( & ctx ) ( & err )
// put all existing pieces to a map
pieceMap := make ( map [ uint16 ] metabase . Piece )
nodePieceMap := make ( map [ storj . NodeID ] struct { } )
for _ , piece := range segment . Pieces {
pieceMap [ piece . Number ] = piece
if checkDuplicates {
nodePieceMap [ piece . StorageNode ] = struct { } { }
}
}
// Return an error if the segment already has a piece for this node
if checkDuplicates {
for _ , piece := range toAdd {
_ , ok := nodePieceMap [ piece . StorageNode ]
if ok {
return metainfo . ErrNodeAlreadyExists . New ( "node id already exists in segment. StreamID: %s, Position: %d, NodeID: %s" , segment . StreamID , segment . Position , piece . StorageNode . String ( ) )
}
nodePieceMap [ piece . StorageNode ] = struct { } { }
}
}
// remove the toRemove pieces from the map
// only if all piece number, node id
for _ , piece := range toRemove {
if piece == ( metabase . Piece { } ) {
continue
}
existing := pieceMap [ piece . Number ]
if existing != ( metabase . Piece { } ) && existing . StorageNode == piece . StorageNode {
delete ( pieceMap , piece . Number )
}
}
// add the toAdd pieces to the map
for _ , piece := range toAdd {
if piece == ( metabase . Piece { } ) {
continue
}
_ , exists := pieceMap [ piece . Number ]
if exists {
return Error . New ( "piece to add already exists (piece no: %d)" , piece . Number )
}
pieceMap [ piece . Number ] = piece
}
2020-12-21 13:48:29 +00:00
// copy the pieces from the map back to the segment, sorted by piece number
pieces := make ( metabase . Pieces , 0 , len ( pieceMap ) )
2020-12-16 16:47:31 +00:00
for _ , piece := range pieceMap {
pieces = append ( pieces , piece )
}
2020-12-21 13:48:29 +00:00
sort . Sort ( pieces )
2020-12-16 16:47:31 +00:00
err = endpoint . metabase . UpdateSegmentPieces ( ctx , metabase . UpdateSegmentPieces {
StreamID : segment . StreamID ,
Position : segment . Position ,
2021-03-12 11:23:44 +00:00
OldPieces : segment . Pieces ,
NewRedundancy : segment . Redundancy ,
NewPieces : pieces ,
2020-12-16 16:47:31 +00:00
} )
if err != nil {
if metabase . ErrSegmentNotFound . Has ( err ) {
err = storj . ErrObjectNotFound . Wrap ( err )
}
return Error . Wrap ( err )
}
return nil
}