storagenode/satellites: address added, caching satellite's addresses from trust

Change-Id: Ica3eea5b8d81b176c6a4385fea803730b08ece16
This commit is contained in:
Qweder93 2021-07-08 12:14:15 +03:00 committed by Nikolai Siedov
parent a767aed591
commit 4d0fe39235
16 changed files with 211 additions and 17 deletions

View File

@ -34,6 +34,7 @@ func TestHeldAmountApi(t *testing.T) {
console := sno.Console
payoutsDB := sno.DB.Payout()
reputationDB := sno.DB.Reputation()
satellitesDB := sno.DB.Satellites()
baseURL := fmt.Sprintf("http://%s/api/heldamount", console.Listener.Addr())
// pause nodestats reputation cache because later tests assert a specific joinedat.
@ -362,6 +363,9 @@ func TestHeldAmountApi(t *testing.T) {
})
require.NoError(t, err)
err = satellitesDB.SetAddress(ctx, satellite.ID(), satellite.Addr())
require.NoError(t, err)
// should return all heldback history inserted earlier
url := fmt.Sprintf("%s/held-history", baseURL)
res, err := httpGet(ctx, url)

View File

@ -62,11 +62,11 @@ func TestPayoutsEndpointSummary(t *testing.T) {
}
poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: trust.SatelliteURL{ID: satelliteID}})
trustPool, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig)
trustPool, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig, db.Satellites())
require.NoError(t, err)
require.NoError(t, trustPool.Refresh(ctx))
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), trustPool)
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil)
require.NoError(t, err)
estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool)
endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService)
@ -155,11 +155,11 @@ func TestPayoutsEndpointEstimations(t *testing.T) {
}
poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: trust.SatelliteURL{ID: satelliteID}})
trustPool, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig)
trustPool, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig, db.Satellites())
require.NoError(t, err)
require.NoError(t, trustPool.Refresh(ctx))
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), trustPool)
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil)
require.NoError(t, err)
estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool)
endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService)
@ -226,11 +226,11 @@ func TestPayoutsUndistributedEndpoint(t *testing.T) {
}
poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: trust.SatelliteURL{ID: satelliteID}})
trustPool, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig)
trustPool, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig, db.Satellites())
require.NoError(t, err)
require.NoError(t, trustPool.Refresh(ctx))
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), trustPool)
payoutsService, err := payouts.NewService(log, db.Payout(), db.Reputation(), db.Satellites(), nil)
require.NoError(t, err)
estimatedPayoutsService := estimatedpayouts.NewService(db.Bandwidth(), db.Reputation(), db.StorageUsage(), db.Pricing(), db.Satellites(), trustPool)
endpoint := multinode.NewPayoutEndpoint(log, service, db.Payout(), estimatedPayoutsService, payoutsService)

View File

@ -18,7 +18,6 @@ import (
"storj.io/storj/storagenode/payouts"
"storj.io/storj/storagenode/pricing"
"storj.io/storj/storagenode/reputation"
"storj.io/storj/storagenode/satellites"
"storj.io/storj/storagenode/storageusage"
"storj.io/storj/storagenode/trust"
)
@ -36,7 +35,6 @@ type CacheStorage struct {
StorageUsage storageusage.DB
Payout payouts.DB
Pricing pricing.DB
Satellites satellites.DB
}
// Cache runs cache loop and stores reputation stats and storage usage into db.

View File

@ -26,11 +26,12 @@ func TestServiceHeldAmountHistory(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
log := zaptest.NewLogger(t)
payoutsDB := db.Payout()
satellitesDB := db.Satellites()
source := &fakeSource{}
pool, err := trust.NewPool(log, newFakeIdentityResolver(), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
})
}, satellitesDB)
require.NoError(t, err)
satelliteID1 := testrand.NodeID()

View File

@ -375,7 +375,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
}
{ // setup trust pool
peer.Storage2.Trust, err = trust.NewPool(log.Named("trust"), trust.Dialer(peer.Dialer), config.Storage2.Trust)
peer.Storage2.Trust, err = trust.NewPool(log.Named("trust"), trust.Dialer(peer.Dialer), config.Storage2.Trust, peer.DB.Satellites())
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -606,7 +606,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
StorageUsage: peer.DB.StorageUsage(),
Payout: peer.DB.Payout(),
Pricing: peer.DB.Pricing(),
Satellites: peer.DB.Satellites(),
},
peer.NodeStats.Service,
peer.Payout.Endpoint,

View File

@ -413,7 +413,7 @@ func TestTrashAndRestore(t *testing.T) {
for _, satelliteURL := range satelliteURLs {
poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: satelliteURL})
}
trust, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig)
trust, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig, db.Satellites())
require.NoError(t, err)
require.NoError(t, trust.Refresh(ctx))

View File

@ -113,7 +113,7 @@ func TestLocalTime_OutOfSync(t *testing.T) {
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
})
}, nil)
require.NoError(t, err)
err = pool.Refresh(ctx)
require.NoError(t, err)
@ -169,7 +169,7 @@ func TestLocalTime_OutOfSync(t *testing.T) {
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
})
}, nil)
require.NoError(t, err)
err = pool.Refresh(ctx)
require.NoError(t, err)

View File

@ -0,0 +1,36 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package satellites_test
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
func TestSatellitesDB(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
satellitesDB := db.Satellites()
id := testrand.NodeID()
err := satellitesDB.SetAddress(ctx, id, "test_addr1")
require.NoError(t, err)
satellites, err := satellitesDB.GetSatellitesUrls(ctx)
require.NoError(t, err)
require.Equal(t, satellites[0].Address, "test_addr1")
err = satellitesDB.SetAddress(ctx, id, "test_addr2")
require.NoError(t, err)
satellites, err = satellitesDB.GetSatellitesUrls(ctx)
require.NoError(t, err)
require.Equal(t, satellites[0].Address, "test_addr2")
})
}

View File

@ -48,8 +48,12 @@ type Satellite struct {
//
// architecture: Database
type DB interface {
// SetAddress inserts into satellite's db id, address.
SetAddress(ctx context.Context, satelliteID storj.NodeID, address string) error
// GetSatellite retrieves that satellite by ID
GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite Satellite, err error)
// GetSatellitesUrls retrieves all satellite's id and urls.
GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error)
// InitiateGracefulExit updates the database to reflect the beginning of a graceful exit
InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) error
// CancelGracefulExit removes that satellite by ID

View File

@ -1987,6 +1987,15 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
`ALTER TABLE reputation ADD COLUMN vetted_at TIMESTAMP`,
},
},
{
DB: &db.satellitesDB.DB,
Description: "Add address to satellites, inserts stefan-benten satellite into satellites db",
Version: 53,
Action: migrate.SQL{
`ALTER TABLE satellites ADD COLUMN address TEXT;
UPDATE satellites SET address = 'satellite.stefan-benten.de:7777' WHERE node_id = X'004ae89e970e703df42ba4ab1416a3b30b7e1d8e14aa0e558f7ee26800000000'`,
},
},
},
}
}

View File

@ -25,6 +25,21 @@ type satellitesDB struct {
dbContainerImpl
}
// SetAddress inserts into satellite's db id, address, added time.
func (db *satellitesDB) SetAddress(ctx context.Context, satelliteID storj.NodeID, address string) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx,
`INSERT INTO satellites (node_id, address, added_at, status) VALUES(?,?,?,?) ON CONFLICT (node_id) DO UPDATE SET address = EXCLUDED.address`,
satelliteID,
address,
time.Now().UTC(),
satellites.Normal,
)
return ErrSatellitesDB.Wrap(err)
}
// GetSatellite retrieves that satellite by ID.
func (db *satellitesDB) GetSatellite(ctx context.Context, satelliteID storj.NodeID) (satellite satellites.Satellite, err error) {
defer mon.Task()(&ctx)(&err)
@ -44,6 +59,40 @@ func (db *satellitesDB) GetSatellite(ctx context.Context, satelliteID storj.Node
return satellite, rows.Err()
}
// GetSatellitesUrls retrieves all satellite's id and urls.
func (db *satellitesDB) GetSatellitesUrls(ctx context.Context) (satelliteURLs []storj.NodeURL, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT
node_id,
address
FROM satellites`
rows, err := db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
var urls []storj.NodeURL
for rows.Next() {
var url storj.NodeURL
err := rows.Scan(&url.ID, &url.Address)
if err != nil {
return nil, ErrPayout.Wrap(err)
}
urls = append(urls, url)
}
if err = rows.Err(); err != nil {
return nil, ErrPayout.Wrap(err)
}
return urls, nil
}
// InitiateGracefulExit updates the database to reflect the beginning of a graceful exit.
func (db *satellitesDB) InitiateGracefulExit(ctx context.Context, satelliteID storj.NodeID, intitiatedAt time.Time, startingDiskUsage int64) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -667,6 +667,11 @@ func Schema() map[string]*dbschema.Schema {
Type: "TIMESTAMP",
IsNullable: false,
},
&dbschema.Column{
Name: "address",
Type: "TEXT",
IsNullable: true,
},
&dbschema.Column{
Name: "node_id",
Type: "BLOB",
@ -729,3 +734,4 @@ func Schema() map[string]*dbschema.Schema {
"used_serial": &dbschema.Schema{},
}
}

View File

@ -67,6 +67,7 @@ var States = MultiDBStates{
&v50,
&v51,
&v52,
&v53,
},
}

View File

@ -0,0 +1,77 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v53 = MultiDBState{
Version: 53,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: v52.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v52.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: &DBState{
SQL: `
-- table to store nodestats cache
CREATE TABLE reputation (
satellite_id BLOB NOT NULL,
audit_success_count INTEGER NOT NULL,
audit_total_count INTEGER NOT NULL,
audit_reputation_alpha REAL NOT NULL,
audit_reputation_beta REAL NOT NULL,
audit_reputation_score REAL NOT NULL,
audit_unknown_reputation_alpha REAL NOT NULL,
audit_unknown_reputation_beta REAL NOT NULL,
audit_unknown_reputation_score REAL NOT NULL,
online_score REAL NOT NULL,
audit_history BLOB,
disqualified_at TIMESTAMP,
updated_at TIMESTAMP NOT NULL,
suspended_at TIMESTAMP,
offline_suspended_at TIMESTAMP,
offline_under_review_at TIMESTAMP,
vetted_at TIMESTAMP,
joined_at TIMESTAMP NOT NULL,
PRIMARY KEY (satellite_id)
);
INSERT INTO reputation (satellite_id, audit_success_count, audit_total_count, audit_reputation_alpha, audit_reputation_beta, audit_reputation_score, audit_unknown_reputation_alpha, audit_unknown_reputation_beta, audit_unknown_reputation_score, online_score, audit_history, disqualified_at, updated_at, suspended_at, offline_suspended_at, offline_under_review_at, vetted_at, joined_at) VALUES
(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000', 1, 1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, NULL, '2019-07-19 20:00:00+00:00', '2019-08-23 20:00:00+00:00', NULL, NULL, NULL, NULL, '1970-01-01 00:00:00+00:00'),
(X'953fdf144a088a4116a1f6acfc8475c78278c018849db050d894a89572e56d00', 1, 1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, NULL, '2019-07-19 20:00:00+00:00', '2019-08-23 20:00:00+00:00', NULL, NULL, NULL, '2019-06-25 20:00:00+00:00', '1970-01-01 00:00:00+00:00'),
(X'1a438a44e3cc9ab9faaacc1c034339f0ebec05f310f0ba270414dac753882f00', 1, 1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, NULL, NULL, '2019-08-23 20:00:00+00:00', NULL, NULL, NULL, NULL, '1970-01-01 00:00:00+00:00');
`,
},
storagenodedb.PieceSpaceUsedDBName: v52.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v52.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v52.DBStates[storagenodedb.PieceExpirationDBName],
storagenodedb.OrdersDBName: v52.DBStates[storagenodedb.OrdersDBName],
storagenodedb.BandwidthDBName: v52.DBStates[storagenodedb.BandwidthDBName],
storagenodedb.SatellitesDBName: &DBState{
SQL: `
CREATE TABLE satellites (
node_id BLOB NOT NULL,
address TEXT,
added_at TIMESTAMP NOT NULL,
status INTEGER NOT NULL,
PRIMARY KEY (node_id)
);
CREATE TABLE satellite_exit_progress (
satellite_id BLOB NOT NULL,
initiated_at TIMESTAMP,
finished_at TIMESTAMP,
starting_disk_usage INTEGER NOT NULL,
bytes_deleted INTEGER NOT NULL,
completion_receipt BLOB,
FOREIGN KEY (satellite_id) REFERENCES satellites (node_id)
);
INSERT INTO satellites (node_id, added_at, status) VALUES
(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000', '2019-09-10 20:00:00+00:00', 0);
INSERT INTO satellite_exit_progress VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000','2019-09-10 20:00:00+00:00', null, 100, 0, null);
`,
},
storagenodedb.DeprecatedInfoDBName: v52.DBStates[storagenodedb.DeprecatedInfoDBName],
storagenodedb.NotificationsDBName: v52.DBStates[storagenodedb.NotificationsDBName],
storagenodedb.HeldAmountDBName: v52.DBStates[storagenodedb.HeldAmountDBName],
storagenodedb.PricingDBName: v52.DBStates[storagenodedb.PricingDBName],
storagenodedb.APIKeysDBName: v52.DBStates[storagenodedb.APIKeysDBName],
},
}

View File

@ -19,6 +19,7 @@ import (
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storagenode/satellites"
)
// Error is the default error class.
@ -68,6 +69,8 @@ type Pool struct {
listMu sync.Mutex
list *List
satellitesDB satellites.DB
satellitesMu sync.RWMutex
satellites map[storj.NodeID]*satelliteInfoCache
}
@ -80,7 +83,7 @@ type satelliteInfoCache struct {
}
// NewPool creates a new trust pool of the specified list of trusted satellites.
func NewPool(log *zap.Logger, resolver IdentityResolver, config Config) (*Pool, error) {
func NewPool(log *zap.Logger, resolver IdentityResolver, config Config, satellitesDB satellites.DB) (*Pool, error) {
// TODO: preload all satellite peer identities
cache, err := LoadCache(config.CachePath)
@ -98,6 +101,7 @@ func NewPool(log *zap.Logger, resolver IdentityResolver, config Config) (*Pool,
resolver: resolver,
refreshInterval: config.RefreshInterval,
list: list,
satellitesDB: satellitesDB,
satellites: make(map[storj.NodeID]*satelliteInfoCache),
}, nil
}
@ -115,6 +119,12 @@ func (pool *Pool) Run(ctx context.Context) error {
pool.log.Error("Failed to refresh", zap.Error(err))
return err
}
for _, trustedSatellite := range pool.satellites {
if err := pool.satellitesDB.SetAddress(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address); err != nil {
return err
}
}
}
}

View File

@ -24,7 +24,7 @@ import (
func TestPoolRequiresCachePath(t *testing.T) {
log := zaptest.NewLogger(t)
_, err := trust.NewPool(log, newFakeIdentityResolver(), trust.Config{})
_, err := trust.NewPool(log, newFakeIdentityResolver(), trust.Config{}, nil)
require.EqualError(t, err, "trust: cache path cannot be empty")
}
@ -203,7 +203,7 @@ func newPoolTest(t *testing.T) (*testcontext.Context, *trust.Pool, *fakeSource,
pool, err := trust.NewPool(log, resolver, trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
})
}, nil)
if err != nil {
ctx.Cleanup()
require.NoError(t, err)