satellite/gracefulexit: Add graceful exit completed/failed receipt verification to satellite CLI (#3679)
This commit is contained in:
parent
2461ccd469
commit
9420fa9fc5
@ -6,15 +6,21 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/storj/pkg/identity"
|
||||||
|
"storj.io/storj/pkg/pb"
|
||||||
|
"storj.io/storj/pkg/signing"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
"storj.io/storj/satellite/gracefulexit"
|
"storj.io/storj/satellite/gracefulexit"
|
||||||
"storj.io/storj/satellite/satellitedb"
|
"storj.io/storj/satellite/satellitedb"
|
||||||
@ -98,3 +104,68 @@ func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Tim
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func verifyGracefulExitReceipt(ctx context.Context, identity *identity.FullIdentity, nodeID storj.NodeID, receipt string) error {
|
||||||
|
signee := signing.SigneeFromPeerIdentity(identity.PeerIdentity())
|
||||||
|
|
||||||
|
bytes, err := hex.DecodeString(receipt)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to unmarshal as an ExitCompleted first
|
||||||
|
completed := &pb.ExitCompleted{}
|
||||||
|
err = proto.Unmarshal(bytes, completed)
|
||||||
|
if err != nil {
|
||||||
|
// if it is not a ExitCompleted, try ExitFailed
|
||||||
|
failed := &pb.ExitFailed{}
|
||||||
|
err = proto.Unmarshal(bytes, failed)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = checkIDs(identity.PeerIdentity().ID, nodeID, failed.SatelliteId, failed.NodeId)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = signing.VerifyExitFailed(ctx, signee, failed)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return writeVerificationMessage(false, failed.SatelliteId, failed.NodeId, failed.Failed)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = checkIDs(identity.PeerIdentity().ID, nodeID, completed.SatelliteId, completed.NodeId)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = signing.VerifyExitCompleted(ctx, signee, completed)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return writeVerificationMessage(true, completed.SatelliteId, completed.NodeId, completed.Completed)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkIDs(satelliteID storj.NodeID, providedSNID storj.NodeID, receiptSatelliteID storj.NodeID, receiptSNID storj.NodeID) error {
|
||||||
|
if satelliteID != receiptSatelliteID {
|
||||||
|
return errs.New("satellite ID (%v) does not match receipt satellite ID (%v).", satelliteID, receiptSatelliteID)
|
||||||
|
}
|
||||||
|
if providedSNID != receiptSNID {
|
||||||
|
return errs.New("provided storage node ID (%v) does not match receipt node ID (%v).", providedSNID, receiptSNID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeVerificationMessage(succeeded bool, satelliteID storj.NodeID, snID storj.NodeID, timestamp time.Time) error {
|
||||||
|
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
||||||
|
fmt.Fprintf(w, "Succeeded:\t%v\n", succeeded)
|
||||||
|
fmt.Fprintf(w, "Satellite ID:\t%v\n", satelliteID)
|
||||||
|
fmt.Fprintf(w, "Storage Node ID:\t%v\n", snID)
|
||||||
|
fmt.Fprintf(w, "Timestamp:\t%v\n", timestamp)
|
||||||
|
|
||||||
|
return errs.Wrap(w.Flush())
|
||||||
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"storj.io/storj/pkg/cfgstruct"
|
"storj.io/storj/pkg/cfgstruct"
|
||||||
"storj.io/storj/pkg/process"
|
"storj.io/storj/pkg/process"
|
||||||
"storj.io/storj/pkg/revocation"
|
"storj.io/storj/pkg/revocation"
|
||||||
|
"storj.io/storj/pkg/storj"
|
||||||
"storj.io/storj/private/fpath"
|
"storj.io/storj/private/fpath"
|
||||||
"storj.io/storj/private/version"
|
"storj.io/storj/private/version"
|
||||||
"storj.io/storj/satellite"
|
"storj.io/storj/satellite"
|
||||||
@ -97,6 +98,14 @@ var (
|
|||||||
RunE: cmdGracefulExit,
|
RunE: cmdGracefulExit,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
verifyGracefulExitReceiptCmd = &cobra.Command{
|
||||||
|
Use: "verify-exit-receipt [storage node ID] [receipt]",
|
||||||
|
Short: "Verify a graceful exit receipt",
|
||||||
|
Long: "Verify a graceful exit receipt is valid.",
|
||||||
|
Args: cobra.MinimumNArgs(2),
|
||||||
|
RunE: cmdVerifyGracefulExitReceipt,
|
||||||
|
}
|
||||||
|
|
||||||
runCfg Satellite
|
runCfg Satellite
|
||||||
setupCfg Satellite
|
setupCfg Satellite
|
||||||
|
|
||||||
@ -113,10 +122,12 @@ var (
|
|||||||
Output string `help:"destination of report output" default:""`
|
Output string `help:"destination of report output" default:""`
|
||||||
}
|
}
|
||||||
gracefulExitCfg struct {
|
gracefulExitCfg struct {
|
||||||
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
|
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
|
||||||
Output string `help:"destination of report output" default:""`
|
Output string `help:"destination of report output" default:""`
|
||||||
Completed bool `help:"whether to output (initiated and completed) or (initiated and not completed)" default:"false"`
|
Completed bool `help:"whether to output (initiated and completed) or (initiated and not completed)" default:"false"`
|
||||||
}
|
}
|
||||||
|
verifyGracefulExitReceiptCfg struct {
|
||||||
|
}
|
||||||
confDir string
|
confDir string
|
||||||
identityDir string
|
identityDir string
|
||||||
)
|
)
|
||||||
@ -137,6 +148,7 @@ func init() {
|
|||||||
reportsCmd.AddCommand(nodeUsageCmd)
|
reportsCmd.AddCommand(nodeUsageCmd)
|
||||||
reportsCmd.AddCommand(partnerAttributionCmd)
|
reportsCmd.AddCommand(partnerAttributionCmd)
|
||||||
reportsCmd.AddCommand(gracefulExitCmd)
|
reportsCmd.AddCommand(gracefulExitCmd)
|
||||||
|
reportsCmd.AddCommand(verifyGracefulExitReceiptCmd)
|
||||||
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
process.Bind(runMigrationCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(runMigrationCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(runAPICmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
@ -145,6 +157,7 @@ func init() {
|
|||||||
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
process.Bind(gracefulExitCmd, &gracefulExitCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(gracefulExitCmd, &gracefulExitCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
|
process.Bind(verifyGracefulExitReceiptCmd, &verifyGracefulExitReceiptCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
process.Bind(partnerAttributionCmd, &partnerAttribtionCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
process.Bind(partnerAttributionCmd, &partnerAttribtionCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -287,6 +300,23 @@ func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
|
|||||||
return w.Flush()
|
return w.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cmdVerifyGracefulExitReceipt(cmd *cobra.Command, args []string) (err error) {
|
||||||
|
ctx, _ := process.Ctx(cmd)
|
||||||
|
|
||||||
|
identity, err := runCfg.Identity.Load()
|
||||||
|
if err != nil {
|
||||||
|
zap.S().Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the node ID is valid
|
||||||
|
nodeID, err := storj.NodeIDFromString(args[0])
|
||||||
|
if err != nil {
|
||||||
|
return errs.Combine(err, errs.New("Invalid node ID."))
|
||||||
|
}
|
||||||
|
|
||||||
|
return verifyGracefulExitReceipt(ctx, identity, nodeID, args[1])
|
||||||
|
}
|
||||||
|
|
||||||
func cmdGracefulExit(cmd *cobra.Command, args []string) (err error) {
|
func cmdGracefulExit(cmd *cobra.Command, args []string) (err error) {
|
||||||
ctx, _ := process.Ctx(cmd)
|
ctx, _ := process.Ctx(cmd)
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
@ -122,13 +123,22 @@ func (worker *Worker) Run(ctx context.Context, done func()) (err error) {
|
|||||||
zap.Stringer("Satellite ID", worker.satelliteID),
|
zap.Stringer("Satellite ID", worker.satelliteID),
|
||||||
zap.Stringer("reason", msg.ExitFailed.Reason))
|
zap.Stringer("reason", msg.ExitFailed.Reason))
|
||||||
|
|
||||||
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitFailed, msg.ExitFailed.GetExitFailureSignature())
|
exitFailedBytes, err := proto.Marshal(msg.ExitFailed)
|
||||||
|
if err != nil {
|
||||||
|
worker.log.Error("failed to marshal exit failed message.")
|
||||||
|
}
|
||||||
|
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitFailed, exitFailedBytes)
|
||||||
return errs.Wrap(err)
|
return errs.Wrap(err)
|
||||||
|
|
||||||
case *pb.SatelliteMessage_ExitCompleted:
|
case *pb.SatelliteMessage_ExitCompleted:
|
||||||
worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteID))
|
worker.log.Info("graceful exit completed.", zap.Stringer("Satellite ID", worker.satelliteID))
|
||||||
|
|
||||||
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitSucceeded, msg.ExitCompleted.GetExitCompleteSignature())
|
exitCompletedBytes, err := proto.Marshal(msg.ExitCompleted)
|
||||||
|
if err != nil {
|
||||||
|
worker.log.Error("failed to marshal exit completed message.")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = worker.satelliteDB.CompleteGracefulExit(ctx, worker.satelliteID, time.Now(), satellites.ExitSucceeded, exitCompletedBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errs.Wrap(err)
|
return errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user