cmd/satellite: restore-trash command

Change-Id: I80fc932c12147692d49cde277784871ac611fcad
This commit is contained in:
JT Olio 2021-02-18 08:33:49 -07:00
parent 50239e66f7
commit 3ae3389ddc
3 changed files with 107 additions and 1 deletions

View File

@ -34,6 +34,7 @@ import (
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/payments/stripecoinpayments"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite/satellitedb/dbx"
@ -231,6 +232,13 @@ var (
Long: "Cleanup Graceful Exit data which is lingering in the transfer queue DB table on nodes which has finished the exit.",
RunE: cmdConsistencyGECleanup,
}
restoreTrashCmd = &cobra.Command{
Use: "restore-trash [node-id-1 node-id-2 node-id-3 ...]",
Short: "Restore trash",
Long: "Tell storage nodes to undo garbage collection. " +
"If node ids aren't provided, *all* nodes are used.",
RunE: cmdRestoreTrash,
}
runCfg Satellite
setupCfg Satellite
@ -293,6 +301,7 @@ func init() {
rootCmd.AddCommand(compensationCmd)
rootCmd.AddCommand(billingCmd)
rootCmd.AddCommand(consistencyCmd)
rootCmd.AddCommand(restoreTrashCmd)
reportsCmd.AddCommand(nodeUsageCmd)
reportsCmd.AddCommand(partnerAttributionCmd)
reportsCmd.AddCommand(reportsGracefulExitCmd)
@ -313,6 +322,7 @@ func init() {
process.Bind(runAdminCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runRepairerCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runGCCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(restoreTrashCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
@ -743,6 +753,63 @@ func cmdConsistencyGECleanup(cmd *cobra.Command, args []string) error {
return cleanupGEOrphanedData(ctx, before.UTC())
}
func cmdRestoreTrash(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
log := zap.L()
db, err := satellitedb.Open(ctx, log.Named("restore-trash"), runCfg.Database, satellitedb.Options{ApplicationName: "satellite-restore-trash"})
if err != nil {
return errs.New("Error creating new master database connection: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
undelete := func(ctx context.Context, node *overlay.SelectedNode) error {
// TODO
return nil
}
if len(args) == 0 {
var nodes []*overlay.SelectedNode
err = db.OverlayCache().IterateAllNodes(ctx, func(ctx context.Context, node *overlay.SelectedNode) error {
nodes = append(nodes, node)
return nil
})
for _, node := range nodes {
err = undelete(ctx, node)
if err != nil {
return err
}
}
if err != nil {
return err
}
} else {
for _, nodeid := range args {
parsedNodeID, err := storj.NodeIDFromString(nodeid)
if err != nil {
return err
}
dossier, err := db.OverlayCache().Get(ctx, parsedNodeID)
if err != nil {
return err
}
err = undelete(ctx, &overlay.SelectedNode{
ID: dossier.Id,
Address: dossier.Address,
LastNet: dossier.LastNet,
LastIPPort: dossier.LastIPPort,
})
if err != nil {
return err
}
}
}
return nil
}
func main() {
process.ExecCustomDebug(rootCmd)
}

View File

@ -103,6 +103,9 @@ type DB interface {
// AuditHistoryDB includes operations for interfacing with the audit history table.
AuditHistoryDB
// IterateAllNodes will call cb on all known nodes (used in restore trash contexts)
IterateAllNodes(context.Context, func(context.Context, *SelectedNode) error) error
}
// NodeCheckInInfo contains all the info that will be updated when a node checkins.

View File

@ -1589,7 +1589,7 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
audit_reputation_alpha, audit_reputation_beta,
unknown_audit_reputation_alpha, unknown_audit_reputation_beta,
major, minor, patch, hash, timestamp, release,
last_ip_port,
last_ip_port,
wallet_features
)
VALUES (
@ -1679,3 +1679,39 @@ func (cache *overlaycache) TestUnvetNode(ctx context.Context, nodeID storj.NodeI
_, err = cache.Get(ctx, nodeID)
return err
}
// IterateAllNodes will call cb on all known nodes (used in restore trash contexts)
func (cache *overlaycache) IterateAllNodes(ctx context.Context, cb func(context.Context, *overlay.SelectedNode) error) (err error) {
defer mon.Task()(&ctx)(&err)
var rows tagsql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`
SELECT last_net, id, address, last_ip_port
FROM nodes
`))
if err != nil {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var node overlay.SelectedNode
node.Address = &pb.NodeAddress{Transport: pb.NodeTransport_TCP_TLS_GRPC}
var lastIPPort sql.NullString
err = rows.Scan(&node.LastNet, &node.ID, &node.Address.Address, &lastIPPort)
if err != nil {
return Error.Wrap(err)
}
if lastIPPort.Valid {
node.LastIPPort = lastIPPort.String
}
err = cb(ctx, &node)
if err != nil {
return err
}
}
return rows.Err()
}