storagenode/trust: ensure trust pool updates satellite status on Refresh
Fixes https://github.com/storj/storj/issues/6261 Change-Id: Ic01ce423156058dd4676fb073c0de3d768991d0e
This commit is contained in:
parent
8a1bedd367
commit
a2c162db9b
@ -118,7 +118,7 @@ func Test_cmdForgetSatellite(t *testing.T) {
|
||||
// test that the satellite was inserted correctly
|
||||
satellite, err := db.Satellites().GetSatellite(ctx, satelliteID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, satellites.Untrusted, satellites.Status(satellite.Status))
|
||||
require.Equal(t, satellites.Untrusted, satellite.Status)
|
||||
|
||||
// set up the identity
|
||||
ident := planet.StorageNodes[0].Identity
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/storj/storagenode/preflight"
|
||||
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
|
||||
"storj.io/storj/storagenode/trust"
|
||||
)
|
||||
|
||||
@ -48,138 +49,138 @@ func TestLocalTime_InSync(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLocalTime_OutOfSync(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
|
||||
|
||||
log := zaptest.NewLogger(t)
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
// set up mock satellite server configuration
|
||||
mockSatID, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
config := server.Config{
|
||||
Address: "127.0.0.1:0",
|
||||
PrivateAddress: "127.0.0.1:0",
|
||||
// set up mock satellite server configuration
|
||||
mockSatID, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
config := server.Config{
|
||||
Address: "127.0.0.1:0",
|
||||
PrivateAddress: "127.0.0.1:0",
|
||||
|
||||
Config: tlsopts.Config{
|
||||
PeerIDVersions: "*",
|
||||
Extensions: extensions.Config{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
Config: tlsopts.Config{
|
||||
PeerIDVersions: "*",
|
||||
Extensions: extensions.Config{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
mockSatTLSOptions, err := tlsopts.NewOptions(mockSatID, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("Less than 30m", func(t *testing.T) {
|
||||
// register mock GetTime endpoint to mock server
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
|
||||
contactServer, err := server.New(log, mockSatTLSOptions, config)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(contactServer.Close)
|
||||
|
||||
err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
|
||||
localTime: time.Now().Add(-25 * time.Minute),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
group.Go(func() error {
|
||||
return contactServer.Run(ctx)
|
||||
})
|
||||
|
||||
// get mock server address
|
||||
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
|
||||
require.NoError(t, err)
|
||||
port, err := strconv.Atoi(portStr)
|
||||
require.NoError(t, err)
|
||||
url := trust.SatelliteURL{
|
||||
ID: mockSatID.ID,
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
}
|
||||
mockSatTLSOptions, err := tlsopts.NewOptions(mockSatID, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// set up storagenode client
|
||||
source, err := trust.NewStaticURLSource(url.String())
|
||||
require.NoError(t, err)
|
||||
t.Run("Less than 30m", func(t *testing.T) {
|
||||
// register mock GetTime endpoint to mock server
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
|
||||
identity, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
|
||||
Sources: []trust.Source{source},
|
||||
CachePath: ctx.File("trust-cache.json"),
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
err = pool.Refresh(ctx)
|
||||
require.NoError(t, err)
|
||||
contactServer, err := server.New(log, mockSatTLSOptions, config)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(contactServer.Close)
|
||||
|
||||
// should not return any error when node's clock is off no more than 30m
|
||||
localtime := preflight.NewLocalTime(log, preflight.Config{
|
||||
LocalTimeCheck: true,
|
||||
}, pool, dialer)
|
||||
err = localtime.Check(ctx)
|
||||
require.NoError(t, err)
|
||||
err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
|
||||
localTime: time.Now().Add(-25 * time.Minute),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
})
|
||||
group.Go(func() error {
|
||||
return contactServer.Run(ctx)
|
||||
})
|
||||
|
||||
t.Run("More than 30m", func(t *testing.T) {
|
||||
// register mock GetTime endpoint to mock server
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
// get mock server address
|
||||
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
|
||||
require.NoError(t, err)
|
||||
port, err := strconv.Atoi(portStr)
|
||||
require.NoError(t, err)
|
||||
url := trust.SatelliteURL{
|
||||
ID: mockSatID.ID,
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
contactServer, err := server.New(log, mockSatTLSOptions, config)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(contactServer.Close)
|
||||
// set up storagenode client
|
||||
source, err := trust.NewStaticURLSource(url.String())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
|
||||
localTime: time.Now().Add(-31 * time.Minute),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
identity, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
|
||||
Sources: []trust.Source{source},
|
||||
CachePath: ctx.File("trust-cache.json"),
|
||||
}, db.Satellites())
|
||||
require.NoError(t, err)
|
||||
err = pool.Refresh(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// should not return any error when node's clock is off no more than 30m
|
||||
localtime := preflight.NewLocalTime(log, preflight.Config{
|
||||
LocalTimeCheck: true,
|
||||
}, pool, dialer)
|
||||
err = localtime.Check(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
group.Go(func() error {
|
||||
return contactServer.Run(ctx)
|
||||
})
|
||||
|
||||
// get mock server address
|
||||
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
|
||||
require.NoError(t, err)
|
||||
port, err := strconv.Atoi(portStr)
|
||||
require.NoError(t, err)
|
||||
url := trust.SatelliteURL{
|
||||
ID: mockSatID.ID,
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
}
|
||||
require.NoError(t, err)
|
||||
t.Run("More than 30m", func(t *testing.T) {
|
||||
// register mock GetTime endpoint to mock server
|
||||
var group errgroup.Group
|
||||
defer ctx.Check(group.Wait)
|
||||
|
||||
// set up storagenode client
|
||||
source, err := trust.NewStaticURLSource(url.String())
|
||||
require.NoError(t, err)
|
||||
contactServer, err := server.New(log, mockSatTLSOptions, config)
|
||||
require.NoError(t, err)
|
||||
defer ctx.Check(contactServer.Close)
|
||||
|
||||
identity, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
|
||||
Sources: []trust.Source{source},
|
||||
CachePath: ctx.File("trust-cache.json"),
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
err = pool.Refresh(ctx)
|
||||
require.NoError(t, err)
|
||||
err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
|
||||
localTime: time.Now().Add(-31 * time.Minute),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// should return an error when node's clock is off by more than 30m with all trusted satellites
|
||||
localtime := preflight.NewLocalTime(log, preflight.Config{
|
||||
LocalTimeCheck: true,
|
||||
}, pool, dialer)
|
||||
err = localtime.Check(ctx)
|
||||
require.Error(t, err)
|
||||
group.Go(func() error {
|
||||
return contactServer.Run(ctx)
|
||||
})
|
||||
|
||||
// get mock server address
|
||||
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
|
||||
require.NoError(t, err)
|
||||
port, err := strconv.Atoi(portStr)
|
||||
require.NoError(t, err)
|
||||
url := trust.SatelliteURL{
|
||||
ID: mockSatID.ID,
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// set up storagenode client
|
||||
source, err := trust.NewStaticURLSource(url.String())
|
||||
require.NoError(t, err)
|
||||
|
||||
identity, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
|
||||
Sources: []trust.Source{source},
|
||||
CachePath: ctx.File("trust-cache.json"),
|
||||
}, db.Satellites())
|
||||
require.NoError(t, err)
|
||||
err = pool.Refresh(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// should return an error when node's clock is off by more than 30m with all trusted satellites
|
||||
localtime := preflight.NewLocalTime(log, preflight.Config{
|
||||
LocalTimeCheck: true,
|
||||
}, pool, dialer)
|
||||
err = localtime.Check(ctx)
|
||||
require.Error(t, err)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -13,19 +13,21 @@ import (
|
||||
// Status refers to the state of the relationship with a satellites.
|
||||
type Status = int
|
||||
|
||||
// It is important that the values/order of these Status constants are not changed
|
||||
// because they are stored in the database.
|
||||
const (
|
||||
// Unexpected status should not be used for sanity checking.
|
||||
Unexpected Status = 0
|
||||
// Normal status reflects a lack of graceful exit.
|
||||
Normal = 1
|
||||
Normal Status = 1
|
||||
// Exiting reflects an active graceful exit.
|
||||
Exiting = 2
|
||||
Exiting Status = 2
|
||||
// ExitSucceeded reflects a graceful exit that succeeded.
|
||||
ExitSucceeded = 3
|
||||
ExitSucceeded Status = 3
|
||||
// ExitFailed reflects a graceful exit that failed.
|
||||
ExitFailed = 4
|
||||
ExitFailed Status = 4
|
||||
// Untrusted reflects a satellite that is not trusted.
|
||||
Untrusted = 5
|
||||
Untrusted Status = 5
|
||||
)
|
||||
|
||||
// ExitProgress contains the status of a graceful exit.
|
||||
@ -36,14 +38,14 @@ type ExitProgress struct {
|
||||
StartingDiskUsage int64
|
||||
BytesDeleted int64
|
||||
CompletionReceipt []byte
|
||||
Status int32
|
||||
Status Status
|
||||
}
|
||||
|
||||
// Satellite contains the satellite and status.
|
||||
type Satellite struct {
|
||||
SatelliteID storj.NodeID
|
||||
AddedAt time.Time
|
||||
Status int32
|
||||
Status Status
|
||||
}
|
||||
|
||||
// DB works with satellite database.
|
||||
|
@ -120,21 +120,6 @@ func (pool *Pool) Run(ctx context.Context) error {
|
||||
pool.log.Error("Failed to refresh", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for _, trustedSatellite := range pool.satellites {
|
||||
status := satellites.Normal
|
||||
// for cases where a satellite was previously marked as untrusted, but is now trusted
|
||||
// we reset the status back to normal
|
||||
satellite, err := pool.satellitesDB.GetSatellite(ctx, trustedSatellite.url.ID)
|
||||
if err == nil && !satellite.SatelliteID.IsZero() {
|
||||
if satellite.Status != satellites.Untrusted {
|
||||
status = satellites.Status(satellite.Status)
|
||||
}
|
||||
}
|
||||
if err := pool.satellitesDB.SetAddressAndStatus(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address, status); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -229,7 +214,7 @@ func (pool *Pool) Refresh(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// remove trusted IDs that are no longer in the URL list
|
||||
for id := range pool.satellites {
|
||||
for id, info := range pool.satellites {
|
||||
if _, ok := trustedIDs[id]; !ok {
|
||||
pool.log.Debug("Satellite is no longer trusted", zap.String("id", id.String()))
|
||||
delete(pool.satellites, id)
|
||||
@ -237,6 +222,21 @@ func (pool *Pool) Refresh(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// for cases where a satellite was previously marked as untrusted, but is now trusted
|
||||
// we reset the status back to normal
|
||||
status := satellites.Normal
|
||||
dbSatellite, err := pool.satellitesDB.GetSatellite(ctx, info.url.ID)
|
||||
if err == nil && !dbSatellite.SatelliteID.IsZero() {
|
||||
if dbSatellite.Status != satellites.Untrusted {
|
||||
status = dbSatellite.Status
|
||||
}
|
||||
}
|
||||
if err := pool.satellitesDB.SetAddressAndStatus(ctx, info.url.ID, info.url.Address, status); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -20,6 +21,7 @@ import (
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/storagenode"
|
||||
"storj.io/storj/storagenode/satellites"
|
||||
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
|
||||
"storj.io/storj/storagenode/trust"
|
||||
)
|
||||
@ -33,7 +35,6 @@ func TestPoolRequiresCachePath(t *testing.T) {
|
||||
func TestPoolVerifySatelliteID(t *testing.T) {
|
||||
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
|
||||
pool, source, _ := newPoolTest(ctx, t, db)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
id := testrand.NodeID()
|
||||
|
||||
@ -77,7 +78,6 @@ func TestPoolGetSignee(t *testing.T) {
|
||||
}
|
||||
|
||||
pool, source, resolver := newPoolTest(ctx, t, db)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// ID is untrusted
|
||||
_, err := pool.GetSignee(context.Background(), id)
|
||||
@ -122,7 +122,6 @@ func TestPoolGetSignee(t *testing.T) {
|
||||
func TestPoolGetSatellites(t *testing.T) {
|
||||
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
|
||||
pool, source, _ := newPoolTest(ctx, t, db)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
id1 := testrand.NodeID()
|
||||
id2 := testrand.NodeID()
|
||||
@ -152,10 +151,115 @@ func TestPoolGetSatellites(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPool_SatelliteDB_Status(t *testing.T) {
|
||||
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
|
||||
source := &fakeSource{}
|
||||
|
||||
resolver := newFakeIdentityResolver()
|
||||
|
||||
log := zaptest.NewLogger(t)
|
||||
config := trust.Config{
|
||||
Sources: []trust.Source{source},
|
||||
CachePath: ctx.File("trust-cache.json"),
|
||||
RefreshInterval: 0 * time.Second,
|
||||
}
|
||||
|
||||
pool, err := trust.NewPool(log, resolver, config, db.Satellites())
|
||||
require.NoError(t, err)
|
||||
|
||||
id1 := testrand.NodeID()
|
||||
id2 := testrand.NodeID()
|
||||
|
||||
// Refresh the pool with the new trust entry
|
||||
source.entries = []trust.Entry{
|
||||
{
|
||||
SatelliteURL: trust.SatelliteURL{
|
||||
ID: id1,
|
||||
Host: "foo.test",
|
||||
Port: 7777,
|
||||
},
|
||||
},
|
||||
{
|
||||
SatelliteURL: trust.SatelliteURL{
|
||||
ID: id2,
|
||||
Host: "bar.test",
|
||||
Port: 7777,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, pool.Refresh(context.Background()))
|
||||
|
||||
sats, err := db.Satellites().GetSatellites(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(sats))
|
||||
require.Equal(t, satellites.Normal, sats[0].Status)
|
||||
require.Equal(t, satellites.Normal, sats[1].Status)
|
||||
|
||||
// Refresh the pool with the new trust entry
|
||||
source.entries = []trust.Entry{
|
||||
{
|
||||
SatelliteURL: trust.SatelliteURL{
|
||||
ID: id2,
|
||||
Host: "bar.test",
|
||||
Port: 7777,
|
||||
},
|
||||
},
|
||||
}
|
||||
require.NoError(t, pool.Refresh(context.Background()))
|
||||
sats, err = db.Satellites().GetSatellites(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(sats))
|
||||
|
||||
for i := 0; i < len(sats); i++ {
|
||||
switch sats[i].SatelliteID {
|
||||
case id1:
|
||||
require.Equal(t, satellites.Untrusted, sats[i].Status)
|
||||
case id2:
|
||||
require.Equal(t, satellites.Normal, sats[i].Status)
|
||||
default:
|
||||
t.Fatal("unexpected satellite")
|
||||
}
|
||||
}
|
||||
|
||||
expected := []storj.NodeID{id2}
|
||||
actual := pool.GetSatellites(context.Background())
|
||||
assert.ElementsMatch(t, expected, actual)
|
||||
|
||||
// test cases when the untrusted satellite is now trusted
|
||||
source.entries = []trust.Entry{
|
||||
{
|
||||
SatelliteURL: trust.SatelliteURL{
|
||||
ID: id1,
|
||||
Host: "foo.test",
|
||||
Port: 7777,
|
||||
},
|
||||
},
|
||||
{
|
||||
SatelliteURL: trust.SatelliteURL{
|
||||
ID: id2,
|
||||
Host: "bar.test",
|
||||
Port: 7777,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, pool.Refresh(context.Background()))
|
||||
sats, err = db.Satellites().GetSatellites(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(sats))
|
||||
require.Equal(t, satellites.Normal, sats[0].Status)
|
||||
require.Equal(t, satellites.Normal, sats[1].Status)
|
||||
|
||||
expected = []storj.NodeID{id1, id2}
|
||||
actual = pool.GetSatellites(context.Background())
|
||||
assert.ElementsMatch(t, expected, actual)
|
||||
})
|
||||
}
|
||||
|
||||
func TestPoolGetAddress(t *testing.T) {
|
||||
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
|
||||
pool, source, _ := newPoolTest(ctx, t, db)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
id := testrand.NodeID()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user