storagenode/trust: wire up list into pool

- also updated ping chore to pick up trust changes
- fixed small typo in blueprint
- fixed flags for storj-sim
- wired up changes to testplanet

Change-Id: I02982f3a63a1b4150b82a009ee126b25ed51917d
This commit is contained in:
Andrew Harding 2019-11-15 17:59:32 -07:00
parent 9ac2c4d815
commit cb89496569
15 changed files with 531 additions and 170 deletions

View File

@ -562,7 +562,7 @@ func newNetwork(flags *Flags) (*Processes, error) {
}
process.Arguments["setup"] = append(process.Arguments["setup"],
"--storage.whitelisted-satellites", strings.Join(whitelisted, ","),
"--storage2.trust.sources", strings.Join(whitelisted, ","),
)
return nil
}

View File

@ -233,7 +233,7 @@ And the following `untrusted` list:
```
quz.test
2@qiz.test:7777
5
5@
```
The `trusted` list is pruned with the `untrusted` list, leaving the following `trusted` list:

View File

@ -32,6 +32,7 @@ import (
"storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/retain"
"storj.io/storj/storagenode/storagenodedb"
"storj.io/storj/storagenode/trust"
)
// newStorageNodes initializes storage nodes
@ -43,6 +44,15 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
}
}()
var sources []trust.Source
for _, u := range whitelistedSatellites {
source, err := trust.NewStaticURLSource(u.String())
if err != nil {
return nil, err
}
sources = append(sources, source)
}
for i := 0; i < count; i++ {
prefix := "storage" + strconv.Itoa(i)
log := planet.log.Named(prefix)
@ -82,7 +92,6 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
AllocatedDiskSpace: 1 * memory.GB,
AllocatedBandwidth: memory.TB,
KBucketRefreshInterval: defaultInterval,
WhitelistedSatellites: whitelistedSatellites,
},
Collector: collector.Config{
Interval: defaultInterval,
@ -111,6 +120,11 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
MinimumBandwidth: 100 * memory.MB,
MinimumDiskSpace: 100 * memory.MB,
},
Trust: trust.Config{
Sources: sources,
CachePath: filepath.Join(storageDir, "trust-cache.json"),
RefreshInterval: defaultInterval,
},
},
Retain: retain.Config{
Status: retain.Enabled,

View File

@ -46,6 +46,7 @@ fi
# setup the network
PATH=$RELEASE_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network --postgres=$STORJ_SIM_POSTGRES setup
# run upload part of backward compatibility tests from the lastest release branch
PATH=$RELEASE_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-backwards.sh upload
@ -79,6 +80,15 @@ ln $RELEASE_DIR/bin/storagenode `storj-sim network env STORAGENODE_2_DIR`/storag
ln $RELEASE_DIR/bin/storagenode `storj-sim network env STORAGENODE_3_DIR`/storagenode
ln $RELEASE_DIR/bin/storagenode `storj-sim network env STORAGENODE_4_DIR`/storagenode
# upgrade the trust configuration on the other half as the old configuration is
# most certainly not being used outside of test environments and is not
# backwards compatible (i.e. ignored)
sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" `storj-sim network env STORAGENODE_5_DIR`/config.yaml
sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" `storj-sim network env STORAGENODE_6_DIR`/config.yaml
sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" `storj-sim network env STORAGENODE_7_DIR`/config.yaml
sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" `storj-sim network env STORAGENODE_8_DIR`/config.yaml
sed -i -e "s#storage.whitelisted-satellites#storage2.trust.sources#g" `storj-sim network env STORAGENODE_9_DIR`/config.yaml
# run download part of backward compatibility tests from the current branch, using new uplink
PATH=$BRANCH_DIR/bin:$PATH storj-sim -x --host $STORJ_NETWORK_HOST4 network test bash "$SCRIPTDIR"/test-backwards.sh download

View File

@ -30,7 +30,7 @@ type Chore struct {
trust *trust.Pool
mu sync.Mutex
cycles []*sync2.Cycle
cycles map[storj.NodeID]*sync2.Cycle
started sync2.Fence
interval time.Duration
}
@ -50,6 +50,7 @@ func NewChore(log *zap.Logger, interval time.Duration, trust *trust.Pool, dialer
trust: trust,
cycles: make(map[storj.NodeID]*sync2.Cycle),
interval: interval,
}
}
@ -65,22 +66,64 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
return ctx.Err()
}
// configure the satellite ping cycles
chore.updateCycles(ctx, &group, chore.trust.GetSatellites(ctx))
// set up a cycle to update ping cycles on a frequent interval
refreshCycle := sync2.NewCycle(time.Minute)
refreshCycle.Start(ctx, &group, func(ctx context.Context) error {
chore.updateCycles(ctx, &group, chore.trust.GetSatellites(ctx))
return nil
})
defer refreshCycle.Close()
chore.started.Release()
return group.Wait()
}
func (chore *Chore) updateCycles(ctx context.Context, group *errgroup.Group, satellites []storj.NodeID) {
chore.mu.Lock()
for _, satellite := range chore.trust.GetSatellites(ctx) {
satellite := satellite
defer chore.mu.Unlock()
trustedIDs := make(map[storj.NodeID]struct{})
for _, satellite := range satellites {
satellite := satellite // alias the loop var since it is captured below
trustedIDs[satellite] = struct{}{}
if _, ok := chore.cycles[satellite]; ok {
// Ping cycle has already been started for this satellite
continue
}
// Set up a new ping cycle for the newly trusted satellite
chore.log.Debug("Starting cycle", zap.Stringer("Satellite ID", satellite))
cycle := sync2.NewCycle(chore.interval)
chore.cycles = append(chore.cycles, cycle)
chore.cycles[satellite] = cycle
cycle.Start(ctx, group, func(ctx context.Context) error {
return chore.pingSatellite(ctx, satellite)
})
}
cycle.Start(ctx, &group, func(ctx context.Context) error {
chore.log.Debug("starting cycle", zap.Stringer("Satellite ID", satellite))
// Stop the ping cycle for satellites that are no longer trusted
for satellite, cycle := range chore.cycles {
if _, ok := trustedIDs[satellite]; !ok {
chore.log.Debug("Stopping cycle", zap.Stringer("Satellite ID", satellite))
cycle.Close()
delete(chore.cycles, satellite)
}
}
}
func (chore *Chore) pingSatellite(ctx context.Context, satellite storj.NodeID) error {
interval := initialBackOff
attempts := 0
for {
mon.Meter("satellite_contact_request").Mark(1) //locked
err := chore.pingSatellite(ctx, satellite)
err := chore.pingSatelliteOnce(ctx, satellite)
attempts++
if err == nil {
return nil
@ -98,14 +141,10 @@ func (chore *Chore) Run(ctx context.Context) (err error) {
return nil
}
}
})
}
chore.mu.Unlock()
chore.started.Release()
return group.Wait()
}
func (chore *Chore) pingSatellite(ctx context.Context, id storj.NodeID) (err error) {
func (chore *Chore) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) {
defer mon.Task()(&ctx, id)(&err)
self := chore.service.Local()

View File

@ -201,7 +201,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
}
{ // setup trust pool
peer.Storage2.Trust, err = trust.NewPool(peer.Dialer, config.Storage.WhitelistedSatellites)
peer.Storage2.Trust, err = trust.NewPool(log.Named("trust"), trust.Dialer(peer.Dialer), config.Storage2.Trust)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
@ -424,6 +424,12 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
func (peer *Peer) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
// Refresh the trust pool first. It will be updated periodically via
// Run() below.
if err := peer.Storage2.Trust.Refresh(ctx); err != nil {
return err
}
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
@ -453,6 +459,9 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Bandwidth.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(peer.Storage2.Trust.Run(ctx))
})
group.Go(func() error {
// TODO: move the message into Server instead

View File

@ -326,9 +326,15 @@ func TestTrashAndRestore(t *testing.T) {
store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil)
tStore := &pieces.StoreForTest{store}
var satelliteURLs storj.NodeURLs
for _, satellite := range satellites {
satelliteURLs = append(satelliteURLs, storj.NodeURL{ID: satellite.satelliteID})
var satelliteURLs []trust.SatelliteURL
for i, satellite := range satellites {
// host:port pair must be unique or the trust pool will aggregate
// them into a single entry with the first one "winning".
satelliteURLs = append(satelliteURLs, trust.SatelliteURL{
ID: satellite.satelliteID,
Host: "localhost",
Port: i,
})
now := time.Now()
for _, piece := range satellite.pieces {
// If test has expiration, add to expiration db
@ -402,9 +408,18 @@ func TestTrashAndRestore(t *testing.T) {
}
}
// Empty trash by running the chore once
trust, err := trust.NewPool(rpc.Dialer{}, satelliteURLs)
// Initialize a trust pool
poolConfig := trust.Config{
CachePath: ctx.File("trust-cache.json"),
}
for _, satelliteURL := range satelliteURLs {
poolConfig.Sources = append(poolConfig.Sources, &trust.StaticURLSource{URL: satelliteURL})
}
trust, err := trust.NewPool(zaptest.NewLogger(t), trust.Dialer(rpc.Dialer{}), poolConfig)
require.NoError(t, err)
require.NoError(t, trust.Refresh(ctx))
// Empty trash by running the chore once
trashDur := 4 * 24 * time.Hour
chore := pieces.NewTrashChore(zaptest.NewLogger(t), 24*time.Hour, trashDur, trust, store)
go func() {

View File

@ -49,7 +49,7 @@ var _ pb.PiecestoreServer = (*Endpoint)(nil)
// OldConfig contains everything necessary for a server
type OldConfig struct {
Path string `help:"path to store data in" default:"$CONFDIR/storage"`
WhitelistedSatellites storj.NodeURLs `help:"a comma-separated list of approved satellite node urls" devDefault:"" releaseDefault:"12EayRS2V1kEsWESU9QMRseFhdxYxKicsiFmxrsLZHeLUtdps3S@us-central-1.tardigrade.io:7777,118UWpMCHzs6CvSgWd9BfFVjw5K9pZbJjkfZJexMtSkmKxvvAW@satellite.stefan-benten.de:7777,121RTSDpyNZVcEU84Ticf2L1ntiuUimbWgfATz21tuvgk3vzoA6@asia-east-1.tardigrade.io:7777,12L9ZFwhzVpuEKMUNUqkaTLGzwY9G24tbiigLiXpmZWKwmcNDDs@europe-west-1.tardigrade.io:7777"`
WhitelistedSatellites storj.NodeURLs `help:"a comma-separated list of approved satellite node urls (unused)" devDefault:"" releaseDefault:""`
AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"`
AllocatedBandwidth memory.Size `user:"true" help:"total allocated bandwidth in bytes" default:"2TB"`
KBucketRefreshInterval time.Duration `help:"how frequently Kademlia bucket should be refreshed with node stats" default:"1h0m0s"`
@ -64,6 +64,8 @@ type Config struct {
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"`
Trust trust.Config
Monitor monitor.Config
Orders orders.Config
}

View File

@ -36,7 +36,7 @@ func LoadCache(path string) (*Cache, error) {
data, err := LoadCacheData(path)
switch {
case err == nil:
case os.IsNotExist(errs.Unwrap(err)):
case errs.IsFunc(err, os.IsNotExist):
data = NewCacheData()
default:
return nil, err

View File

@ -5,30 +5,71 @@ package trust
import (
"context"
"math/rand"
"sort"
"sync"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/sync2"
)
// Error is the default error class
var Error = errs.Class("trust")
var (
Error = errs.Class("trust")
var mon = monkit.Package()
mon = monkit.Package()
)
// IdentityResolver resolves peer identities from a node URL
type IdentityResolver interface {
// ResolveIdentity returns the peer identity of the peer located at the Node URL
ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error)
}
// IdentityResolverFunc is a convenience type for implementing IdentityResolver using a
// function literal.
type IdentityResolverFunc func(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error)
// ResolveIdentity returns the peer identity of the peer located at the Node URL
func (fn IdentityResolverFunc) ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) {
return fn(ctx, url)
}
// Dialer implements an IdentityResolver using an RPC dialer
func Dialer(dialer rpc.Dialer) IdentityResolver {
return IdentityResolverFunc(func(ctx context.Context, url storj.NodeURL) (_ *identity.PeerIdentity, err error) {
defer mon.Task()(&ctx)(&err)
conn, err := dialer.DialAddressID(ctx, url.Address, url.ID)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, conn.Close()) }()
return conn.PeerIdentity()
})
}
// Pool implements different peer verifications.
//
// architecture: Service
type Pool struct {
mu sync.RWMutex
dialer rpc.Dialer
log *zap.Logger
resolver IdentityResolver
refreshInterval time.Duration
trustedSatellites map[storj.NodeID]*satelliteInfoCache
listMu sync.Mutex
list *List
satellitesMu sync.RWMutex
satellites map[storj.NodeID]*satelliteInfoCache
}
// satelliteInfoCache caches identity information about a satellite
@ -39,34 +80,50 @@ type satelliteInfoCache struct {
}
// NewPool creates a new trust pool of the specified list of trusted satellites.
func NewPool(dialer rpc.Dialer, trustedSatellites storj.NodeURLs) (*Pool, error) {
func NewPool(log *zap.Logger, resolver IdentityResolver, config Config) (*Pool, error) {
// TODO: preload all satellite peer identities
// parse the comma separated list of approved satellite IDs into an array of storj.NodeIDs
trusted := make(map[storj.NodeID]*satelliteInfoCache)
cache, err := LoadCache(config.CachePath)
if err != nil {
return nil, err
}
for _, node := range trustedSatellites {
trusted[node.ID] = &satelliteInfoCache{url: node}
list, err := NewList(log, config.Sources, config.Exclusions.Rules, cache)
if err != nil {
return nil, err
}
return &Pool{
dialer: dialer,
trustedSatellites: trusted,
log: log,
resolver: resolver,
refreshInterval: config.RefreshInterval,
list: list,
satellites: make(map[storj.NodeID]*satelliteInfoCache),
}, nil
}
// Run periodically refreshes the pool. The initial refresh is intended to
// happen before run is call. Therefore Run does not refresh right away.
func (pool *Pool) Run(ctx context.Context) error {
for {
refreshAfter := jitter(pool.refreshInterval)
pool.log.Info("Scheduling next refresh", zap.Duration("after", refreshAfter))
if !sync2.Sleep(ctx, refreshAfter) {
return ctx.Err()
}
if err := pool.Refresh(ctx); err != nil {
pool.log.Error("Failed to refresh", zap.Error(err))
return err
}
}
}
// VerifySatelliteID checks whether id corresponds to a trusted satellite.
func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) (err error) {
defer mon.Task()(&ctx)(&err)
pool.mu.RLock()
defer pool.mu.RUnlock()
_, ok := pool.trustedSatellites[id]
if !ok {
return Error.New("satellite %q is untrusted", id)
}
return nil
_, err = pool.getInfo(id)
return err
}
// GetSignee gets the corresponding signee for verifying signatures.
@ -74,20 +131,16 @@ func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) (err e
func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (_ signing.Signee, err error) {
defer mon.Task()(&ctx)(&err)
// lookup peer identity with id
pool.mu.RLock()
info, ok := pool.trustedSatellites[id]
pool.mu.RUnlock()
if !ok {
return nil, Error.New("signee %q is untrusted", id)
info, err := pool.getInfo(id)
if err != nil {
return nil, err
}
info.mu.Lock()
defer info.mu.Unlock()
if info.identity == nil {
identity, err := pool.FetchPeerIdentity(ctx, info.url)
identity, err := pool.resolver.ResolveIdentity(ctx, info.url)
if err != nil {
return nil, Error.Wrap(err)
}
@ -97,23 +150,13 @@ func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (_ signing.Sig
return signing.SigneeFromPeerIdentity(info.identity), nil
}
// FetchPeerIdentity dials the url and fetches the identity.
func (pool *Pool) FetchPeerIdentity(ctx context.Context, url storj.NodeURL) (_ *identity.PeerIdentity, err error) {
conn, err := pool.dialer.DialAddressID(ctx, url.Address, url.ID)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, conn.Close()) }()
return conn.PeerIdentity()
}
// GetSatellites returns a slice containing all trusted satellites
func (pool *Pool) GetSatellites(ctx context.Context) (satellites []storj.NodeID) {
defer mon.Task()(&ctx)(nil)
for sat := range pool.trustedSatellites {
for sat := range pool.satellites {
satellites = append(satellites, sat)
}
sort.Sort(storj.NodeIDList(satellites))
return satellites
}
@ -121,12 +164,86 @@ func (pool *Pool) GetSatellites(ctx context.Context) (satellites []storj.NodeID)
func (pool *Pool) GetAddress(ctx context.Context, id storj.NodeID) (_ string, err error) {
defer mon.Task()(&ctx)(&err)
pool.mu.RLock()
defer pool.mu.RUnlock()
info, ok := pool.trustedSatellites[id]
if !ok {
return "", Error.New("ID %v not found in trusted list", id)
info, err := pool.getInfo(id)
if err != nil {
return "", err
}
return info.url.Address, nil
}
// Refresh refreshes the set of trusted satellites in the pool. Concurrent
// callers will be synchronized so only one proceeds at a time.
func (pool *Pool) Refresh(ctx context.Context) error {
urls, err := pool.fetchURLs(ctx)
if err != nil {
return err
}
pool.satellitesMu.Lock()
defer pool.satellitesMu.Unlock()
// add/update trusted IDs
trustedIDs := make(map[storj.NodeID]struct{})
for _, url := range urls {
trustedIDs[url.ID] = struct{}{}
info, ok := pool.satellites[url.ID]
if !ok {
info = &satelliteInfoCache{
url: url,
}
pool.log.Debug("Satellite is trusted", zap.String("id", url.ID.String()))
pool.satellites[url.ID] = info
}
// update the URL address and reset the identity if it changed
if info.url.Address != url.Address {
pool.log.Debug("Satellite address updated; identity cache purged",
zap.String("id", url.ID.String()),
zap.String("old", info.url.Address),
zap.String("new", url.Address),
)
info.url.Address = url.Address
info.identity = nil
}
}
// remove trusted IDs that are no longer in the URL list
for id := range pool.satellites {
if _, ok := trustedIDs[id]; !ok {
pool.log.Debug("Satellite is no longer trusted", zap.String("id", id.String()))
delete(pool.satellites, id)
}
}
return nil
}
func (pool *Pool) getInfo(id storj.NodeID) (*satelliteInfoCache, error) {
pool.satellitesMu.RLock()
defer pool.satellitesMu.RUnlock()
info, ok := pool.satellites[id]
if !ok {
return nil, Error.New("satellite %q is untrusted", id)
}
return info, nil
}
func (pool *Pool) fetchURLs(ctx context.Context) ([]storj.NodeURL, error) {
// Typically there will only be one caller of refresh (i.e. Run()) but
// if at some point we might want on-demand refresh, and *List is designed
// to be used by a single goroutine (don't want multiple callers racing
// on the cache, etc).
pool.listMu.Lock()
defer pool.listMu.Unlock()
return pool.list.FetchURLs(ctx)
}
func jitter(t time.Duration) time.Duration {
nanos := rand.NormFloat64()*float64(t/4) + float64(t)
if nanos <= 0 {
nanos = 1
}
return time.Duration(nanos)
}

View File

@ -5,80 +5,235 @@ package trust_test
import (
"context"
"crypto/x509"
"errors"
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"go.uber.org/zap/zaptest"
"storj.io/storj/private/errs2"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/private/testrand"
"storj.io/storj/storagenode/trust"
)
func TestGetSignee(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
trust := planet.StorageNodes[0].Storage2.Trust
canceledContext, cancel := context.WithCancel(ctx)
cancel()
var group errgroup.Group
group.Go(func() error {
_, err := trust.GetSignee(canceledContext, planet.Satellites[0].ID())
if errs2.IsCanceled(err) {
return nil
}
// if the other goroutine races us,
// then we might get the certificate from the cache, however we shouldn't get an error
return err
})
group.Go(func() error {
cert, err := trust.GetSignee(ctx, planet.Satellites[0].ID())
if err != nil {
return err
}
if cert == nil {
return errors.New("didn't get certificate")
}
return nil
})
assert.NoError(t, group.Wait())
})
func TestPoolRequiresCachePath(t *testing.T) {
log := zaptest.NewLogger(t)
_, err := trust.NewPool(log, newFakeIdentityResolver(), trust.Config{})
require.EqualError(t, err, "trust: cache path cannot be empty")
}
func TestGetAddress(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 5, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
func TestPoolVerifySatelliteID(t *testing.T) {
ctx, pool, source, _ := newPoolTest(t)
defer ctx.Cleanup()
// test address is stored correctly
for _, sat := range planet.Satellites {
address, err := planet.StorageNodes[0].Storage2.Trust.GetAddress(ctx, sat.ID())
id := testrand.NodeID()
// Assert the ID is not trusted
err := pool.VerifySatelliteID(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted
err = pool.VerifySatelliteID(context.Background(), id)
require.NoError(t, err)
assert.Equal(t, sat.Addr(), address)
}
var group errgroup.Group
// Refresh the pool after removing the trusted satellite
source.entries = nil
require.NoError(t, pool.Refresh(context.Background()))
// test parallel reads
for i := 0; i < 10; i++ {
group.Go(func() error {
address, err := planet.StorageNodes[0].Storage2.Trust.GetAddress(ctx, planet.Satellites[0].ID())
if err != nil {
return err
}
assert.Equal(t, planet.Satellites[0].Addr(), address)
return nil
})
}
assert.NoError(t, group.Wait())
})
// Assert the ID is no longer trusted
err = pool.VerifySatelliteID(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
}
func TestPoolGetSignee(t *testing.T) {
id := testrand.NodeID()
url := trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
}
ctx, pool, source, resolver := newPoolTest(t)
defer ctx.Cleanup()
// ID is untrusted
_, err := pool.GetSignee(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{{SatelliteURL: url}}
require.NoError(t, pool.Refresh(context.Background()))
// Identity is uncached and resolving fails
_, err = pool.GetSignee(context.Background(), id)
require.EqualError(t, err, "trust: no identity")
// Now make resolving succeed
identity := &identity.PeerIdentity{
ID: id,
Leaf: &x509.Certificate{},
}
resolver.SetIdentity(url.NodeURL(), identity)
signee, err := pool.GetSignee(context.Background(), id)
require.NoError(t, err)
assert.Equal(t, id, signee.ID())
// Now make resolving fail but ensure we can still get the signee since
// the identity is cached.
resolver.SetIdentity(url.NodeURL(), nil)
signee, err = pool.GetSignee(context.Background(), id)
require.NoError(t, err)
assert.Equal(t, id, signee.ID())
// Now update the address on the entry and assert that the identity is
// reset in the cache and needs to be refetched (and fails since we've
// hampered the resolver)
url.Host = "bar.test"
source.entries = []trust.Entry{{SatelliteURL: url}}
require.NoError(t, pool.Refresh(context.Background()))
_, err = pool.GetSignee(context.Background(), id)
require.EqualError(t, err, "trust: no identity")
}
func TestPoolGetSatellites(t *testing.T) {
ctx, pool, source, _ := newPoolTest(t)
defer ctx.Cleanup()
id1 := testrand.NodeID()
id2 := testrand.NodeID()
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id1,
Host: "foo.test",
Port: 7777,
},
},
{
SatelliteURL: trust.SatelliteURL{
ID: id2,
Host: "bar.test",
Port: 7777,
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
expected := []storj.NodeID{id1, id2}
actual := pool.GetSatellites(context.Background())
assert.ElementsMatch(t, expected, actual)
}
func TestPoolGetAddress(t *testing.T) {
ctx, pool, source, _ := newPoolTest(t)
defer ctx.Cleanup()
id := testrand.NodeID()
// Assert the ID is not trusted
address, err := pool.GetAddress(context.Background(), id)
require.EqualError(t, err, fmt.Sprintf("trust: satellite %q is untrusted", id))
require.Empty(t, address)
// Refresh the pool with the new trust entry
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "foo.test",
Port: 7777,
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted and the correct address is returned
address, err = pool.GetAddress(context.Background(), id)
require.NoError(t, err)
require.Equal(t, "foo.test:7777", address)
// Refresh the pool with an updated trust entry with a new address
source.entries = []trust.Entry{
{
SatelliteURL: trust.SatelliteURL{
ID: id,
Host: "bar.test",
Port: 7777,
},
},
}
require.NoError(t, pool.Refresh(context.Background()))
// Assert the ID is now trusted and the correct address is returned
address, err = pool.GetAddress(context.Background(), id)
require.NoError(t, err)
require.Equal(t, "bar.test:7777", address)
}
func newPoolTest(t *testing.T) (*testcontext.Context, *trust.Pool, *fakeSource, *fakeIdentityResolver) {
ctx := testcontext.New(t)
source := &fakeSource{}
resolver := newFakeIdentityResolver()
log := zaptest.NewLogger(t)
pool, err := trust.NewPool(log, resolver, trust.Config{
Sources: []trust.Source{source},
CachePath: ctx.File("trust-cache.json"),
})
if err != nil {
ctx.Cleanup()
require.NoError(t, err)
}
return ctx, pool, source, resolver
}
type fakeIdentityResolver struct {
mu sync.Mutex
identities map[storj.NodeURL]*identity.PeerIdentity
}
func newFakeIdentityResolver() *fakeIdentityResolver {
return &fakeIdentityResolver{
identities: make(map[storj.NodeURL]*identity.PeerIdentity),
}
}
func (resolver *fakeIdentityResolver) SetIdentity(url storj.NodeURL, identity *identity.PeerIdentity) {
resolver.mu.Lock()
defer resolver.mu.Unlock()
resolver.identities[url] = identity
}
func (resolver *fakeIdentityResolver) ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) {
resolver.mu.Lock()
defer resolver.mu.Unlock()
identity := resolver.identities[url]
if identity == nil {
return nil, errors.New("no identity")
}
return identity, nil
}

View File

@ -41,14 +41,14 @@ func NewSource(config string) (Source, error) {
case "http", "https":
return NewHTTPSource(config)
case "storj":
return NewStaticSource(config)
return NewStaticURLSource(config)
default:
return nil, errs.New("unsupported schema %q", schema)
}
}
if isProbablySatelliteURL(config) {
return NewStaticSource(config)
return NewStaticURLSource(config)
}
return NewFileSource(config), nil

View File

@ -57,7 +57,7 @@ func TestNewSource(t *testing.T) {
{
name: "explicit satellite URL",
config: "storj://121RTSDpyNZVcEU84Ticf2L1ntiuUimbWgfATz21tuvgk3vzoA6@domain.test:7777",
typ: new(trust.StaticSource),
typ: new(trust.StaticURLSource),
},
{
name: "explicit bad satellite URL",
@ -67,7 +67,7 @@ func TestNewSource(t *testing.T) {
{
name: "satellite URL",
config: "121RTSDpyNZVcEU84Ticf2L1ntiuUimbWgfATz21tuvgk3vzoA6@domain.test:7777",
typ: new(trust.StaticSource),
typ: new(trust.StaticURLSource),
},
{
name: "partial satellite URL",

View File

@ -14,32 +14,32 @@ var (
ErrStaticSource = errs.Class("static source")
)
// StaticSource is a trust source that returns an explicitly trusted URL
type StaticSource struct {
url SatelliteURL
// StaticURLSource is a trust source that returns an explicitly trusted URL
type StaticURLSource struct {
URL SatelliteURL
}
// NewStaticSource takes an explicitly trusted URL and returns a new StaticSource.
func NewStaticSource(satelliteURL string) (*StaticSource, error) {
// NewStaticURLSource takes an explicitly trusted URL and returns a new StaticURLSource.
func NewStaticURLSource(satelliteURL string) (*StaticURLSource, error) {
url, err := ParseSatelliteURL(satelliteURL)
if err != nil {
return nil, ErrStaticSource.Wrap(err)
}
return &StaticSource{url: url}, nil
return &StaticURLSource{URL: url}, nil
}
// String implements the Source interface and returns the static trusted URL
func (source *StaticSource) String() string {
return source.url.String()
func (source *StaticURLSource) String() string {
return source.URL.String()
}
// Static implements the Source interface. It returns true.
func (source *StaticSource) Static() bool { return true }
func (source *StaticURLSource) Static() bool { return true }
// FetchEntries returns a trust entry for the explicitly trusted Satellite URL.
// The entry is authoritative.
func (source *StaticSource) FetchEntries(ctx context.Context) ([]Entry, error) {
func (source *StaticURLSource) FetchEntries(ctx context.Context) ([]Entry, error) {
return []Entry{
{SatelliteURL: source.url, Authoritative: true},
{SatelliteURL: source.URL, Authoritative: true},
}, nil
}

View File

@ -13,7 +13,7 @@ import (
"storj.io/storj/storagenode/trust"
)
func TestStaticSource(t *testing.T) {
func TestStaticURLSource(t *testing.T) {
url := makeSatelliteURL("domain.test")
for _, tt := range []struct {
@ -40,7 +40,7 @@ func TestStaticSource(t *testing.T) {
} {
tt := tt // quiet linting
t.Run(tt.name, func(t *testing.T) {
source, err := trust.NewStaticSource(tt.url)
source, err := trust.NewStaticURLSource(tt.url)
if tt.err != "" {
require.EqualError(t, err, tt.err)
return