storj/storagenode/gracefulexit/worker.go
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package gracefulexit

import (
	"bytes"
	"context"
	"io"
	"os"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/errs2"
	"storj.io/common/memory"
	"storj.io/common/pb"
	"storj.io/common/rpc"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/sync2"
	"storj.io/storj/storagenode/pieces"
	"storj.io/storj/storagenode/piecestore"
	"storj.io/storj/storagenode/satellites"
	"storj.io/uplink/private/ecclient"
)

// Worker is responsible for completing the graceful exit for a given satellite.
type Worker struct {
log *zap.Logger
store *pieces.Store
satelliteDB satellites.DB
dialer rpc.Dialer
limiter *sync2.Limiter
satelliteID storj.NodeID
satelliteAddr string
ecclient ecclient.Client
minBytesPerSecond memory.Size
minDownloadTimeout time.Duration
}

// NewWorker instantiates Worker.
func NewWorker(log *zap.Logger, store *pieces.Store, satelliteDB satellites.DB, dialer rpc.Dialer, satelliteID storj.NodeID, satelliteAddr string, config Config) *Worker {
return &Worker{
log: log,
store: store,
satelliteDB: satelliteDB,
dialer: dialer,
limiter: sync2.NewLimiter(config.NumConcurrentTransfers),
satelliteID: satelliteID,
satelliteAddr: satelliteAddr,
ecclient: ecclient.NewClient(log, dialer, 0),
minBytesPerSecond: config.MinBytesPerSecond,
minDownloadTimeout: config.MinDownloadTimeout,
}
}

// Run calls the satellite endpoint, transfers pieces, validates, and responds with success or failure.
// It also marks the satellite finished once all the pieces have been transferred.
func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
defer mon.Task()(&ctx)(&err)
defer done()
worker.log.Debug("running worker")
conn, err := worker.dialer.DialAddressID(ctx, worker.satelliteAddr, worker.satelliteID)
if err != nil {
return errs.Wrap(err)
}
defer func() {
err = errs.Combine(err, conn.Close())
}()
client := pb.NewDRPCSatelliteGracefulExitClient(conn)
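	// Process opens a long-lived bidirectional stream: the satellite sends transfer and
	// delete orders on it, and the node replies with per-piece success or failure messages.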
c, err := client.Process(ctx)
if err != nil {
return errs.Wrap(err)
}
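	// Receive messages from the satellite until the stream ends. Each message is either a
	// not-ready notice, a piece to transfer, a piece to delete, or a final exit verdict.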
for {
response, err := c.Recv()
if errs.Is(err, io.EOF) {
// Done
return nil
}
if errs2.IsRPC(err, rpcstatus.FailedPrecondition) {
// delete the entry from satellite table and inform graceful exit has failed to start
deleteErr := worker.satelliteDB.CancelGracefulExit(ctx, worker.satelliteID)
if deleteErr != nil {
// TODO: what to do now?
return errs.Combine(deleteErr, err)
}
return errs.Wrap(err)
}
if err != nil {
// TODO what happened
return errs.Wrap(err)
}
switch msg := response.GetMessage().(type) {
case *pb.SatelliteMessage_NotReady:
return nil
case *pb.SatelliteMessage_TransferPiece:
transferPieceMsg := msg.TransferPiece
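			// Transfers run concurrently; the limiter caps them at config.NumConcurrentTransfers.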
worker.limiter.Go(ctx, func() {
				err := worker.transferPiece(ctx, transferPieceMsg, c)
if err != nil {
worker.log.Error("failed to transfer piece.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Error(errs.Wrap(err)))
}
})
case *pb.SatelliteMessage_DeletePiece:
deletePieceMsg := msg.DeletePiece
worker.limiter.Go(ctx, func() {
pieceID := deletePieceMsg.OriginalPieceId
err := worker.deleteOnePiece(ctx, pieceID)
if err != nil {
worker.log.Error("failed to delete piece.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
}
})
case *pb.SatelliteMessage_ExitFailed:
worker.log.Error("graceful exit failed.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("reason", msg.ExitFailed.Reason))
exitFailedBytes, err := proto.Marshal(msg.ExitFailed)
if err != nil {
worker.log.Error("failed to marshal exit failed message.")
}
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitFailed, exitFailedBytes)
return errs.Wrap(err)
case *pb.SatelliteMessage_ExitCompleted:
worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteID))
exitCompletedBytes, err := proto.Marshal(msg.ExitCompleted)
if err != nil {
worker.log.Error("failed to marshal exit completed message.")
}
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitSucceeded, exitCompletedBytes)
if err != nil {
return errs.Wrap(err)
}
// delete all remaining pieces
err = worker.deleteAllPieces(ctx)
return errs.Wrap(err)
default:
// TODO handle err
worker.log.Error("unknown graceful exit message.", zap.Stringer("Satellite ID", worker.satelliteID))
}
}
}

type gracefulExitStream interface {
Context() context.Context
Send(*pb.StorageNodeMessage) error
Recv() (*pb.SatelliteMessage, error)
}

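// transferPiece reads the piece from the local store, uploads it to the replacement node named
// in the addressed order limit, verifies the returned hash and signature, and reports the
// result back to the satellite on the graceful exit stream.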
func (worker *Worker) transferPiece(ctx context.Context, transferPiece *pb.TransferPiece, c gracefulExitStream) error {
pieceID := transferPiece.OriginalPieceId
reader, err := worker.store.Reader(ctx, worker.satelliteID, pieceID)
if err != nil {
transferErr := pb.TransferFailed_UNKNOWN
if errs.Is(err, os.ErrNotExist) {
transferErr = pb.TransferFailed_NOT_FOUND
}
worker.log.Error("failed to get piece reader.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, transferErr, pieceID, c.Send)
return err
}
addrLimit := transferPiece.GetAddressedOrderLimit()
pk := transferPiece.PrivateKey
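	// Fetch the piece hash and order limit stored alongside the piece; they are sent back
	// to the satellite in the TransferSucceeded message.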
originalHash, originalOrderLimit, err := worker.store.GetHashAndLimit(ctx, worker.satelliteID, pieceID, reader)
if err != nil {
worker.log.Error("failed to get piece hash and order limit.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_UNKNOWN, pieceID, c.Send)
return err
}
if worker.minBytesPerSecond == 0 {
// set minBytesPerSecond to default 128B if set to 0
worker.minBytesPerSecond = 128 * memory.B
}
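	// Allow enough time to upload the piece at the configured minimum transfer rate,
	// but never less than the configured minimum download timeout.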
maxTransferTime := time.Duration(int64(time.Second) * originalHash.PieceSize / worker.minBytesPerSecond.Int64())
if maxTransferTime < worker.minDownloadTimeout {
maxTransferTime = worker.minDownloadTimeout
}
putCtx, cancel := context.WithTimeout(ctx, maxTransferTime)
defer cancel()
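	// Upload the piece to the replacement node from the order limit, under the timeout above.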
pieceHash, peerID, err := worker.ecclient.PutPiece(putCtx, ctx, addrLimit, pk, reader)
if err != nil {
if piecestore.ErrVerifyUntrusted.Has(err) {
worker.log.Error("failed hash verification.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
} else {
worker.log.Error("failed to put piece.",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
// TODO look at error type to decide on the transfer error
worker.handleFailure(ctx, pb.TransferFailed_STORAGE_NODE_UNAVAILABLE, pieceID, c.Send)
}
return err
}
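	// The replacement node must report the same content hash as the original piece.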
if !bytes.Equal(originalHash.Hash, pieceHash.Hash) {
worker.log.Error("piece hash from new storagenode does not match",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return Error.New("piece hash from new storagenode does not match")
}
if pieceHash.PieceId != addrLimit.Limit.PieceId {
worker.log.Error("piece id from new storagenode does not match order limit",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return Error.New("piece id from new storagenode does not match order limit")
}
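	// Verify that the returned piece hash was signed by the replacement node's identity.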
signee := signing.SigneeFromPeerIdentity(peerID)
err = signing.VerifyPieceHashSignature(ctx, signee, pieceHash)
if err != nil {
worker.log.Error("invalid piece hash signature from new storagenode",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(errs.Wrap(err)))
worker.handleFailure(ctx, pb.TransferFailed_HASH_VERIFICATION, pieceID, c.Send)
return err
}
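	// Report success, sending the original hash and order limit along with the
	// replacement node's signed hash.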
success := &pb.StorageNodeMessage{
Message: &pb.StorageNodeMessage_Succeeded{
Succeeded: &pb.TransferSucceeded{
OriginalPieceId: transferPiece.OriginalPieceId,
OriginalPieceHash: &originalHash,
OriginalOrderLimit: &originalOrderLimit,
ReplacementPieceHash: pieceHash,
},
},
}
worker.log.Info("piece transferred to new storagenode",
zap.Stringer("Storagenode ID", addrLimit.Limit.StorageNodeId),
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID))
return c.Send(success)
}

// deleteOnePiece deletes one piece stored for a satellite.
func (worker *Worker) deleteOnePiece(ctx context.Context, pieceID storj.PieceID) error {
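	// Open a reader first so the piece size is known for the progress update after deletion.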
piece, err := worker.store.Reader(ctx, worker.satelliteID, pieceID)
if err != nil {
if !errs2.IsCanceled(err) {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID), zap.Error(err))
}
return err
}
err = worker.deletePiece(ctx, pieceID)
if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err))
return err
}
// update graceful exit progress
size := piece.Size()
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteID, size)
}

// deletePiece deletes one piece stored for a satellite, without updating the satellite's graceful exit progress.
func (worker *Worker) deletePiece(ctx context.Context, pieceID storj.PieceID) error {
err := worker.store.Delete(ctx, worker.satelliteID, pieceID)
if err != nil {
worker.log.Debug("failed to delete a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID),
zap.Error(err))
delErr := worker.store.DeleteFailed(ctx, pieces.ExpiredInfo{
SatelliteID: worker.satelliteID,
PieceID: pieceID,
InPieceInfo: true,
}, time.Now().UTC())
if delErr != nil {
worker.log.Debug("failed to mark a deletion failure for a piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID), zap.Error(err))
}
return errs.Combine(err, delErr)
}
worker.log.Debug("delete piece",
zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", pieceID))
return err
}

// deleteAllPieces deletes all pieces stored for a satellite.
func (worker *Worker) deleteAllPieces(ctx context.Context) error {
var totalDeleted int64
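	// Walk every piece stored for this satellite, delete it, and accumulate the freed bytes.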
err := worker.store.WalkSatellitePieces(ctx, worker.satelliteID, func(piece pieces.StoredPieceAccess) error {
err := worker.deletePiece(ctx, piece.PieceID())
if err == nil {
_, size, err := piece.Size(ctx)
if err != nil {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID),
zap.Stringer("Piece ID", piece.PieceID()), zap.Error(err))
}
totalDeleted += size
}
return err
})
if err != nil && !errs2.IsCanceled(err) {
worker.log.Debug("failed to retrieve piece info", zap.Stringer("Satellite ID", worker.satelliteID), zap.Error(err))
}
// update graceful exit progress
return worker.satelliteDB.UpdateGracefulExit(ctx, worker.satelliteID, totalDeleted)
}

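// handleFailure sends a TransferFailed message for the given piece and reason back to the satellite.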
func (worker *Worker) handleFailure(ctx context.Context, transferError pb.TransferFailed_Error, pieceID pb.PieceID, send func(*pb.StorageNodeMessage) error) {
failure := &pb.StorageNodeMessage{
Message: &pb.StorageNodeMessage_Failed{
Failed: &pb.TransferFailed{
OriginalPieceId: pieceID,
Error: transferError,
},
},
}
sendErr := send(failure)
if sendErr != nil {
worker.log.Error("unable to send failure.", zap.Stringer("Satellite ID", worker.satelliteID))
}
}

// Close halts the worker.
func (worker *Worker) Close() error {
worker.limiter.Wait()
return nil
}