storj/cmd/satellite/gracefulexit.go
Ivan Fraixedes 7fb86617fc satellite/satellitedb: Use CRDB AS OF SYSTEM & batch for GE
Use the 'AS OF SYSTEM TIME' Cockroach DB clause for the Graceful Exit
(a.k.a GE) queries that count the delete the GE queue items of nodes
which have already exited the network.

Split the subquery used for deleting all the transfer queue items of
nodes which has exited when CRDB is used and batch the queries because
CRDB struggles when executing in a single query unlike Postgres.

The new test which has been added to this commit to verify the CRDB
batch logic for deleting all the transfer queue items of the exited
nodes has raised that the Enqueue method has to run in baches when CRDB
is used otherwise CRDB has return the error "driver: bad connection"
when a big a amount of items are passed to be enqueued. This error
didn't happen with the current test implementation it was with an
initial one that it was creating a big amount of exited nodes and
transfer queue items for those nodes.

Change-Id: I6a099cdbc515a240596bc93141fea3182c2e50a9
2021-05-07 13:09:19 -04:00

249 lines
7.5 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/csv"
"encoding/hex"
"fmt"
"io"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/satellitedb"
)
// generateGracefulExitCSV creates a report with graceful exit data for exiting or exited nodes in a given period.
func generateGracefulExitCSV(ctx context.Context, completed bool, start time.Time, end time.Time, output io.Writer) error {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), reportsGracefulExitCfg.Database, satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
var nodeIDs storj.NodeIDList
if completed {
nodeIDs, err = db.OverlayCache().GetGracefulExitCompletedByTimeFrame(ctx, start, end)
if err != nil {
return err
}
} else {
nodeIDs, err = db.OverlayCache().GetGracefulExitIncompleteByTimeFrame(ctx, start, end)
if err != nil {
return err
}
}
w := csv.NewWriter(output)
headers := []string{
"nodeID",
"walletAddress",
"nodeCreationDate",
"initiatedGracefulExit",
"completedGracefulExit",
"transferredGB",
}
if err := w.Write(headers); err != nil {
return err
}
for _, id := range nodeIDs {
node, err := db.OverlayCache().Get(ctx, id)
if err != nil {
return err
}
exitProgress, err := db.GracefulExit().GetProgress(ctx, id)
if gracefulexit.ErrNodeNotFound.Has(err) {
exitProgress = &gracefulexit.Progress{}
} else if err != nil {
return err
}
exitStatus := node.ExitStatus
exitFinished := ""
if exitStatus.ExitFinishedAt != nil {
exitFinished = exitStatus.ExitFinishedAt.Format("2006-01-02")
}
nextRow := []string{
node.Id.String(),
node.Operator.Wallet,
node.CreatedAt.Format("2006-01-02"),
exitStatus.ExitInitiatedAt.Format("2006-01-02"),
exitFinished,
strconv.FormatInt(exitProgress.BytesTransferred, 10),
}
if err := w.Write(nextRow); err != nil {
return err
}
}
if err := w.Error(); err != nil {
return err
}
w.Flush()
if output != os.Stdout {
if completed {
fmt.Println("Generated report for nodes that have completed graceful exit.")
} else {
fmt.Println("Generated report for nodes that are in the process of graceful exiting.")
}
}
return err
}
func verifyGracefulExitReceipt(ctx context.Context, identity *identity.FullIdentity, nodeID storj.NodeID, receipt string) error {
signee := signing.SigneeFromPeerIdentity(identity.PeerIdentity())
bytes, err := hex.DecodeString(receipt)
if err != nil {
return errs.Wrap(err)
}
// try to unmarshal as an ExitCompleted first
completed := &pb.ExitCompleted{}
err = pb.Unmarshal(bytes, completed)
if err != nil {
// if it is not a ExitCompleted, try ExitFailed
failed := &pb.ExitFailed{}
err = pb.Unmarshal(bytes, failed)
if err != nil {
return errs.Wrap(err)
}
err = checkIDs(identity.PeerIdentity().ID, nodeID, failed.SatelliteId, failed.NodeId)
if err != nil {
return err
}
err = signing.VerifyExitFailed(ctx, signee, failed)
if err != nil {
return errs.Wrap(err)
}
return writeVerificationMessage(false, failed.SatelliteId, failed.NodeId, failed.Failed)
}
err = checkIDs(identity.PeerIdentity().ID, nodeID, completed.SatelliteId, completed.NodeId)
if err != nil {
return err
}
err = signing.VerifyExitCompleted(ctx, signee, completed)
if err != nil {
return errs.Wrap(err)
}
return writeVerificationMessage(true, completed.SatelliteId, completed.NodeId, completed.Completed)
}
func cleanupGEOrphanedData(ctx context.Context, before time.Time, config gracefulexit.Config) (err error) {
db, err := satellitedb.Open(ctx, zap.L().Named("db"), consistencyGECleanupCfg.Database,
satellitedb.Options{ApplicationName: "satellite-gracefulexit"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
nodesItems, err := db.GracefulExit().CountFinishedTransferQueueItemsByNode(ctx, before, config.AsOfSystemTimeInterval)
if err != nil {
return err
}
if len(nodesItems) == 0 {
fmt.Printf("There isn't any item left in the DB for nodes exited before %s\n", before.Format("2006-01-02"))
return nil
}
{ // print the nodesItems
fmt.Println(" Node ID | Num. Items ")
fmt.Println("----------------------------------------------------------------------------------------")
var totalItems int64
for id, n := range nodesItems {
sid := id.String()
// 61 is the char positions between the beginning of the line and the next
// column separator, and 24 the char positions for the second column
// length. Measuring the length of the first column value (node ID), we
// calculate how many positions to shift to start printing in the next
// column and then we tell the Printf to align the value to the right by
// 24 positions which is where the column ends and where last column char
// value should end.
fmt.Printf(fmt.Sprintf(" %%s %%%dd\n", 24+61-len(sid)), sid, n)
totalItems += n
}
fmt.Println("----------------------------------------------------------------------------------------")
fmt.Printf(" Total | %22d \n\n", totalItems)
}
_, err = fmt.Printf("Confirm that you want to delete the above items from the DB? (confirm with 'yes') ")
if err != nil {
return err
}
var confirm string
n, err := fmt.Scanln(&confirm)
if err != nil {
if n != 0 {
return err
}
// fmt.Scanln cannot handle empty input
confirm = "n"
}
if strings.ToLower(confirm) != "yes" {
fmt.Println("Aborted, NO ITEMS have been deleted")
return nil
}
queueTotal, err := db.GracefulExit().DeleteAllFinishedTransferQueueItems(ctx, before, config.AsOfSystemTimeInterval, config.TransferQueueBatchSize)
if err != nil {
fmt.Println("Error, NO ITEMS have been deleted from transfer queue")
return err
}
progressTotal, err := db.GracefulExit().DeleteFinishedExitProgress(ctx, before, config.AsOfSystemTimeInterval)
if err != nil {
fmt.Printf("Error, %d stale entries were deleted from exit progress table. More stale entries might remain.\n", progressTotal)
return err
}
fmt.Printf("%d items have been deleted from transfer queue and %d items deleted from exit progress\n", queueTotal, progressTotal)
return nil
}
func checkIDs(satelliteID storj.NodeID, providedSNID storj.NodeID, receiptSatelliteID storj.NodeID, receiptSNID storj.NodeID) error {
if satelliteID != receiptSatelliteID {
return errs.New("satellite ID (%v) does not match receipt satellite ID (%v).", satelliteID, receiptSatelliteID)
}
if providedSNID != receiptSNID {
return errs.New("provided storage node ID (%v) does not match receipt node ID (%v).", providedSNID, receiptSNID)
}
return nil
}
func writeVerificationMessage(succeeded bool, satelliteID storj.NodeID, snID storj.NodeID, timestamp time.Time) error {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintf(w, "Succeeded:\t%v\n", succeeded)
fmt.Fprintf(w, "Satellite ID:\t%v\n", satelliteID)
fmt.Fprintf(w, "Storage Node ID:\t%v\n", snID)
fmt.Fprintf(w, "Timestamp:\t%v\n", timestamp)
return errs.Wrap(w.Flush())
}