cmd/storagenode: add forget-satellite subcommand

This change adds a new forget-satellite sub-command to
the storagenode CLI which cleans up untrusted satellite
data.

Issue: https://github.com/storj/storj/issues/6068
Change-Id: Iafa109fdc98afdba7582f568a61c22222da65f02
This commit is contained in:
Clement Sam 2023-08-01 09:46:55 +00:00 committed by Storj Robot
parent dcf3f25f93
commit f14fabc90a
16 changed files with 799 additions and 134 deletions

View File

@ -0,0 +1,242 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/satellites"
"storj.io/storj/storagenode/storagenodedb"
"storj.io/storj/storagenode/trust"
)
// runCfg defines configuration for run command.
type forgetSatelliteCfg struct {
storagenode.Config
SatelliteIDs []string `internal:"true"`
AllUntrusted bool `help:"Clean up all untrusted satellites" default:"false"`
Force bool `help:"Force removal of satellite data if not listed in satelliteDB cache or marked as untrusted" default:"false"`
}
func newForgetSatelliteCmd(f *Factory) *cobra.Command {
var cfg forgetSatelliteCfg
cmd := &cobra.Command{
Use: "forget-satellite [satellite_IDs...]",
Short: "Remove an untrusted satellite from the trust cache and clean up its data",
Long: "Forget a satellite.\n" +
"The command shows the list of the available untrusted satellites " +
"and removes the selected satellites from the trust cache and clean up the available data",
Example: `
# Specify satellite ID to forget
$ storagenode forget-satellite --identity-dir /path/to/identityDir --config-dir /path/to/configDir satellite_ID
# Specify multiple satellite IDs to forget
$ storagenode forget-satellite satellite_ID1 satellite_ID2 --identity-dir /path/to/identityDir --config-dir /path/to/configDir
# Clean up all untrusted satellites
# This checks for untrusted satellites in both the satelliteDB cache and the excluded satellites list
# specified in the config.yaml file
$ storagenode forget-satellite --all-untrusted --identity-dir /path/to/identityDir --config-dir /path/to/configDir
# For force removal of data for untrusted satellites that are not listed in satelliteDB cache or marked as untrusted
$ storagenode forget-satellite satellite_ID1 satellite_ID2 --force --identity-dir /path/to/identityDir --config-dir /path/to/configDir
`,
RunE: func(cmd *cobra.Command, args []string) error {
cfg.SatelliteIDs = args
if len(args) > 0 && cfg.AllUntrusted {
return errs.New("cannot specify both satellite IDs and --all-untrusted")
}
if len(args) == 0 && !cfg.AllUntrusted {
return errs.New("must specify either satellite ID(s) as arguments or --all-untrusted flag")
}
if cfg.AllUntrusted && cfg.Force {
return errs.New("cannot specify both --all-untrusted and --force")
}
ctx, _ := process.Ctx(cmd)
return cmdForgetSatellite(ctx, zap.L(), &cfg)
},
Annotations: map[string]string{"type": "helper"},
}
process.Bind(cmd, &cfg, f.Defaults, cfgstruct.ConfDir(f.ConfDir), cfgstruct.IdentityDir(f.IdentityDir))
return cmd
}
func cmdForgetSatellite(ctx context.Context, log *zap.Logger, cfg *forgetSatelliteCfg) (err error) {
// we don't really need the identity, but we load it as a sanity check
ident, err := cfg.Identity.Load()
if err != nil {
log.Fatal("Failed to load identity.", zap.Error(err))
} else {
log.Info("Identity loaded.", zap.Stringer("Node ID", ident.ID))
}
db, err := storagenodedb.OpenExisting(ctx, log.Named("db"), cfg.DatabaseConfig())
if err != nil {
return errs.New("Error starting master database on storagenode: %+v", err)
}
satelliteDB := db.Satellites()
// get list of excluded satellites
excludedSatellites := make(map[storj.NodeID]bool)
for _, rule := range cfg.Storage2.Trust.Exclusions.Rules {
url, err := trust.ParseSatelliteURL(rule.String())
if err != nil {
log.Warn("Failed to parse satellite URL from exclusions list", zap.Error(err), zap.String("rule", rule.String()))
continue
}
excludedSatellites[url.ID] = false // false means the satellite has not been cleaned up yet.
}
if len(cfg.SatelliteIDs) > 0 {
for _, satelliteIDStr := range cfg.SatelliteIDs {
satelliteID, err := storj.NodeIDFromString(satelliteIDStr)
if err != nil {
return err
}
satellite := satellites.Satellite{
SatelliteID: satelliteID,
Status: satellites.Untrusted,
}
// check if satellite is excluded
cleanedUp, isExcluded := excludedSatellites[satelliteID]
if !isExcluded {
sat, err := satelliteDB.GetSatellite(ctx, satelliteID)
if err != nil {
return err
}
if !satellite.SatelliteID.IsZero() {
satellite = sat
}
if satellite.SatelliteID.IsZero() && !cfg.Force {
return errs.New("satellite %v not found. Specify --force to force data deletion", satelliteID)
}
log.Warn("Satellite not found in satelliteDB cache. Forcing removal of satellite data.", zap.Stringer("satelliteID", satelliteID))
}
if cleanedUp {
log.Warn("Satellite already cleaned up", zap.Stringer("satelliteID", satelliteID))
continue
}
err = cleanupSatellite(ctx, log, cfg, db, satellite)
if err != nil {
return err
}
}
} else {
sats, err := satelliteDB.GetSatellites(ctx)
if err != nil {
return err
}
hasUntrusted := false
for _, satellite := range sats {
if satellite.Status != satellites.Untrusted {
continue
}
hasUntrusted = true
err = cleanupSatellite(ctx, log, cfg, db, satellite)
if err != nil {
return err
}
excludedSatellites[satellite.SatelliteID] = true // true means the satellite has been cleaned up.
}
// clean up excluded satellites that might not be in the satelliteDB cache.
for satelliteID, cleanedUp := range excludedSatellites {
if !cleanedUp {
satellite := satellites.Satellite{
SatelliteID: satelliteID,
Status: satellites.Untrusted,
}
hasUntrusted = true
err = cleanupSatellite(ctx, log, cfg, db, satellite)
if err != nil {
return err
}
}
}
if !hasUntrusted {
log.Info("No untrusted satellites found. You can add satellites to the exclusions list in the config.yaml file.")
}
}
return nil
}
func cleanupSatellite(ctx context.Context, log *zap.Logger, cfg *forgetSatelliteCfg, db *storagenodedb.DB, satellite satellites.Satellite) error {
if satellite.Status != satellites.Untrusted && !cfg.Force {
log.Error("Satellite is not untrusted. Skipping", zap.Stringer("satelliteID", satellite.SatelliteID))
return nil
}
log.Info("Removing satellite from trust cache.", zap.Stringer("satelliteID", satellite.SatelliteID))
cache, err := trust.LoadCache(cfg.Storage2.Trust.CachePath)
if err != nil {
return err
}
deleted := cache.DeleteSatelliteEntry(satellite.SatelliteID)
if deleted {
if err := cache.Save(ctx); err != nil {
return err
}
log.Info("Satellite removed from trust cache.", zap.Stringer("satelliteID", satellite.SatelliteID))
}
log.Info("Cleaning up satellite data.", zap.Stringer("satelliteID", satellite.SatelliteID))
blobs := pieces.NewBlobsUsageCache(log.Named("blobscache"), db.Pieces())
if err := blobs.DeleteNamespace(ctx, satellite.SatelliteID.Bytes()); err != nil {
return err
}
log.Info("Cleaning up the trash.", zap.Stringer("satelliteID", satellite.SatelliteID))
err = blobs.DeleteTrashNamespace(ctx, satellite.SatelliteID.Bytes())
if err != nil {
return err
}
log.Info("Removing satellite info from reputation DB.", zap.Stringer("satelliteID", satellite.SatelliteID))
err = db.Reputation().Delete(ctx, satellite.SatelliteID)
if err != nil {
return err
}
// delete v0 pieces for the satellite, if any.
log.Info("Removing satellite v0 pieces if any.", zap.Stringer("satelliteID", satellite.SatelliteID))
err = db.V0PieceInfo().WalkSatelliteV0Pieces(ctx, db.Pieces(), satellite.SatelliteID, func(access pieces.StoredPieceAccess) error {
return db.Pieces().Delete(ctx, access.BlobRef())
})
if err != nil {
return err
}
log.Info("Removing satellite from satellites DB.", zap.Stringer("satelliteID", satellite.SatelliteID))
err = db.Satellites().DeleteSatellite(ctx, satellite.SatelliteID)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,242 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"os"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"storj.io/common/identity"
"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/storagenode/blobstore"
"storj.io/storj/storagenode/blobstore/filestore"
"storj.io/storj/storagenode/reputation"
"storj.io/storj/storagenode/satellites"
)
func Test_newForgetSatelliteCmd_Error(t *testing.T) {
tests := []struct {
name string
args string
wantErr string
}{
{
name: "no args",
args: "",
wantErr: "must specify either satellite ID(s) as arguments or --all-untrusted flag",
},
{
name: "Both satellite ID and --all-untrusted flag specified",
args: "--all-untrusted 1234567890123456789012345678901234567890123456789012345678901234",
wantErr: "cannot specify both satellite IDs and --all-untrusted",
},
{
name: "--all-untrusted and --force specified",
args: "--all-untrusted --force",
wantErr: "cannot specify both --all-untrusted and --force",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cmd := newForgetSatelliteCmd(&Factory{})
cmd.SetArgs(strings.Fields(tt.args))
err := cmd.ExecuteContext(testcontext.New(t))
if tt.wantErr == "" {
require.NoError(t, err)
return
}
require.Equal(t, tt.wantErr, err.Error())
})
}
}
func Test_cmdForgetSatellite(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
address := planet.StorageNodes[0].Server.PrivateAddr().String()
db := planet.StorageNodes[0].DB
log := zaptest.NewLogger(t)
store, err := filestore.NewAt(log, db.Config().Pieces, filestore.DefaultConfig)
require.NoError(t, err)
satelliteID := planet.Satellites[0].ID()
blobSize := memory.KB
blobRef := blobstore.BlobRef{
Namespace: satelliteID.Bytes(),
Key: testrand.PieceID().Bytes(),
}
w, err := store.Create(ctx, blobRef, -1)
require.NoError(t, err)
_, err = w.Write(testrand.Bytes(blobSize))
require.NoError(t, err)
require.NoError(t, w.Commit(ctx))
// create a new satellite reputation
timestamp := time.Now().UTC()
reputationDB := db.Reputation()
stats := reputation.Stats{
SatelliteID: satelliteID,
Audit: reputation.Metric{
TotalCount: 6,
SuccessCount: 7,
Alpha: 8,
Beta: 9,
Score: 10,
UnknownAlpha: 11,
UnknownBeta: 12,
UnknownScore: 13,
},
OnlineScore: 14,
UpdatedAt: timestamp,
JoinedAt: timestamp,
}
err = reputationDB.Store(ctx, stats)
require.NoError(t, err)
// test that the reputation was stored correctly
rstats, err := reputationDB.Get(ctx, satelliteID)
require.NoError(t, err)
require.NotNil(t, rstats)
require.Equal(t, stats, *rstats)
// insert a new untrusted satellite in the database
err = db.Satellites().SetAddressAndStatus(ctx, satelliteID, address, satellites.Untrusted)
require.NoError(t, err)
// test that the satellite was inserted correctly
satellite, err := db.Satellites().GetSatellite(ctx, satelliteID)
require.NoError(t, err)
require.Equal(t, satellites.Untrusted, satellites.Status(satellite.Status))
// set up the identity
ident := planet.StorageNodes[0].Identity
identConfig := identity.Config{
CertPath: ctx.File("identity", "identity.cert"),
KeyPath: ctx.File("identity", "identity.Key"),
}
err = identConfig.Save(ident)
require.NoError(t, err)
planet.StorageNodes[0].Config.Identity = identConfig
// run the forget satellite command with All flag
err = cmdForgetSatellite(ctx, log, &forgetSatelliteCfg{
AllUntrusted: true,
Config: planet.StorageNodes[0].Config,
})
require.NoError(t, err)
// check that the blob was deleted
blobInfo, err := store.Stat(ctx, blobRef)
require.Error(t, err)
require.True(t, errs.Is(err, os.ErrNotExist))
require.Nil(t, blobInfo)
// check that the reputation was deleted
rstats, err = reputationDB.Get(ctx, satelliteID)
require.NoError(t, err)
require.Equal(t, &reputation.Stats{SatelliteID: satelliteID}, rstats)
// check that the satellite info was deleted from the database
satellite, err = db.Satellites().GetSatellite(ctx, satelliteID)
require.NoError(t, err)
require.True(t, satellite.SatelliteID.IsZero())
})
}
func Test_cmdForgetSatellite_Exclusions(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
address := planet.StorageNodes[0].Server.PrivateAddr().String()
db := planet.StorageNodes[0].DB
log := zaptest.NewLogger(t)
store, err := filestore.NewAt(log, db.Config().Pieces, filestore.DefaultConfig)
require.NoError(t, err)
satelliteID := planet.Satellites[0].ID()
blobSize := memory.KB
blobRef := blobstore.BlobRef{
Namespace: satelliteID.Bytes(),
Key: testrand.PieceID().Bytes(),
}
w, err := store.Create(ctx, blobRef, -1)
require.NoError(t, err)
_, err = w.Write(testrand.Bytes(blobSize))
require.NoError(t, err)
require.NoError(t, w.Commit(ctx))
// create a new satellite reputation
timestamp := time.Now().UTC()
reputationDB := db.Reputation()
stats := reputation.Stats{
SatelliteID: satelliteID,
Audit: reputation.Metric{
TotalCount: 6,
SuccessCount: 7,
Alpha: 8,
Beta: 9,
Score: 10,
UnknownAlpha: 11,
UnknownBeta: 12,
UnknownScore: 13,
},
OnlineScore: 14,
UpdatedAt: timestamp,
JoinedAt: timestamp,
}
err = reputationDB.Store(ctx, stats)
require.NoError(t, err)
// test that the reputation was stored correctly
rstats, err := reputationDB.Get(ctx, satelliteID)
require.NoError(t, err)
require.NotNil(t, rstats)
require.Equal(t, stats, *rstats)
// set up the identity
ident := planet.StorageNodes[0].Identity
identConfig := identity.Config{
CertPath: ctx.File("identity", "identity.cert"),
KeyPath: ctx.File("identity", "identity.Key"),
}
err = identConfig.Save(ident)
require.NoError(t, err)
planet.StorageNodes[0].Config.Identity = identConfig
// add the satellite to the exclusion list
err = planet.StorageNodes[0].Config.Storage2.Trust.Exclusions.Set(satelliteID.String() + "@" + address)
require.NoError(t, err)
// run the forget satellite command with All flag
err = cmdForgetSatellite(ctx, log, &forgetSatelliteCfg{
AllUntrusted: true,
Config: planet.StorageNodes[0].Config,
})
require.NoError(t, err)
// check that the blob was deleted
blobInfo, err := store.Stat(ctx, blobRef)
require.Error(t, err)
require.True(t, errs.Is(err, os.ErrNotExist))
require.Nil(t, blobInfo)
// check that the reputation was deleted
rstats, err = reputationDB.Get(ctx, satelliteID)
require.NoError(t, err)
require.Equal(t, &reputation.Stats{SatelliteID: satelliteID}, rstats)
// check that the satellite info was deleted from the database
satellite, err := db.Satellites().GetSatellite(ctx, satelliteID)
require.NoError(t, err)
require.True(t, satellite.SatelliteID.IsZero())
})
}

View File

@ -59,6 +59,7 @@ func newRootCmd(setDefaults bool) (*cobra.Command, *Factory) {
newIssueAPIKeyCmd(factory),
newGracefulExitInitCmd(factory),
newGracefulExitStatusCmd(factory),
newForgetSatelliteCmd(factory),
// internal hidden commands
internalcmd.NewUsedSpaceFilewalkerCmd().Command,
internalcmd.NewGCFilewalkerCmd().Command,

View File

@ -83,6 +83,8 @@ type Blobs interface {
DeleteWithStorageFormat(ctx context.Context, ref BlobRef, formatVer FormatVersion) error
// DeleteNamespace deletes blobs folder for a specific namespace.
DeleteNamespace(ctx context.Context, ref []byte) (err error)
// DeleteTrashNamespace deletes the trash folder for a given namespace.
DeleteTrashNamespace(ctx context.Context, namespace []byte) (err error)
// Trash marks a file for pending deletion.
Trash(ctx context.Context, ref BlobRef) error
// RestoreTrash restores all files in the trash for a given namespace and returns the keys restored.

View File

@ -541,6 +541,12 @@ func (dir *Dir) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore
return bytesEmptied, deletedKeys, errorsEncountered.Err()
}
// DeleteTrashNamespace deletes the entire trash namespace.
func (dir *Dir) DeleteTrashNamespace(ctx context.Context, namespace []byte) (err error) {
mon.Task()(&ctx)(&err)
return dir.deleteNamespace(ctx, dir.trashdir(), namespace)
}
// iterateStorageFormatVersions executes f for all storage format versions,
// starting with the oldest format version. It is more likely, in the general
// case, that we will find the piece with the newest format version instead,

View File

@ -141,6 +141,13 @@ func (store *blobStore) DeleteNamespace(ctx context.Context, ref []byte) (err er
return Error.Wrap(err)
}
// DeleteTrashNamespace deletes trash folder of specific satellite.
func (store *blobStore) DeleteTrashNamespace(ctx context.Context, namespace []byte) (err error) {
defer mon.Task()(&ctx)(&err)
err = store.dir.DeleteTrashNamespace(ctx, namespace)
return Error.Wrap(err)
}
// Trash moves the ref to a trash directory.
func (store *blobStore) Trash(ctx context.Context, ref blobstore.BlobRef) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -192,6 +192,14 @@ func (bad *BadBlobs) DeleteNamespace(ctx context.Context, ref []byte) (err error
return bad.blobs.DeleteNamespace(ctx, ref)
}
// DeleteTrashNamespace deletes the trash folder for the namespace.
func (bad *BadBlobs) DeleteTrashNamespace(ctx context.Context, namespace []byte) error {
if err := bad.err.Err(); err != nil {
return err
}
return bad.blobs.DeleteTrashNamespace(ctx, namespace)
}
// Stat looks up disk metadata on the blob file.
func (bad *BadBlobs) Stat(ctx context.Context, ref blobstore.BlobRef) (blobstore.BlobInfo, error) {
if err := bad.err.Err(); err != nil {

View File

@ -139,6 +139,14 @@ func (slow *SlowBlobs) DeleteNamespace(ctx context.Context, ref []byte) (err err
return slow.blobs.DeleteNamespace(ctx, ref)
}
// DeleteTrashNamespace deletes the trash folder for the specified namespace.
func (slow *SlowBlobs) DeleteTrashNamespace(ctx context.Context, namespace []byte) error {
if err := slow.sleep(ctx); err != nil {
return err
}
return slow.blobs.DeleteTrashNamespace(ctx, namespace)
}
// Stat looks up disk metadata on the blob file.
func (slow *SlowBlobs) Stat(ctx context.Context, ref blobstore.BlobRef) (blobstore.BlobInfo, error) {
if err := slow.sleep(ctx); err != nil {

View File

@ -21,6 +21,8 @@ type DB interface {
Get(ctx context.Context, satelliteID storj.NodeID) (*Stats, error)
// All retrieves all stats from DB
All(ctx context.Context) ([]Stats, error)
// Delete removes stats for specific satellite
Delete(ctx context.Context, satelliteID storj.NodeID) error
}
// Stats consist of reputation metrics.

View File

@ -24,6 +24,8 @@ const (
ExitSucceeded = 3
// ExitFailed reflects a graceful exit that failed.
ExitFailed = 4
// Untrusted reflects a satellite that is not trusted.
Untrusted = 5
)
// ExitProgress contains the status of a graceful exit.
@ -50,8 +52,16 @@ type Satellite struct {
type DB interface {
// SetAddress inserts into satellite's db id, address.
SetAddress(ctx context.Context, satelliteID storj.NodeID, address string) error
// SetAddressAndStatus inserts into satellite's db id, address and status.
SetAddressAndStatus(ctx context.Context, satelliteID storj.NodeID, address string, status Status) error
// GetSatellite retrieves that satellite by ID
GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite Satellite, err error)
// GetSatellites retrieves all satellites, including untrusted ones.
GetSatellites(ctx context.Context) (sats []Satellite, err error)
// DeleteSatellite removes that satellite by ID.
DeleteSatellite(ctx context.Context, satelliteID storj.NodeID) error
// UpdateSatelliteStatus updates the status of the satellite.
UpdateSatelliteStatus(ctx context.Context, satelliteID storj.NodeID, status Status) error
// GetSatellitesUrls retrieves all satellite's id and urls.
GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error)
// InitiateGracefulExit updates the database to reflect the beginning of a graceful exit

View File

@ -229,3 +229,11 @@ func (db *reputationDB) All(ctx context.Context) (_ []reputation.Stats, err erro
return statsList, rows.Err()
}
// Delete removes stats for specific satellite.
func (db *reputationDB) Delete(ctx context.Context, satelliteID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, "DELETE FROM reputation WHERE satellite_id = ?", satelliteID)
return ErrReputation.Wrap(err)
}

View File

@ -40,6 +40,21 @@ func (db *satellitesDB) SetAddress(ctx context.Context, satelliteID storj.NodeID
return ErrSatellitesDB.Wrap(err)
}
// SetAddressAndStatus inserts into satellite's db id, address, added time and status.
func (db *satellitesDB) SetAddressAndStatus(ctx context.Context, satelliteID storj.NodeID, address string, status satellites.Status) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx,
`INSERT INTO satellites (node_id, address, added_at, status) VALUES(?,?,?,?) ON CONFLICT (node_id) DO UPDATE SET address = EXCLUDED.address, status = EXCLUDED.status`,
satelliteID,
address,
time.Now().UTC(),
status,
)
return ErrSatellitesDB.Wrap(err)
}
// GetSatellite retrieves that satellite by ID.
func (db *satellitesDB) GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite satellites.Satellite, err error) {
defer mon.Task()(&ctx)(&err)
@ -59,6 +74,27 @@ func (db *satellitesDB) GetSatellite(ctx context.Context, satelliteID storj.Node
return satellite, rows.Err()
}
// GetSatellites retrieves all satellites.
func (db *satellitesDB) GetSatellites(ctx context.Context) (sats []satellites.Satellite, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.QueryContext(ctx, "SELECT node_id, added_at, status FROM satellites")
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var satellite satellites.Satellite
err := rows.Scan(&satellite.SatelliteID, &satellite.AddedAt, &satellite.Status)
if err != nil {
return nil, err
}
sats = append(sats, satellite)
}
return sats, rows.Err()
}
// GetSatellitesUrls retrieves all satellite's id and urls.
func (db *satellitesDB) GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) {
defer mon.Task()(&ctx)(&err)
@ -93,6 +129,14 @@ func (db *satellitesDB) GetSatellitesUrls(ctx context.Context) (satelliteURLs []
return urls, nil
}
// UpdateSatelliteStatus updates satellite status.
func (db *satellitesDB) UpdateSatelliteStatus(ctx context.Context, satelliteID storj.NodeID, status satellites.Status) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, "UPDATE satellites SET status = ? WHERE node_id = ?", status, satelliteID)
return ErrSatellitesDB.Wrap(err)
}
// InitiateGracefulExit updates the database to reflect the beginning of a graceful exit.
func (db *satellitesDB) InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) (err error) {
defer mon.Task()(&ctx)(&err)
@ -128,12 +172,11 @@ func (db *satellitesDB) UpdateGracefulExit(ctx context.Context, satelliteID stor
func (db *satellitesDB) CompleteGracefulExit(ctx context.Context, satelliteID storj.NodeID, finishedAt time.Time, exitStatus satellites.Status, completionReceipt []byte) (err error) {
defer mon.Task()(&ctx)(&err)
return ErrSatellitesDB.Wrap(withTx(ctx, db.GetDB(), func(tx tagsql.Tx) error {
query := `UPDATE satellites SET status = ? WHERE node_id = ?`
_, err = tx.ExecContext(ctx, query, exitStatus, satelliteID)
err := db.UpdateSatelliteStatus(ctx, satelliteID, exitStatus)
if err != nil {
return err
}
query = `UPDATE satellite_exit_progress SET finished_at = ?, completion_receipt = ? WHERE satellite_id = ?`
query := `UPDATE satellite_exit_progress SET finished_at = ?, completion_receipt = ? WHERE satellite_id = ?`
_, err = tx.ExecContext(ctx, query, finishedAt.UTC(), completionReceipt, satelliteID)
return err
}))
@ -163,3 +206,11 @@ func (db *satellitesDB) ListGracefulExits(ctx context.Context) (exitList []satel
return exitList, rows.Err()
}
// DeleteSatellite deletes the satellite from the database.
func (db *satellitesDB) DeleteSatellite(ctx context.Context, satelliteID storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, "DELETE FROM satellites WHERE node_id = ?", satelliteID)
return ErrSatellitesDB.Wrap(err)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/zeebo/errs"
"storj.io/common/fpath"
"storj.io/common/storj"
)
// Cache caches source information about trusted satellites.
@ -65,6 +66,20 @@ func (cache *Cache) Set(key string, entries []Entry) {
cache.data.Entries[key] = entries
}
// DeleteSatelliteEntry searches the cache for the provided satellite ID and removes it if
// found.
func (cache *Cache) DeleteSatelliteEntry(satelliteID storj.NodeID) (deleted bool) {
for s, entries := range cache.data.Entries {
for i, entry := range entries {
if entry.SatelliteURL.ID == satelliteID {
cache.data.Entries[s] = append(entries[:i], entries[i+1:]...)
return true
}
}
}
return false
}
// Save persists the cache to disk.
func (cache *Cache) Save(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -166,3 +166,45 @@ func TestCachePersistence(t *testing.T) {
}
}
func TestCache_DeleteSatelliteEntry(t *testing.T) {
url1, err := trust.ParseSatelliteURL("121RTSDpyNZVcEU84Ticf2L1ntiuUimbWgfATz21tuvgk3vzoA6@foo.test:7777")
require.NoError(t, err)
url2, err := trust.ParseSatelliteURL("12L9ZFwhzVpuEKMUNUqkaTLGzwY9G24tbiigLiXpmZWKwmcNDDs@b.bar.test:7777")
require.NoError(t, err)
entry1 := trust.Entry{
SatelliteURL: url1,
Authoritative: false,
}
entry2 := trust.Entry{
SatelliteURL: url2,
Authoritative: true,
}
entriesBefore := map[string][]trust.Entry{
"key": {entry1, entry2},
}
expectedEntriesAfter := map[string][]trust.Entry{
"key": {entry2},
}
ctx := testcontext.New(t)
defer ctx.Cleanup()
cachePath := ctx.File("cache.json")
require.NoError(t, trust.SaveCacheData(cachePath, &trust.CacheData{Entries: entriesBefore}))
cache, err := trust.LoadCache(cachePath)
require.NoError(t, err)
cache.DeleteSatelliteEntry(url1.ID)
require.NoError(t, cache.Save(ctx))
cacheAfter, err := trust.LoadCacheData(cachePath)
require.NoError(t, err)
require.Equal(t, &trust.CacheData{Entries: expectedEntriesAfter}, cacheAfter)
}

View File

@ -122,7 +122,16 @@ func (pool *Pool) Run(ctx context.Context) error {
}
for _, trustedSatellite := range pool.satellites {
if err := pool.satellitesDB.SetAddress(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address); err != nil {
status := satellites.Normal
// for cases where a satellite was previously marked as untrusted, but is now trusted
// we reset the status back to normal
satellite, err := pool.satellitesDB.GetSatellite(ctx, trustedSatellite.url.ID)
if err == nil && !satellite.SatelliteID.IsZero() {
if satellite.Status != satellites.Untrusted {
status = satellites.Status(satellite.Status)
}
}
if err := pool.satellitesDB.SetAddressAndStatus(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address, status); err != nil {
return err
}
}
@ -224,6 +233,10 @@ func (pool *Pool) Refresh(ctx context.Context) error {
if _, ok := trustedIDs[id]; !ok {
pool.log.Debug("Satellite is no longer trusted", zap.String("id", id.String()))
delete(pool.satellites, id)
err := pool.satellitesDB.UpdateSatelliteStatus(ctx, id, satellites.Untrusted)
if err != nil {
return err
}
}
}

View File

@ -19,6 +19,8 @@ import (
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
"storj.io/storj/storagenode/trust"
)
@ -29,172 +31,178 @@ func TestPoolRequiresCachePath(t *testing.T) {
}
func TestPoolVerifySatelliteID(t *testing.T) {
ctx, pool, source, _ := newPoolTest(t)
defer ctx.Cleanup()
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
pool, source, _ := newPoolTest(ctx, t, db)
defer ctx.Cleanup()
id := testrand.NodeID()
id := testrand.NodeID()
// Assert the ID is not trusted
err := pool.VerifySatelliteID(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
// Assert the ID is not trusted
err := pool.VerifySatelliteID(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
},
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
}
require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted
err = pool.VerifySatelliteID(context.Background(), id)
require.NoError(t, err)
// Assert the ID is now trusted
err = pool.VerifySatelliteID(context.Background(), id)
require.NoError(t, err)
// Refresh the pool after removing the trusted satellite
source.entries = nil
require.NoError(t, pool.Refresh(context.Background()))
// Refresh the pool after removing the trusted satellite
source.entries = nil
require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is no longer trusted
err = pool.VerifySatelliteID(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
// Assert the ID is no longer trusted
err = pool.VerifySatelliteID(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
})
}
func TestPoolGetSignee(t *testing.T) {
id := testrand.NodeID()
url := trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
}
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
id := testrand.NodeID()
url := trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
}
ctx, pool, source, resolver := newPoolTest(t)
defer ctx.Cleanup()
pool, source, resolver := newPoolTest(ctx, t, db)
defer ctx.Cleanup()
// ID is untrusted
_, err := pool.GetSignee(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
// ID is untrusted
_, err := pool.GetSignee(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{{SatelliteURL: url}}
require.NoError(t, pool.Refresh(context.Background()))
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{{SatelliteURL: url}}
require.NoError(t, pool.Refresh(context.Background()))
// Identity is uncached and resolving fails
_, err = pool.GetSignee(context.Background(), id)
require.EqualError(t, err, "trust: no identity")
// Identity is uncached and resolving fails
_, err = pool.GetSignee(context.Background(), id)
require.EqualError(t, err, "trust: no identity")
// Now make resolving succeed
identity := &identity.PeerIdentity{
ID: id,
Leaf: &x509.Certificate{},
}
resolver.SetIdentity(url.NodeURL(), identity)
signee, err := pool.GetSignee(context.Background(), id)
require.NoError(t, err)
assert.Equal(t, id, signee.ID())
// Now make resolving succeed
identity := &identity.PeerIdentity{
ID: id,
Leaf: &x509.Certificate{},
}
resolver.SetIdentity(url.NodeURL(), identity)
signee, err := pool.GetSignee(context.Background(), id)
require.NoError(t, err)
assert.Equal(t, id, signee.ID())
// Now make resolving fail but ensure we can still get the signee since
// the identity is cached.
resolver.SetIdentity(url.NodeURL(), nil)
signee, err = pool.GetSignee(context.Background(), id)
require.NoError(t, err)
assert.Equal(t, id, signee.ID())
// Now make resolving fail but ensure we can still get the signee since
// the identity is cached.
resolver.SetIdentity(url.NodeURL(), nil)
signee, err = pool.GetSignee(context.Background(), id)
require.NoError(t, err)
assert.Equal(t, id, signee.ID())
// Now update the address on the entry and assert that the identity is
// reset in the cache and needs to be refetched (and fails since we've
// hampered the resolver)
url.Host = "bar.test"
source.entries = []trust.Entry{{SatelliteURL: url}}
require.NoError(t, pool.Refresh(context.Background()))
_, err = pool.GetSignee(context.Background(), id)
require.EqualError(t, err, "trust: no identity")
// Now update the address on the entry and assert that the identity is
// reset in the cache and needs to be refetched (and fails since we've
// hampered the resolver)
url.Host = "bar.test"
source.entries = []trust.Entry{{SatelliteURL: url}}
require.NoError(t, pool.Refresh(context.Background()))
_, err = pool.GetSignee(context.Background(), id)
require.EqualError(t, err, "trust: no identity")
})
}
func TestPoolGetSatellites(t *testing.T) {
ctx, pool, source, _ := newPoolTest(t)
defer ctx.Cleanup()
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
pool, source, _ := newPoolTest(ctx, t, db)
defer ctx.Cleanup()
id1 := testrand.NodeID()
id2 := testrand.NodeID()
id1 := testrand.NodeID()
id2 := testrand.NodeID()
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id1,
Host: "foo.test",
Port: 7777,
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id1,
Host: "foo.test",
Port: 7777,
},
},
},
{
SatelliteURL: trust.SatelliteURL{
ID: id2,
Host: "bar.test",
Port: 7777,
{
SatelliteURL: trust.SatelliteURL{
ID: id2,
Host: "bar.test",
Port: 7777,
},
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
}
require.NoError(t, pool.Refresh(context.Background()))
expected := []storj.NodeID{id1, id2}
actual := pool.GetSatellites(context.Background())
assert.ElementsMatch(t, expected, actual)
expected := []storj.NodeID{id1, id2}
actual := pool.GetSatellites(context.Background())
assert.ElementsMatch(t, expected, actual)
})
}
func TestPoolGetAddress(t *testing.T) {
ctx, pool, source, _ := newPoolTest(t)
defer ctx.Cleanup()
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
pool, source, _ := newPoolTest(ctx, t, db)
defer ctx.Cleanup()
id := testrand.NodeID()
id := testrand.NodeID()
// Assert the ID is not trusted
nodeurl, err := pool.GetNodeURL(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
require.Empty(t, nodeurl)
// Assert the ID is not trusted
nodeurl, err := pool.GetNodeURL(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
require.Empty(t, nodeurl)
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
},
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
}
require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted and the correct address is returned
nodeurl, err = pool.GetNodeURL(context.Background(), id)
require.NoError(t, err)
require.Equal(t, id, nodeurl.ID)
require.Equal(t, "foo.test:7777", nodeurl.Address)
// Assert the ID is now trusted and the correct address is returned
nodeurl, err = pool.GetNodeURL(context.Background(), id)
require.NoError(t, err)
require.Equal(t, id, nodeurl.ID)
require.Equal(t, "foo.test:7777", nodeurl.Address)
// Refresh the pool with an updated trust entry with a new address
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "bar.test",
Port: 7777,
// Refresh the pool with an updated trust entry with a new address
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "bar.test",
Port: 7777,
},
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
}
require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted and the correct address is returned
nodeurl, err = pool.GetNodeURL(context.Background(), id)
require.NoError(t, err)
require.Equal(t, id, nodeurl.ID)
require.Equal(t, "bar.test:7777", nodeurl.Address)
// Assert the ID is now trusted and the correct address is returned
nodeurl, err = pool.GetNodeURL(context.Background(), id)
require.NoError(t, err)
require.Equal(t, id, nodeurl.ID)
require.Equal(t, "bar.test:7777", nodeurl.Address)
})
}
func newPoolTest(t *testing.T) (*testcontext.Context, *trust.Pool, *fakeSource, *fakeIdentityResolver) {
ctx := testcontext.New(t)
func newPoolTest(ctx *testcontext.Context, t *testing.T, db storagenode.DB) (*trust.Pool, *fakeSource, *fakeIdentityResolver) {
source := &fakeSource{}
resolver := newFakeIdentityResolver()
@ -203,13 +211,13 @@ func newPoolTest(t *testing.T) (*testcontext.Context, *trust.Pool, *fakeSource,
pool, err := trust.NewPool(log, resolver, trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
}, nil)
}, db.Satellites())
if err != nil {
ctx.Cleanup()
require.NoError(t, err)
}
return ctx, pool, source, resolver
return pool, source, resolver
}
type fakeIdentityResolver struct {