From 69b0ae02bfb8ef5fe1aecb80ad8cbe0af98fa415 Mon Sep 17 00:00:00 2001
From: Yingrong Zhao
Date: Fri, 8 Nov 2019 13:57:51 -0500
Subject: [PATCH] satellite/gracefulexit: separate functional code in endpoint (#3476)

Extract the satellite-side validation checks into a new validation.go and
split doProcess into smaller helpers (checkExitStatus,
generateExitStatusRequest, handleFinished, handleDisqualifiedNode,
calculatePieceSize, getNodePiece). Register the graceful exit metrics in
monkit.lock.
---
 monkit.lock                          |  13 +
 satellite/gracefulexit/common.go     |   3 +
 satellite/gracefulexit/endpoint.go   | 475 +++++++++++++--------------
 satellite/gracefulexit/validation.go |  87 +++++
 4 files changed, 329 insertions(+), 249 deletions(-)
 create mode 100644 satellite/gracefulexit/validation.go

diff --git a/monkit.lock b/monkit.lock
index a3dcca8ac..1200d4bf0 100644
--- a/monkit.lock
+++ b/monkit.lock
@@ -5,6 +5,19 @@ storj.io/storj/satellite/accounting/tally."total.objects" IntVal
 storj.io/storj/satellite/accounting/tally."total.remote_bytes" IntVal
 storj.io/storj/satellite/accounting/tally."total.remote_segments" IntVal
 storj.io/storj/satellite/accounting/tally."total.segments" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_fail_max_failures_percentage" Meter
+storj.io/storj/satellite/gracefulexit."graceful_exit_fail_validation" Meter
+storj.io/storj/satellite/gracefulexit."graceful_exit_final_bytes_transferred" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_final_pieces_failed" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_final_pieces_succeess" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_age_seconds" FloatVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_audit_success_count" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_audit_total_count" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_init_node_piece_count" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_success" Meter
+storj.io/storj/satellite/gracefulexit."graceful_exit_successful_pieces_transfer_ratio" IntVal
+storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter
+storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter
 storj.io/storj/satellite/repair/checker."checker_segment_age" IntVal
 storj.io/storj/satellite/repair/checker."checker_segment_healthy_count" IntVal
 storj.io/storj/satellite/repair/checker."checker_segment_time_until_irreparable" IntVal
diff --git a/satellite/gracefulexit/common.go b/satellite/gracefulexit/common.go
index f9ceff81c..aa897524a 100644
--- a/satellite/gracefulexit/common.go
+++ b/satellite/gracefulexit/common.go
@@ -17,6 +17,9 @@ var (
 	// ErrNodeNotFound is returned if a graceful exit entry for a node does not exist in database
 	ErrNodeNotFound = errs.Class("graceful exit node not found")
 
+	// ErrAboveOptimalThreshold is returned if a pointer has more pieces than required.
+	ErrAboveOptimalThreshold = errs.Class("pointer has more pieces than required")
+
 	mon = monkit.Package()
 )
diff --git a/satellite/gracefulexit/endpoint.go b/satellite/gracefulexit/endpoint.go
index 27af9f549..f6415f8a5 100644
--- a/satellite/gracefulexit/endpoint.go
+++ b/satellite/gracefulexit/endpoint.go
@@ -4,7 +4,6 @@
 package gracefulexit
 
 import (
-	"bytes"
 	"context"
 	"io"
 	"sync"
@@ -191,7 +190,6 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
 	if err != nil {
 		return rpcstatus.Error(rpcstatus.Unauthenticated, Error.Wrap(err).Error())
 	}
-	// TODO should we error if the node is DQ'd?
 
 	nodeID := peer.ID
 	endpoint.log.Debug("graceful exit process", zap.Stringer("Node ID", nodeID))
@@ -212,72 +210,20 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
 		return rpcstatus.Error(rpcstatus.PermissionDenied, "Disqualified nodes cannot graceful exit")
 	}
 
-	eofHandler := func(err error) error {
-		if err == io.EOF {
-			endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("Node ID", nodeID))
-			return nil
-		}
-		if err != nil {
-			return rpcstatus.Error(rpcstatus.Unknown, Error.Wrap(err).Error())
-		}
-		return nil
-	}
-
-	exitStatus, err := endpoint.overlaydb.GetExitStatus(ctx, nodeID)
+	msg, err := endpoint.checkExitStatus(ctx, nodeID)
 	if err != nil {
-		return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
+		return rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}
 
-	if exitStatus.ExitFinishedAt != nil {
-		// TODO maybe we should store the reason in the DB so we know how it originally failed.
-		finishedMsg, err := endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, *exitStatus.ExitFinishedAt, exitStatus.ExitSuccess, -1)
+	if msg != nil {
+		err = stream.Send(msg)
 		if err != nil {
 			return rpcstatus.Error(rpcstatus.Internal, err.Error())
 		}
-		err = stream.Send(finishedMsg)
-		if err != nil {
-			return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
-		}
 		return nil
 	}
 
-	if exitStatus.ExitInitiatedAt == nil {
-		request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: time.Now().UTC()}
-		node, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
-		if err != nil {
-			return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
-		}
-		err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 0)
-		if err != nil {
-			return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
-		}
-
-		// graceful exit initiation metrics
-		age := time.Now().UTC().Sub(node.CreatedAt.UTC())
-		mon.FloatVal("graceful_exit_init_node_age_seconds").Observe(age.Seconds())
-		mon.IntVal("graceful_exit_init_node_audit_success_count").Observe(node.Reputation.AuditSuccessCount)
-		mon.IntVal("graceful_exit_init_node_audit_total_count").Observe(node.Reputation.AuditCount)
-		mon.IntVal("graceful_exit_init_node_piece_count").Observe(node.PieceCount)
-
-		err = stream.Send(&pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}})
-		if err != nil {
-			return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
-		}
-		return nil
-	}
-
-	if exitStatus.ExitLoopCompletedAt == nil {
-		err = stream.Send(&pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}})
-		if err != nil {
-			return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
-		}
-		return nil
-	}
-
-	// maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests
-	pending := newPendingMap()
-
 	// these are used to synchronize the "incomplete transfer loop" with the main thread (storagenode receive loop)
 	morePiecesFlag := true
 	loopRunningFlag := true
@@ -291,6 +237,8 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
 		return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
 	}
 
+	// maps pieceIDs to pendingTransfers to keep track of ongoing piece transfer requests
+	pending := newPendingMap()
 	var group errgroup.Group
 	group.Go(func() error {
 		incompleteLoop := sync2.NewCycle(endpoint.interval)
@@ -361,58 +309,13 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
 				return rpcstatus.Error(rpcstatus.PermissionDenied, "Disqualified nodes cannot graceful exit")
"Disqualified nodes cannot graceful exit") } - exitStatusRequest := &overlay.ExitStatusRequest{ - NodeID: nodeID, - ExitFinishedAt: time.Now().UTC(), - } - - progress, err := endpoint.db.GetProgress(ctx, nodeID) + // update exit status + exitStatusRequest, exitFailedReason, err := endpoint.generateExitStatusRequest(ctx, nodeID) if err != nil { return rpcstatus.Error(rpcstatus.Internal, err.Error()) } - var transferMsg *pb.SatelliteMessage - mon.IntVal("graceful_exit_final_pieces_failed").Observe(progress.PiecesFailed) - mon.IntVal("graceful_exit_final_pieces_succeess").Observe(progress.PiecesTransferred) - mon.IntVal("graceful_exit_final_bytes_transferred").Observe(progress.BytesTransferred) - - processed := progress.PiecesFailed + progress.PiecesTransferred - if processed > 0 { - mon.IntVal("graceful_exit_successful_pieces_transfer_ratio").Observe(progress.PiecesTransferred / processed) - } - - // check node's exiting progress to see if it has failed passed max failure threshold - if processed > 0 && float64(progress.PiecesFailed)/float64(processed)*100 >= float64(endpoint.config.OverallMaxFailuresPercentage) { - - exitStatusRequest.ExitSuccess = false - transferMsg, err = endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED) - if err != nil { - return rpcstatus.Error(rpcstatus.Internal, err.Error()) - } - } else { - exitStatusRequest.ExitSuccess = true - transferMsg, err = endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, -1) - if err != nil { - return rpcstatus.Error(rpcstatus.Internal, err.Error()) - } - } - if exitStatusRequest.ExitSuccess { - mon.Meter("graceful_exit_success").Mark(1) - } else { - mon.Meter("graceful_exit_fail_max_failures_percentage").Mark(1) - } - _, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest) - if err != nil { - return rpcstatus.Error(rpcstatus.Internal, err.Error()) - } - - err = stream.Send(transferMsg) - if err != nil { - return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error()) - } - - // remove remaining items from the queue after notifying nodes about their exit status - err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID) + err = endpoint.handleFinished(ctx, stream, exitStatusRequest, exitFailedReason) if err != nil { return rpcstatus.Error(rpcstatus.Internal, err.Error()) } @@ -444,6 +347,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { }() timer := time.NewTimer(endpoint.recvTimeout) + select { case <-ctx.Done(): return rpcstatus.Error(rpcstatus.Internal, Error.New("context canceled while waiting to receive message from storagenode").Error()) @@ -452,7 +356,11 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { case <-done: } if recvErr != nil { - return eofHandler(recvErr) + if errs.Is(recvErr, io.EOF) { + endpoint.log.Debug("received EOF when trying to receive messages from storage node", zap.Stringer("node ID", nodeID)) + return nil + } + return rpcstatus.Error(rpcstatus.Unknown, Error.Wrap(recvErr).Error()) } switch m := request.GetMessage().(type) { @@ -467,40 +375,23 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) { } if ErrInvalidArgument.Has(err) { // immediately fail and complete graceful exit for nodes that fail satellite validation + err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1) + if err != nil { + return 
+				mon.Meter("graceful_exit_fail_validation").Mark(1) //locked
+
 				exitStatusRequest := &overlay.ExitStatusRequest{
 					NodeID:         nodeID,
 					ExitFinishedAt: time.Now().UTC(),
 					ExitSuccess:    false,
 				}
 
-				finishedMsg, err := endpoint.getFinishedMessage(ctx, endpoint.signer, nodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, pb.ExitFailed_VERIFICATION_FAILED)
+				err := endpoint.handleFinished(ctx, stream, exitStatusRequest, pb.ExitFailed_VERIFICATION_FAILED)
 				if err != nil {
 					return rpcstatus.Error(rpcstatus.Internal, err.Error())
 				}
-
-				err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 1)
-				if err != nil {
-					return rpcstatus.Error(rpcstatus.Internal, err.Error())
-				}
-
-				mon.Meter("graceful_exit_fail_validation").Mark(1)
-
-				_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
-				if err != nil {
-					return rpcstatus.Error(rpcstatus.Internal, err.Error())
-				}
-
-				err = stream.Send(finishedMsg)
-				if err != nil {
-					return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
-				}
-
-				// remove remaining items from the queue after notifying nodes about their exit status
-				err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
-				if err != nil {
-					return rpcstatus.Error(rpcstatus.Internal, err.Error())
-				}
-
 				break
 			}
 			return rpcstatus.Error(rpcstatus.Internal, err.Error())
@@ -535,38 +426,21 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
 		if err != nil {
 			return Error.Wrap(err)
 		}
-	}
 
-	remote := pointer.GetRemote()
-	pieces := remote.GetRemotePieces()
-
-	var nodePiece *pb.RemotePiece
-	excludedNodeIDs := make([]storj.NodeID, len(pieces))
-	for i, piece := range pieces {
-		if piece.NodeId == nodeID && piece.PieceNum == incomplete.PieceNum {
-			nodePiece = piece
-		}
-		excludedNodeIDs[i] = piece.NodeId
-	}
-
-	if nodePiece == nil {
-		endpoint.log.Debug("piece no longer held by node", zap.Stringer("Node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
-
-		err = endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Path, incomplete.PieceNum)
-		if err != nil {
-			return Error.Wrap(err)
-		}
 		return nil
 	}
 
-	redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
+	nodePiece, err := endpoint.getNodePiece(ctx, pointer, incomplete)
 	if err != nil {
+		deleteErr := endpoint.db.DeleteTransferQueueItem(ctx, nodeID, incomplete.Path, incomplete.PieceNum)
+		if deleteErr != nil {
+			return Error.Wrap(deleteErr)
+		}
 		return Error.Wrap(err)
 	}
 
-	if len(remote.GetRemotePieces()) > redundancy.OptimalThreshold() {
-		endpoint.log.Debug("pointer has more pieces than required. removing node from pointer.", zap.Stringer("Node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
-
+	pieceSize, err := endpoint.calculatePieceSize(ctx, pointer, incomplete, nodePiece)
+	if ErrAboveOptimalThreshold.Has(err) {
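+		// the pointer already has enough pieces; drop this node's piece instead of transferring it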
 		_, err = endpoint.metainfo.UpdatePieces(ctx, string(incomplete.Path), pointer, nil, []*pb.RemotePiece{nodePiece})
 		if err != nil {
 			return Error.Wrap(err)
 		}
@@ -576,20 +450,29 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
 		if err != nil {
 			return Error.Wrap(err)
 		}
-
 		return nil
 	}
+	if err != nil {
+		return Error.Wrap(err)
+	}
 
-	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
+	// populate excluded node IDs
+	remote := pointer.GetRemote()
+	pieces := remote.RemotePieces
+	excludedNodeIDs := make([]storj.NodeID, len(pieces))
+	for i, piece := range pieces {
+		excludedNodeIDs[i] = piece.NodeId
+	}
 
-	request := overlay.FindStorageNodesRequest{
+	// get replacement node
+	request := &overlay.FindStorageNodesRequest{
 		RequestedCount: 1,
 		FreeBandwidth:  pieceSize,
 		FreeDisk:       pieceSize,
 		ExcludedNodes:  excludedNodeIDs,
 	}
 
-	newNodes, err := endpoint.overlay.FindStorageNodes(ctx, request)
+	newNodes, err := endpoint.overlay.FindStorageNodes(ctx, *request)
 	if err != nil {
 		return Error.Wrap(err)
 	}
@@ -597,6 +480,7 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
 	if len(newNodes) == 0 {
 		return Error.New("could not find a node to receive piece transfer: node ID %v, path %v, piece num %v", nodeID, incomplete.Path, incomplete.PieceNum)
 	}
+
 	newNode := newNodes[0]
 	endpoint.log.Debug("found new node for piece transfer", zap.Stringer("original node ID", nodeID), zap.Stringer("replacement node ID", newNode.Id), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
@@ -627,6 +511,8 @@ func (endpoint *Endpoint) processIncomplete(ctx context.Context, stream processS
 	if err != nil {
 		return Error.Wrap(err)
 	}
+
+	// update pending queue with the transfer item
 	pending.put(pieceID, &pendingTransfer{
 		path:             incomplete.Path,
 		pieceSize:        pieceSize,
@@ -649,78 +535,22 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
 		return Error.New("Could not find transfer item in pending queue")
 	}
 
-	if transfer.satelliteMessage == nil {
-		return Error.New("Satellite message cannot be nil")
-	}
-	if transfer.satelliteMessage.GetTransferPiece() == nil {
-		return Error.New("Satellite message transfer piece cannot be nil")
-	}
-	if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit() == nil {
-		return Error.New("Addressed order limit on transfer piece cannot be nil")
-	}
-	if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil {
-		return Error.New("Addressed order limit on transfer piece cannot be nil")
-	}
-	if transfer.path == nil {
-		return Error.New("Transfer path cannot be nil")
-	}
-
-	originalOrderLimit := message.Succeeded.GetOriginalOrderLimit()
-	if originalOrderLimit == nil {
-		return ErrInvalidArgument.New("Original order limit cannot be nil")
-	}
-	originalPieceHash := message.Succeeded.GetOriginalPieceHash()
-	if originalPieceHash == nil {
-		return ErrInvalidArgument.New("Original piece hash cannot be nil")
-	}
-	replacementPieceHash := message.Succeeded.GetReplacementPieceHash()
-	if replacementPieceHash == nil {
-		return ErrInvalidArgument.New("Replacement piece hash cannot be nil")
-	}
-
-	// verify that the original piece hash and replacement piece hash match
-	if !bytes.Equal(originalPieceHash.Hash, replacementPieceHash.Hash) {
-		return ErrInvalidArgument.New("Piece hashes for transferred piece don't match")
-	}
-
-	// verify that the satellite signed the original order limit
-	err = endpoint.orders.VerifyOrderLimitSignature(ctx, originalOrderLimit)
+	err = endpoint.validatePendingTransfer(ctx, transfer)
 	if err != nil {
-		return ErrInvalidArgument.Wrap(err)
-	}
-
-	// verify that the public key on the order limit signed the original piece hash
-	err = signing.VerifyUplinkPieceHashSignature(ctx, originalOrderLimit.UplinkPublicKey, originalPieceHash)
-	if err != nil {
-		return ErrInvalidArgument.Wrap(err)
-	}
-
-	if originalOrderLimit.PieceId != message.Succeeded.OriginalPieceId {
-		return ErrInvalidArgument.New("Invalid original piece ID")
+		return Error.Wrap(err)
 	}
 
 	receivingNodeID := transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId
-	if transfer.originalPointer == nil || transfer.originalPointer.GetRemote() == nil {
-		return Error.New("could not get remote pointer from transfer item")
-	}
-	calculatedNewPieceID := transfer.originalPointer.GetRemote().RootPieceId.Derive(receivingNodeID, transfer.pieceNum)
-	if calculatedNewPieceID != replacementPieceHash.PieceId {
-		return ErrInvalidArgument.New("Invalid replacement piece ID")
-	}
-
 	// get peerID and signee for new storage node
 	peerID, err := endpoint.peerIdentities.Get(ctx, receivingNodeID)
 	if err != nil {
 		return Error.Wrap(err)
 	}
-	signee := signing.SigneeFromPeerIdentity(peerID)
-
-	// verify that the new node signed the replacement piece hash
-	err = signing.VerifyPieceHashSignature(ctx, signee, replacementPieceHash)
+	// verify transferred piece
+	err = endpoint.verifyPieceTransferred(ctx, message, transfer, peerID)
 	if err != nil {
-		return ErrInvalidArgument.Wrap(err)
+		return Error.Wrap(err)
 	}
-
 	transferQueueItem, err := endpoint.db.GetTransferQueueItem(ctx, exitingNodeID, transfer.path, transfer.pieceNum)
 	if err != nil {
 		return Error.Wrap(err)
 	}
@@ -764,14 +594,15 @@ func (endpoint *Endpoint) handleSucceeded(ctx context.Context, stream processStr
 		return Error.Wrap(err)
 	}
 
-	mon.Meter("graceful_exit_transfer_piece_success").Mark(1)
+	mon.Meter("graceful_exit_transfer_piece_success").Mark(1) //locked
 	return nil
 }
 
 func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap, nodeID storj.NodeID, message *pb.StorageNodeMessage_Failed) (err error) {
 	defer mon.Task()(&ctx)(&err)
 	endpoint.log.Warn("transfer failed", zap.Stringer("Piece ID", message.Failed.OriginalPieceId), zap.Stringer("transfer error", message.Failed.GetError()))
-	mon.Meter("graceful_exit_transfer_piece_fail").Mark(1)
+	mon.Meter("graceful_exit_transfer_piece_fail").Mark(1) //locked
+
 	pieceID := message.Failed.OriginalPieceId
 	transfer, ok := pending.get(pieceID)
 	if !ok {
@@ -870,7 +701,64 @@ func (endpoint *Endpoint) handleFailed(ctx context.Context, pending *pendingMap,
 	return nil
 }
 
-func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, signer signing.Signer, nodeID storj.NodeID, finishedAt time.Time, success bool, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
+func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) {
+	// check if node is disqualified
+	nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
+	if err != nil {
+		return false, Error.Wrap(err)
+	}
+
+	if nodeInfo.Disqualified != nil {
+		// update graceful exit status to be failed
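+		// and remove any remaining transfer queue items for the node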
+		exitStatusRequest := &overlay.ExitStatusRequest{
+			NodeID:         nodeID,
+			ExitFinishedAt: time.Now().UTC(),
+			ExitSuccess:    false,
+		}
+
+		_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
+		if err != nil {
+			return true, Error.Wrap(err)
+		}
+
+		// remove remaining items from the queue
+		err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
+		if err != nil {
+			return true, Error.Wrap(err)
+		}
+
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (endpoint *Endpoint) handleFinished(ctx context.Context, stream processStream, exitStatusRequest *overlay.ExitStatusRequest, failedReason pb.ExitFailed_Reason) error {
+	finishedMsg, err := endpoint.getFinishedMessage(ctx, exitStatusRequest.NodeID, exitStatusRequest.ExitFinishedAt, exitStatusRequest.ExitSuccess, failedReason)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	err = stream.Send(finishedMsg)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	// remove remaining items from the queue after notifying nodes about their exit status
+	err = endpoint.db.DeleteTransferQueueItems(ctx, exitStatusRequest.NodeID)
+	if err != nil {
+		return Error.Wrap(err)
+	}
+
+	return nil
+}
+
+func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, nodeID storj.NodeID, finishedAt time.Time, success bool, reason pb.ExitFailed_Reason) (message *pb.SatelliteMessage, err error) {
 	if success {
 		unsigned := &pb.ExitCompleted{
 			SatelliteId: endpoint.signer.ID(),
@@ -950,36 +838,105 @@ func (endpoint *Endpoint) updatePointer(ctx context.Context, originalPointer *pb
 	return nil
 }
 
-func (endpoint *Endpoint) handleDisqualifiedNode(ctx context.Context, nodeID storj.NodeID) (isDisqualified bool, err error) {
-	// check if node is disqualified
-	nodeInfo, err := endpoint.overlay.Get(ctx, nodeID)
+// checkExitStatus returns a satellite message based on a node's current graceful exit status:
+// if a node hasn't started graceful exit, it initializes the process;
+// if a node has finished graceful exit, it returns a finished message;
+// if a node has started graceful exit but no transfer item is available yet, it returns a not-ready message;
+// otherwise, the returned message is nil.
+func (endpoint *Endpoint) checkExitStatus(ctx context.Context, nodeID storj.NodeID) (*pb.SatelliteMessage, error) {
+	exitStatus, err := endpoint.overlaydb.GetExitStatus(ctx, nodeID)
 	if err != nil {
-		return false, Error.Wrap(err)
+		return nil, Error.Wrap(err)
 	}
 
-	if nodeInfo.Disqualified != nil {
-		// update graceful exit status to be failed
-		exitStatusRequest := &overlay.ExitStatusRequest{
-			NodeID:         nodeID,
-			ExitFinishedAt: time.Now().UTC(),
-			ExitSuccess:    false,
-		}
-
-		_, err = endpoint.overlaydb.UpdateExitStatus(ctx, exitStatusRequest)
-		if err != nil {
-			return true, Error.Wrap(err)
-		}
-
-		// remove remaining items from the queue
-		err = endpoint.db.DeleteTransferQueueItems(ctx, nodeID)
-		if err != nil {
-			return true, Error.Wrap(err)
-		}
-
-		return true, nil
+	if exitStatus.ExitFinishedAt != nil {
+		// TODO maybe we should store the reason in the DB so we know how it originally failed.
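+		// the failure reason is not stored, so pass -1 (no reason) here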
+		return endpoint.getFinishedMessage(ctx, nodeID, *exitStatus.ExitFinishedAt, exitStatus.ExitSuccess, -1)
 	}
 
-	return false, nil
+	if exitStatus.ExitInitiatedAt == nil {
+		request := &overlay.ExitStatusRequest{NodeID: nodeID, ExitInitiatedAt: time.Now().UTC()}
+		node, err := endpoint.overlaydb.UpdateExitStatus(ctx, request)
+		if err != nil {
+			return nil, Error.Wrap(err)
+		}
+		err = endpoint.db.IncrementProgress(ctx, nodeID, 0, 0, 0)
+		if err != nil {
+			return nil, Error.Wrap(err)
+		}
+
+		// graceful exit initiation metrics
+		age := time.Now().UTC().Sub(node.CreatedAt.UTC())
+		mon.FloatVal("graceful_exit_init_node_age_seconds").Observe(age.Seconds())                           //locked
+		mon.IntVal("graceful_exit_init_node_audit_success_count").Observe(node.Reputation.AuditSuccessCount) //locked
+		mon.IntVal("graceful_exit_init_node_audit_total_count").Observe(node.Reputation.AuditCount)          //locked
+		mon.IntVal("graceful_exit_init_node_piece_count").Observe(node.PieceCount)                           //locked
+
+		return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}, nil
+	}
+
+	if exitStatus.ExitLoopCompletedAt == nil {
+		return &pb.SatelliteMessage{Message: &pb.SatelliteMessage_NotReady{NotReady: &pb.NotReady{}}}, nil
+	}
+
+	return nil, nil
+}
+
+func (endpoint *Endpoint) generateExitStatusRequest(ctx context.Context, nodeID storj.NodeID) (*overlay.ExitStatusRequest, pb.ExitFailed_Reason, error) {
+	var exitFailedReason pb.ExitFailed_Reason = -1
+	progress, err := endpoint.db.GetProgress(ctx, nodeID)
+	if err != nil {
+		return nil, exitFailedReason, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	mon.IntVal("graceful_exit_final_pieces_failed").Observe(progress.PiecesFailed)         //locked
+	mon.IntVal("graceful_exit_final_pieces_succeess").Observe(progress.PiecesTransferred)  //locked
+	mon.IntVal("graceful_exit_final_bytes_transferred").Observe(progress.BytesTransferred) //locked
+	processed := progress.PiecesFailed + progress.PiecesTransferred
+
+	if processed > 0 {
+		mon.IntVal("graceful_exit_successful_pieces_transfer_ratio").Observe(progress.PiecesTransferred / processed) //locked
+	}
+
+	exitStatusRequest := &overlay.ExitStatusRequest{
+		NodeID:         progress.NodeID,
+		ExitFinishedAt: time.Now().UTC(),
+	}
+	// check the node's exiting progress to see if it has failed past the max failure threshold
+	if processed > 0 && float64(progress.PiecesFailed)/float64(processed)*100 >= float64(endpoint.config.OverallMaxFailuresPercentage) {
+		exitStatusRequest.ExitSuccess = false
+		exitFailedReason = pb.ExitFailed_OVERALL_FAILURE_PERCENTAGE_EXCEEDED
+	} else {
+		exitStatusRequest.ExitSuccess = true
+	}
+
+	if exitStatusRequest.ExitSuccess {
+		mon.Meter("graceful_exit_success").Mark(1) //locked
+	} else {
+		mon.Meter("graceful_exit_fail_max_failures_percentage").Mark(1) //locked
+	}
+
+	return exitStatusRequest, exitFailedReason, nil
+}
+
+func (endpoint *Endpoint) calculatePieceSize(ctx context.Context, pointer *pb.Pointer, incomplete *TransferQueueItem, nodePiece *pb.RemotePiece) (int64, error) {
+	nodeID := incomplete.NodeID
+
+	// calculate piece size
+	redundancy, err := eestream.NewRedundancyStrategyFromProto(pointer.GetRemote().GetRedundancy())
+	if err != nil {
+		return 0, Error.Wrap(err)
+	}
+
+	pieces := pointer.GetRemote().GetRemotePieces()
+	if len(pieces) > redundancy.OptimalThreshold() {
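+		// no transfer is needed; the caller removes this node's piece from the pointer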
+		endpoint.log.Debug("pointer has more pieces than required. removing node from pointer.", zap.Stringer("node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
+
+		return 0, ErrAboveOptimalThreshold.New("")
+	}
+
+	return eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy), nil
+}
 
 func (endpoint *Endpoint) getValidPointer(ctx context.Context, path string, pieceNum int32, originalRootPieceID storj.PieceID) (*pb.Pointer, error) {
@@ -1000,3 +957,23 @@ func (endpoint *Endpoint) getValidPointer(ctx context.Context, path string, piec
 	}
 	return pointer, nil
 }
+
+func (endpoint *Endpoint) getNodePiece(ctx context.Context, pointer *pb.Pointer, incomplete *TransferQueueItem) (*pb.RemotePiece, error) {
+	remote := pointer.GetRemote()
+	nodeID := incomplete.NodeID
+
+	pieces := remote.GetRemotePieces()
+	var nodePiece *pb.RemotePiece
+	for _, piece := range pieces {
+		if piece.NodeId == nodeID && piece.PieceNum == incomplete.PieceNum {
+			nodePiece = piece
+		}
+	}
+
+	if nodePiece == nil {
+		endpoint.log.Debug("piece no longer held by node", zap.Stringer("node ID", nodeID), zap.ByteString("path", incomplete.Path), zap.Int32("piece num", incomplete.PieceNum))
+		return nil, Error.New("piece no longer held by node")
+	}
+
+	return nodePiece, nil
+}
diff --git a/satellite/gracefulexit/validation.go b/satellite/gracefulexit/validation.go
new file mode 100644
index 000000000..217408e7e
--- /dev/null
+++ b/satellite/gracefulexit/validation.go
@@ -0,0 +1,87 @@
+// Copyright (C) 2019 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package gracefulexit
+
+import (
+	"bytes"
+	"context"
+
+	"storj.io/storj/pkg/identity"
+	"storj.io/storj/pkg/pb"
+	"storj.io/storj/pkg/signing"
+)
+
+func (endpoint *Endpoint) validatePendingTransfer(ctx context.Context, transfer *pendingTransfer) error {
+	if transfer.satelliteMessage == nil {
+		return Error.New("Satellite message cannot be nil")
+	}
+	if transfer.satelliteMessage.GetTransferPiece() == nil {
+		return Error.New("Satellite message transfer piece cannot be nil")
+	}
+	if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit() == nil {
+		return Error.New("Addressed order limit on transfer piece cannot be nil")
+	}
+	if transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit() == nil {
+		return Error.New("Addressed order limit on transfer piece cannot be nil")
+	}
+	if transfer.path == nil {
+		return Error.New("Transfer path cannot be nil")
+	}
+	if transfer.originalPointer == nil || transfer.originalPointer.GetRemote() == nil {
+		return Error.New("could not get remote pointer from transfer item")
+	}
+
+	return nil
+}
+
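+// verifyPieceTransferred checks the order limit, piece hashes, derived piece ID,
+// and signatures of a reported transfer before the satellite accepts it.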
+func (endpoint *Endpoint) verifyPieceTransferred(ctx context.Context, message *pb.StorageNodeMessage_Succeeded, transfer *pendingTransfer, receivingNodePeerID *identity.PeerIdentity) error {
+	originalOrderLimit := message.Succeeded.GetOriginalOrderLimit()
+	if originalOrderLimit == nil {
+		return ErrInvalidArgument.New("Original order limit cannot be nil")
+	}
+	originalPieceHash := message.Succeeded.GetOriginalPieceHash()
+	if originalPieceHash == nil {
+		return ErrInvalidArgument.New("Original piece hash cannot be nil")
+	}
+	replacementPieceHash := message.Succeeded.GetReplacementPieceHash()
+	if replacementPieceHash == nil {
+		return ErrInvalidArgument.New("Replacement piece hash cannot be nil")
+	}
+
+	// verify that the original piece hash and replacement piece hash match
+	if !bytes.Equal(originalPieceHash.Hash, replacementPieceHash.Hash) {
+		return ErrInvalidArgument.New("Piece hashes for transferred piece don't match")
+	}
+
+	// verify that the satellite signed the original order limit
+	err := signing.VerifyOrderLimitSignature(ctx, endpoint.signer, originalOrderLimit)
+	if err != nil {
+		return ErrInvalidArgument.Wrap(err)
+	}
+
+	// verify that the public key on the order limit signed the original piece hash
+	err = signing.VerifyUplinkPieceHashSignature(ctx, originalOrderLimit.UplinkPublicKey, originalPieceHash)
+	if err != nil {
+		return ErrInvalidArgument.Wrap(err)
+	}
+
+	if originalOrderLimit.PieceId != message.Succeeded.OriginalPieceId {
+		return ErrInvalidArgument.New("Invalid original piece ID")
+	}
+
+	receivingNodeID := transfer.satelliteMessage.GetTransferPiece().GetAddressedOrderLimit().GetLimit().StorageNodeId
+	calculatedNewPieceID := transfer.originalPointer.GetRemote().RootPieceId.Derive(receivingNodeID, transfer.pieceNum)
+	if calculatedNewPieceID != replacementPieceHash.PieceId {
+		return ErrInvalidArgument.New("Invalid replacement piece ID")
+	}
+
+	signee := signing.SigneeFromPeerIdentity(receivingNodePeerID)
+
+	// verify that the new node signed the replacement piece hash
+	err = signing.VerifyPieceHashSignature(ctx, signee, replacementPieceHash)
+	if err != nil {
+		return ErrInvalidArgument.Wrap(err)
+	}
+	return nil
+}