satellite/gracefulexit: separate functional code in endpoint (#3476)

Yingrong Zhao 2019-11-08 13:57:51 -05:00 committed by GitHub
parent 06a52e75b8
commit 69b0ae02bf
4 changed files with 329 additions and 249 deletions
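For orientation, here is a minimal, self-contained sketch (not the satellite code itself) of the shape this refactor gives the endpoint: the long inline blocks in doProcess become small helpers such as checkExitStatus and handleFinished, and doProcess only sequences them. The Message and endpoint types below are simplified stand-ins invented for the sketch; only the helper names come from the diff.

// Example sketch only; not part of this commit. Types are simplified stand-ins.
package main

import (
	"context"
	"fmt"
)

// Message stands in for *pb.SatelliteMessage.
type Message struct{ Text string }

type endpoint struct{}

// checkExitStatus mirrors the helper added below: it returns a message that should
// be sent immediately (finished / not ready), or nil when the transfer loop should run.
func (e *endpoint) checkExitStatus(ctx context.Context, nodeID string) (*Message, error) {
	// The real helper consults overlaydb.GetExitStatus; this stub pretends the
	// metainfo loop has not finished collecting transfer items yet.
	return &Message{Text: "not ready"}, nil
}

// handleFinished mirrors the helper that signs a finished message, updates the
// exit status, and deletes the remaining transfer queue items.
func (e *endpoint) handleFinished(ctx context.Context, nodeID string, success bool) error {
	fmt.Printf("node %s finished, success=%v\n", nodeID, success)
	return nil
}

func (e *endpoint) doProcess(ctx context.Context, nodeID string) error {
	msg, err := e.checkExitStatus(ctx, nodeID)
	if err != nil {
		return err
	}
	if msg != nil {
		fmt.Println("send:", msg.Text) // stream.Send(msg) in the real endpoint
		return nil
	}
	// ... transfer loop elided; on completion the real endpoint builds an exit
	// status request and calls handleFinished with the failure reason, if any.
	return e.handleFinished(ctx, nodeID, true)
}

func main() {
	_ = (&endpoint{}).doProcess(context.Background(), "node-1")
}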

@ -5,6 +5,19 @@ storj.io/storj/satellite/accounting/tally."total.objects" IntVal
storj.io/storj/satellite/accounting/tally."total.remote_bytes" IntVal
storj.io/storj/satellite/accounting/tally."total.remote_segments" IntVal
storj.io/storj/satellite/accounting/tally."total.segments" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_fail_max_failures_percentage" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_fail_validation" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_final_bytes_transferred" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_final_pieces_failed" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_final_pieces_succeess" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_age_seconds" FloatVal
storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_audit_success_count" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_audit_total_count" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_piece_count" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_success" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_successful_pieces_transfer_ratio" IntVal
storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter
storj.io/storj/satellite/repair/checker."checker_segment_age" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_healthy_count" IntVal
storj.io/storj/satellite/repair/checker."checker_segment_time_until_irreparable" IntVal
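The lock entries above are the monkit metric names registered by the graceful exit package. Below is a minimal sketch of how such metrics are emitted, assuming the gopkg.in/spacemonkeygo/monkit.v2 import path used around the time of this commit; recordFinalProgress and its arguments are invented for illustration only.

package main

import (
	"context"

	monkit "gopkg.in/spacemonkeygo/monkit.v2" // import path assumed for the monkit version in use here
)

var mon = monkit.Package()

// recordFinalProgress is a hypothetical helper showing the calls behind some of
// the graceful_exit_* entries in the locked metrics list above.
func recordFinalProgress(ctx context.Context, failed, bytes int64) (err error) {
	defer mon.Task()(&ctx)(&err)

	mon.IntVal("graceful_exit_final_pieces_failed").Observe(failed)
	mon.IntVal("graceful_exit_final_bytes_transferred").Observe(bytes)

	// The real endpoint compares the failure percentage against a configured
	// threshold; this sketch simply branches on whether any piece failed.
	if failed == 0 {
		mon.Meter("graceful_exit_success").Mark(1)
	} else {
		mon.Meter("graceful_exit_fail_max_failures_percentage").Mark(1)
	}
	return nil
}

func main() {
	_ = recordFinalProgress(context.Background(), 0, 1<<20)
}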

@ -17,6 +17,9 @@ var (
// ErrNodeNotFound is returned if a graceful exit entry for a node does not exist in the database
ErrNodeNotFound = errs.Class("graceful exit node not found")
// ErrAboveOptimalThreshold is returned if a graceful exit entry for a node has more pieces than required.
ErrAboveOptimalThreshold = errs.Class("pointer has more pieces than required")
mon = monkit.Package()
)
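The error classes declared above come from the zeebo/errs package. A minimal sketch of how callers create and test against them follows; findEntry and the node ID literal are invented for illustration.

package main

import (
	"fmt"

	"github.com/zeebo/errs"
)

var (
	// Same class declarations as in the diff above.
	ErrNodeNotFound          = errs.Class("graceful exit node not found")
	ErrAboveOptimalThreshold = errs.Class("pointer has more pieces than required")
)

// findEntry is a hypothetical lookup that fails with a classed error.
func findEntry(nodeID string) error {
	return ErrNodeNotFound.New("no graceful exit entry for node %s", nodeID)
}

func main() {
	err := findEntry("node-1")
	switch {
	case ErrNodeNotFound.Has(err):
		fmt.Println("not found:", err)
	case ErrAboveOptimalThreshold.Has(err):
		fmt.Println("too many pieces:", err)
	}
}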

@ -4,7 +4,6 @@
package gracefulexit
import (
"bytes"
"context"
"io"
"sync"
@ -191,7 +190,6 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
if err != nil {
return rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
}
// TODO should we error if the node is DQ'd?
nodeID := peer.ID
endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", nodeID))
@ -212,72 +210,20 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return rpcstatus.Error(rpcstatus.PermissionDenied, "Disqualified nodes cannot graceful exit")
}
eofHandler := func(err error) error {
if err == io.EOF {
endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("Node ID", nodeID))
return nil
}
if err != nil {
return rpcstatus.Error(rpcstatus.Unknown, Error.Wrap(err).Error())
}
return nil
}
exitStatus, err := endpoint.overlaydb.GetExitStatus(ctx, nodeID)
msg, err := endpoint.checkExitStatus(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exitStatus.ExitFinishedAt != nil {
// TODO maybe we should store the reason in the DB so we know how it originally failed.
finishedMsg, err := endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, *exitStatus.ExitFinishedAt, exitStatus.ExitSuccess, -1)
if msg != nil {
err = stream.Send(msg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = stream.Send(finishedMsg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
return nil
}
if exitStatus.ExitInitiatedAt == nil {
request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: time.Now().UTC()}
node, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 0)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
// graceful exit initiation metrics
age := time.Now().UTC().Sub(node.CreatedAt.UTC())
mon.FloatVal("graceful_exit_init_node_age_seconds").Observe(age.Seconds())
mon.IntVal("graceful_exit_init_node_audit_success_count").Observe(node.Reputation.AuditSuccessCount)
mon.IntVal("graceful_exit_init_node_audit_total_count").Observe(node.Reputation.AuditCount)
mon.IntVal("graceful_exit_init_node_piece_count").Observe(node.PieceCount)
err = stream.Send(&pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}})
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
return nil
}
if exitStatus.ExitLoopCompletedAt == nil {
err = stream.Send(&pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}})
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
return nil
}
// maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests
pending := newPendingMap()
// these are used to synchronize the "incomplete transfer loop" with the main thread (storagenode receive loop)
morePiecesFlag := true
loopRunningFlag := true
@ -291,6 +237,8 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
// maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests
pending := newPendingMap()
var group errgroup.Group
group.Go(func() error {
incompleteLoop := sync2.NewCycle(endpoint.interval)
@ -361,58 +309,13 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
return rpcstatus.Error(rpcstatus.PermissionDenied, "Disqualified nodes cannot graceful exit")
}
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
}
progress, err := endpoint.db.GetProgress(ctx, nodeID)
// update exit status
exitStatusRequest, exitFailedReason, err := endpoint.generateExitStatusRequest(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
var transferMsg *pb.SatelliteMessage
mon.IntVal("graceful_exit_final_pieces_failed").Observe(progress.PiecesFailed)
mon.IntVal("graceful_exit_final_pieces_succeess").Observe(progress.PiecesTransferred)
mon.IntVal("graceful_exit_final_bytes_transferred").Observe(progress.BytesTransferred)
processed := progress.PiecesFailed + progress.PiecesTransferred
if processed > 0 {
mon.IntVal("graceful_exit_successful_pieces_transfer_ratio").Observe(progress.PiecesTransferred / processed)
}
// check the node's exit progress to see if it has failed past the max failure threshold
if processed > 0 && float64(progress.PiecesFailed)/float64(processed)*100 >= float64(endpoint.config.OverallMaxFailuresPercentage) {
exitStatusRequest.ExitSuccess = false
transferMsg, err = endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
} else {
exitStatusRequest.ExitSuccess = true
transferMsg, err = endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, -1)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
if exitStatusRequest.ExitSuccess {
mon.Meter("graceful_exit_success").Mark(1)
} else {
mon.Meter("graceful_exit_fail_max_failures_percentage").Mark(1)
}
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = stream.Send(transferMsg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
// remove remaining items from the queue after notifying nodes about their exit status
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -444,6 +347,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}()
timer := time.NewTimer(endpoint.recvTimeout)
select {
case <-ctx.Done():
return rpcstatus.Error(rpcstatus.Internal, Error.New("context canceled while waiting to receive message from storagenode").Error())
@ -452,7 +356,11 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
case <-done:
}
if recvErr != nil {
return eofHandler(recvErr)
if errs.Is(recvErr, io.EOF) {
endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("node ID", nodeID))
return nil
}
return rpcstatus.Error(rpcstatus.Unknown, Error.Wrap(recvErr).Error())
}
switch m := request.GetMessage().(type) {
@ -467,40 +375,23 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}
if ErrInvalidArgument.Has(err) {
// immediately fail and complete graceful exit for nodes that fail satellite validation
err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
mon.Meter("graceful_exit_fail_validation").Mark(1) //locked
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
ExitSuccess: false,
}
finishedMsg, err := endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, pb.ExitFailed_VERIFICATION_FAILED)
err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
mon.Meter("graceful_exit_fail_validation").Mark(1)
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = stream.Send(finishedMsg)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
// remove remaining items from the queue after notifying nodes about their exit status
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
break
}
return rpcstatus.Error(rpcstatus.Internal, err.Error())
@ -535,38 +426,21 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
if err != nil {
return Error.Wrap(err)
}
}
remote := pointer.GetRemote()
pieces := remote.GetRemotePieces()
var nodePiece *pb.RemotePiece
excludedNodeIDs := make([]storj.NodeID, len(pieces))
for i, piece := range pieces {
if piece.NodeId == nodeID && piece.PieceNum == incomplete.PieceNum {
nodePiece = piece
}
excludedNodeIDs[i] = piece.NodeId
}
if nodePiece == nil {
endpoint.log.Debug("piece no longer held by node", zap.Stringer("Node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Path, incomplete.PieceNum)
if err != nil {
return Error.Wrap(err)
}
return nil
}
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
nodePiece, err := endpoint.getNodePiece(ctx, pointer, incomplete)
if err != nil {
deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Path, incomplete.PieceNum)
if deleteErr != nil {
return Error.Wrap(deleteErr)
}
return Error.Wrap(err)
}
if len(remote.GetRemotePieces()) > redundancy.OptimalThreshold() {
endpoint.log.Debug("pointer has more pieces than required. removing node from pointer.", zap.Stringer("Node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
pieceSize, err := endpoint.calculatePieceSize(ctx, pointer, incomplete, nodePiece)
if ErrAboveOptimalThreshold.Has(err) {
_, err = endpoint.metainfo.UpdatePieces(ctx, string(incomplete.Path), pointer, nil, []*pb.RemotePiece{nodePiece})
if err != nil {
return Error.Wrap(err)
@ -576,20 +450,29 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
if err != nil {
return Error.Wrap(err)
}
return nil
}
if err != nil {
return Error.Wrap(err)
}
pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
// populate excluded node IDs
remote := pointer.GetRemote()
pieces := remote.RemotePieces
excludedNodeIDs := make([]storj.NodeID, len(pieces))
for i, piece := range pieces {
excludedNodeIDs[i] = piece.NodeId
}
request := overlay.FindStorageNodesRequest{
// get replacement node
request := &overlay.FindStorageNodesRequest{
RequestedCount: 1,
FreeBandwidth: pieceSize,
FreeDisk: pieceSize,
ExcludedNodes: excludedNodeIDs,
}
newNodes, err := endpoint.overlay.FindStorageNodes(ctx, request)
newNodes, err := endpoint.overlay.FindStorageNodes(ctx, *request)
if err != nil {
return Error.Wrap(err)
}
@ -597,6 +480,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
if len(newNodes) == 0 {
return Error.New("could not find a node to receive piece transfer: node ID %v, path %v, piece num %v", nodeID, incomplete.Path, incomplete.PieceNum)
}
newNode := newNodes[0]
endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.Id),
zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
@ -627,6 +511,8 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
if err != nil {
return Error.Wrap(err)
}
// update pending queue with the transfer item
pending.put(pieceID, &pendingTransfer{
path: incomplete.Path,
pieceSize: pieceSize,
@ -649,78 +535,22 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
return Error.New("Could not find transfer item in pending queue")
}
if transfer.satelliteMessage == nil {
return Error.New("Satellite message cannot be nil")
}
if transfer.satelliteMessage.GetTransferPiece() == nil {
return Error.New("Satellite message transfer piece cannot be nil")
}
if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit() == nil {
return Error.New("Addressed order limit on transfer piece cannot be nil")
}
if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil {
return Error.New("Addressed order limit on transfer piece cannot be nil")
}
if transfer.path == nil {
return Error.New("Transfer path cannot be nil")
}
originalOrderLimit := message.Succeeded.GetOriginalOrderLimit()
if originalOrderLimit == nil {
return ErrInvalidArgument.New("Original order limit cannot be nil")
}
originalPieceHash := message.Succeeded.GetOriginalPieceHash()
if originalPieceHash == nil {
return ErrInvalidArgument.New("Original piece hash cannot be nil")
}
replacementPieceHash := message.Succeeded.GetReplacementPieceHash()
if replacementPieceHash == nil {
return ErrInvalidArgument.New("Replacement piece hash cannot be nil")
}
// verify that the original piece hash and replacement piece hash match
if !bytes.Equal(originalPieceHash.Hash, replacementPieceHash.Hash) {
return ErrInvalidArgument.New("Piece hashes for transferred piece don't match")
}
// verify that the satellite signed the original order limit
err = endpoint.orders.VerifyOrderLimitSignature(ctx, originalOrderLimit)
err = endpoint.validatePendingTransfer(ctx, transfer)
if err != nil {
return ErrInvalidArgument.Wrap(err)
}
// verify that the public key on the order limit signed the original piece hash
err = signing.VerifyUplinkPieceHashSignature(ctx, originalOrderLimit.UplinkPublicKey, originalPieceHash)
if err != nil {
return ErrInvalidArgument.Wrap(err)
}
if originalOrderLimit.PieceId != message.Succeeded.OriginalPieceId {
return ErrInvalidArgument.New("Invalid original piece ID")
return Error.Wrap(err)
}
receivingNodeID := transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId
if transfer.originalPointer == nil || transfer.originalPointer.GetRemote() == nil {
return Error.New("could not get remote pointer from transfer item")
}
calculatedNewPieceID := transfer.originalPointer.GetRemote().RootPieceId.Derive(receivingNodeID, transfer.pieceNum)
if calculatedNewPieceID != replacementPieceHash.PieceId {
return ErrInvalidArgument.New("Invalid replacement piece ID")
}
// get peerID and signee for new storage node
peerID, err := endpoint.peerIdentities.Get(ctx, receivingNodeID)
if err != nil {
return Error.Wrap(err)
}
signee := signing.SigneeFromPeerIdentity(peerID)
// verify that the new node signed the replacement piece hash
err = signing.VerifyPieceHashSignature(ctx, signee, replacementPieceHash)
// verify transferred piece
err = endpoint.verifyPieceTransferred(ctx, message, transfer, peerID)
if err != nil {
return ErrInvalidArgument.Wrap(err)
return Error.Wrap(err)
}
transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.path, transfer.pieceNum)
if err != nil {
return Error.Wrap(err)
@ -764,14 +594,15 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
return Error.Wrap(err)
}
mon.Meter("graceful_exit_transfer_piece_success").Mark(1)
mon.Meter("graceful_exit_transfer_piece_success").Mark(1) //locked
return nil
}
func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) {
defer mon.Task()(&ctx)(&err)
endpoint.log.Warn("transfer failed", zap.Stringer("Piece ID", message.Failed.OriginalPieceId), zap.Stringer("transfer error", message.Failed.GetError()))
mon.Meter("graceful_exit_transfer_piece_fail").Mark(1)
mon.Meter("graceful_exit_transfer_piece_fail").Mark(1) //locked
pieceID := message.Failed.OriginalPieceId
transfer, ok := pending.get(pieceID)
if !ok {
@ -870,7 +701,64 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
return nil
}
func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, signer signing.Signer, nodeID storj.NodeID, finishedAt time.Time, success bool, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) {
// check if node is disqualified
nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
if err != nil {
return false, Error.Wrap(err)
}
if nodeInfo.Disqualified != nil {
// update graceful exit status to be failed
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
ExitSuccess: false,
}
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
return true, Error.Wrap(err)
}
// remove remaining items from the queue
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
if err != nil {
return true, Error.Wrap(err)
}
return true, nil
}
return false, nil
}
func (endpoint *Endpoint) handleFinished(ctx context.Context, stream processStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason) error {
finishedMsg, err := endpoint.getFinishedMessage(ctx, exitStatusRequest.NodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, failedReason)
if err != nil {
return Error.Wrap(err)
}
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
return Error.Wrap(err)
}
err = stream.Send(finishedMsg)
if err != nil {
return Error.Wrap(err)
}
// remove remaining items from the queue after notifying nodes about their exit status
err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID)
if err != nil {
return Error.Wrap(err)
}
return nil
}
func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, nodeID storj.NodeID, finishedAt time.Time, success bool, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
if success {
unsigned := &pb.ExitCompleted{
SatelliteId: endpoint.signer.ID(),
@ -950,36 +838,105 @@ func (endpoint *Endpoint) updatePointer(ctx context.Context, originalPointer *pb
return nil
}
func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) {
// check if node is disqualified
nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
// checkExitStatus returns a satellite message based on a node's current graceful exit status.
// if a node hasn't started graceful exit, it will initialize the process.
// if a node has finished graceful exit, it will return a finished message.
// if a node has started graceful exit, but no transfer item is available yet, it will return a not ready message.
// otherwise, the returned message will be nil.
func (endpoint *Endpoint) checkExitStatus(ctx context.Context, nodeID storj.NodeID) (*pb.SatelliteMessage, error) {
exitStatus, err := endpoint.overlaydb.GetExitStatus(ctx, nodeID)
if err != nil {
return false, Error.Wrap(err)
return nil, Error.Wrap(err)
}
if nodeInfo.Disqualified != nil {
// update graceful exit status to be failed
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: nodeID,
ExitFinishedAt: time.Now().UTC(),
ExitSuccess: false,
}
_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
if err != nil {
return true, Error.Wrap(err)
}
// remove remaining items from the queue
err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
if err != nil {
return true, Error.Wrap(err)
}
return true, nil
if exitStatus.ExitFinishedAt != nil {
// TODO maybe we should store the reason in the DB so we know how it originally failed.
return endpoint.getFinishedMessage(ctx, nodeID, *exitStatus.ExitFinishedAt, exitStatus.ExitSuccess, -1)
}
return false, nil
if exitStatus.ExitInitiatedAt == nil {
request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: time.Now().UTC()}
node, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
if err != nil {
return nil, Error.Wrap(err)
}
err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 0)
if err != nil {
return nil, Error.Wrap(err)
}
// graceful exit initiation metrics
age := time.Now().UTC().Sub(node.CreatedAt.UTC())
mon.FloatVal("graceful_exit_init_node_age_seconds").Observe(age.Seconds()) //locked
mon.IntVal("graceful_exit_init_node_audit_success_count").Observe(node.Reputation.AuditSuccessCount) //locked
mon.IntVal("graceful_exit_init_node_audit_total_count").Observe(node.Reputation.AuditCount) //locked
mon.IntVal("graceful_exit_init_node_piece_count").Observe(node.PieceCount) //locked
return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}, nil
}
if exitStatus.ExitLoopCompletedAt == nil {
return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}, nil
}
return nil, nil
}
func (endpoint *Endpoint) generateExitStatusRequest(ctx context.Context, nodeID storj.NodeID) (*overlay.ExitStatusRequest, pb.ExitFailed_Reason, error) {
var exitFailedReason pb.ExitFailed_Reason = -1
progress, err := endpoint.db.GetProgress(ctx, nodeID)
if err != nil {
return nil, exitFailedReason, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
mon.IntVal("graceful_exit_final_pieces_failed").Observe(progress.PiecesFailed) //locked
mon.IntVal("graceful_exit_final_pieces_succeess").Observe(progress.PiecesTransferred) //locked
mon.IntVal("graceful_exit_final_bytes_transferred").Observe(progress.BytesTransferred) //locked
processed := progress.PiecesFailed + progress.PiecesTransferred
if processed > 0 {
mon.IntVal("graceful_exit_successful_pieces_transfer_ratio").Observe(progress.PiecesTransferred / processed) //locked
}
exitStatusRequest := &overlay.ExitStatusRequest{
NodeID: progress.NodeID,
ExitFinishedAt: time.Now().UTC(),
}
// check the node's exit progress to see if it has failed past the max failure threshold
if processed > 0 && float64(progress.PiecesFailed)/float64(processed)*100 >= float64(endpoint.config.OverallMaxFailuresPercentage) {
exitStatusRequest.ExitSuccess = false
exitFailedReason = pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED
} else {
exitStatusRequest.ExitSuccess = true
}
if exitStatusRequest.ExitSuccess {
mon.Meter("graceful_exit_success").Mark(1) //locked
} else {
mon.Meter("graceful_exit_fail_max_failures_percentage").Mark(1) //locked
}
return exitStatusRequest, exitFailedReason, nil
}
func (endpoint *Endpoint) calculatePieceSize(ctx context.Context, pointer *pb.Pointer, incomplete *TransferQueueItem, nodePiece *pb.RemotePiece) (int64, error) {
nodeID := incomplete.NodeID
// calculate piece size
redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
if err != nil {
return 0, Error.Wrap(err)
}
pieces := pointer.GetRemote().GetRemotePieces()
if len(pieces) > redundancy.OptimalThreshold() {
endpoint.log.Debug("pointer has more pieces than required. removing node from pointer.", zap.Stringer("node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
return 0, ErrAboveOptimalThreshold.New("")
}
return eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy), nil
}
func (endpoint *Endpoint) getValidPointer(ctx context.Context, path string, pieceNum int32, originalRootPieceID storj.PieceID) (*pb.Pointer, error) {
@ -1000,3 +957,23 @@ func (endpoint *Endpoint) getValidPointer(ctx context.Context, path string, piec
}
return pointer, nil
}
func (endpoint *Endpoint) getNodePiece(ctx context.Context, pointer *pb.Pointer, incomplete *TransferQueueItem) (*pb.RemotePiece, error) {
remote := pointer.GetRemote()
nodeID := incomplete.NodeID
pieces := remote.GetRemotePieces()
var nodePiece *pb.RemotePiece
for _, piece := range pieces {
if piece.NodeId == nodeID && piece.PieceNum == incomplete.PieceNum {
nodePiece = piece
}
}
if nodePiece == nil {
endpoint.log.Debug("piece no longer held by node", zap.Stringer("node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
return nil, Error.New("piece no longer held by node")
}
return nodePiece, nil
}

@ -0,0 +1,87 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"bytes"
"context"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/signing"
)
func (endpoint *Endpoint) validatePendingTransfer(ctx context.Context, transfer *pendingTransfer) error {
if transfer.satelliteMessage == nil {
return Error.New("Satellite message cannot be nil")
}
if transfer.satelliteMessage.GetTransferPiece() == nil {
return Error.New("Satellite message transfer piece cannot be nil")
}
if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit() == nil {
return Error.New("Addressed order limit on transfer piece cannot be nil")
}
if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil {
return Error.New("Addressed order limit on transfer piece cannot be nil")
}
if transfer.path == nil {
return Error.New("Transfer path cannot be nil")
}
if transfer.originalPointer == nil || transfer.originalPointer.GetRemote() == nil {
return Error.New("could not get remote pointer from transfer item")
}
return nil
}
func (endpoint *Endpoint) verifyPieceTransferred(ctx context.Context, message *pb.StorageNodeMessage_Succeeded, transfer *pendingTransfer, receivingNodePeerID *identity.PeerIdentity) error {
originalOrderLimit := message.Succeeded.GetOriginalOrderLimit()
if originalOrderLimit == nil {
return ErrInvalidArgument.New("Original order limit cannot be nil")
}
originalPieceHash := message.Succeeded.GetOriginalPieceHash()
if originalPieceHash == nil {
return ErrInvalidArgument.New("Original piece hash cannot be nil")
}
replacementPieceHash := message.Succeeded.GetReplacementPieceHash()
if replacementPieceHash == nil {
return ErrInvalidArgument.New("Replacement piece hash cannot be nil")
}
// verify that the original piece hash and replacement piece hash match
if !bytes.Equal(originalPieceHash.Hash, replacementPieceHash.Hash) {
return ErrInvalidArgument.New("Piece hashes for transferred piece don't match")
}
// verify that the satellite signed the original order limit
err := signing.VerifyOrderLimitSignature(ctx, endpoint.signer, originalOrderLimit)
if err != nil {
return ErrInvalidArgument.Wrap(err)
}
// verify that the public key on the order limit signed the original piece hash
err = signing.VerifyUplinkPieceHashSignature(ctx, originalOrderLimit.UplinkPublicKey, originalPieceHash)
if err != nil {
return ErrInvalidArgument.Wrap(err)
}
if originalOrderLimit.PieceId != message.Succeeded.OriginalPieceId {
return ErrInvalidArgument.New("Invalid original piece ID")
}
receivingNodeID := transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId
calculatedNewPieceID := transfer.originalPointer.GetRemote().RootPieceId.Derive(receivingNodeID, transfer.pieceNum)
if calculatedNewPieceID != replacementPieceHash.PieceId {
return ErrInvalidArgument.New("Invalid replacement piece ID")
}
signee := signing.SigneeFromPeerIdentity(receivingNodePeerID)
// verify that the new node signed the replacement piece hash
err = signing.VerifyPieceHashSignature(ctx, signee, replacementPieceHash)
if err != nil {
return ErrInvalidArgument.Wrap(err)
}
return nil
}