From a2c162db9bac1889f8dd0d5ac318bd620198de85 Mon Sep 17 00:00:00 2001 From: Clement Sam Date: Mon, 25 Sep 2023 18:15:05 +0000 Subject: [PATCH] storagenode/trust: ensure trust pool updates satellite status on Refresh Fixes https://github.com/storj/storj/issues/6261 Change-Id: Ic01ce423156058dd4676fb073c0de3d768991d0e --- cmd/storagenode/cmd_forget_satellite_test.go | 2 +- storagenode/preflight/localtime_test.go | 227 ++++++++++--------- storagenode/satellites/satellites.go | 16 +- storagenode/trust/service.go | 32 +-- storagenode/trust/service_test.go | 112 ++++++++- 5 files changed, 248 insertions(+), 141 deletions(-) diff --git a/cmd/storagenode/cmd_forget_satellite_test.go b/cmd/storagenode/cmd_forget_satellite_test.go index b51eb9ea9..8a8397ee1 100644 --- a/cmd/storagenode/cmd_forget_satellite_test.go +++ b/cmd/storagenode/cmd_forget_satellite_test.go @@ -118,7 +118,7 @@ func Test_cmdForgetSatellite(t *testing.T) { // test that the satellite was inserted correctly satellite, err := db.Satellites().GetSatellite(ctx, satelliteID) require.NoError(t, err) - require.Equal(t, satellites.Untrusted, satellites.Status(satellite.Status)) + require.Equal(t, satellites.Untrusted, satellite.Status) // set up the identity ident := planet.StorageNodes[0].Identity diff --git a/storagenode/preflight/localtime_test.go b/storagenode/preflight/localtime_test.go index 43d93fdbe..56762af01 100644 --- a/storagenode/preflight/localtime_test.go +++ b/storagenode/preflight/localtime_test.go @@ -24,6 +24,7 @@ import ( "storj.io/storj/private/testplanet" "storj.io/storj/storagenode" "storj.io/storj/storagenode/preflight" + "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" "storj.io/storj/storagenode/trust" ) @@ -48,138 +49,138 @@ func TestLocalTime_InSync(t *testing.T) { } func TestLocalTime_OutOfSync(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() + storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { - log := zaptest.NewLogger(t) + log := zaptest.NewLogger(t) - // set up mock satellite server configuration - mockSatID, err := testidentity.NewTestIdentity(ctx) - require.NoError(t, err) - config := server.Config{ - Address: "127.0.0.1:0", - PrivateAddress: "127.0.0.1:0", + // set up mock satellite server configuration + mockSatID, err := testidentity.NewTestIdentity(ctx) + require.NoError(t, err) + config := server.Config{ + Address: "127.0.0.1:0", + PrivateAddress: "127.0.0.1:0", - Config: tlsopts.Config{ - PeerIDVersions: "*", - Extensions: extensions.Config{ - Revocation: false, - WhitelistSignedLeaf: false, + Config: tlsopts.Config{ + PeerIDVersions: "*", + Extensions: extensions.Config{ + Revocation: false, + WhitelistSignedLeaf: false, + }, }, - }, - } - mockSatTLSOptions, err := tlsopts.NewOptions(mockSatID, config.Config, nil) - require.NoError(t, err) - - t.Run("Less than 30m", func(t *testing.T) { - // register mock GetTime endpoint to mock server - var group errgroup.Group - defer ctx.Check(group.Wait) - - contactServer, err := server.New(log, mockSatTLSOptions, config) - require.NoError(t, err) - defer ctx.Check(contactServer.Close) - - err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{ - localTime: time.Now().Add(-25 * time.Minute), - }) - require.NoError(t, err) - - group.Go(func() error { - return contactServer.Run(ctx) - }) - - // get mock server address - _, portStr, err := net.SplitHostPort(contactServer.Addr().String()) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - url := trust.SatelliteURL{ - ID: mockSatID.ID, - Host: "127.0.0.1", - Port: port, } + mockSatTLSOptions, err := tlsopts.NewOptions(mockSatID, config.Config, nil) require.NoError(t, err) - // set up storagenode client - source, err := trust.NewStaticURLSource(url.String()) - require.NoError(t, err) + t.Run("Less than 30m", func(t *testing.T) { + // register mock GetTime endpoint to mock server + var group errgroup.Group + defer ctx.Check(group.Wait) - identity, err := testidentity.NewTestIdentity(ctx) - require.NoError(t, err) - tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil) - require.NoError(t, err) - dialer := rpc.NewDefaultDialer(tlsOptions) - pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{ - Sources: []trust.Source{source}, - CachePath: ctx.File("trust-cache.json"), - }, nil) - require.NoError(t, err) - err = pool.Refresh(ctx) - require.NoError(t, err) + contactServer, err := server.New(log, mockSatTLSOptions, config) + require.NoError(t, err) + defer ctx.Check(contactServer.Close) - // should not return any error when node's clock is off no more than 30m - localtime := preflight.NewLocalTime(log, preflight.Config{ - LocalTimeCheck: true, - }, pool, dialer) - err = localtime.Check(ctx) - require.NoError(t, err) + err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{ + localTime: time.Now().Add(-25 * time.Minute), + }) + require.NoError(t, err) - }) + group.Go(func() error { + return contactServer.Run(ctx) + }) - t.Run("More than 30m", func(t *testing.T) { - // register mock GetTime endpoint to mock server - var group errgroup.Group - defer ctx.Check(group.Wait) + // get mock server address + _, portStr, err := net.SplitHostPort(contactServer.Addr().String()) + require.NoError(t, err) + port, err := strconv.Atoi(portStr) + require.NoError(t, err) + url := trust.SatelliteURL{ + ID: mockSatID.ID, + Host: "127.0.0.1", + Port: port, + } + require.NoError(t, err) - contactServer, err := server.New(log, mockSatTLSOptions, config) - require.NoError(t, err) - defer ctx.Check(contactServer.Close) + // set up storagenode client + source, err := trust.NewStaticURLSource(url.String()) + require.NoError(t, err) - err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{ - localTime: time.Now().Add(-31 * time.Minute), - }) - require.NoError(t, err) + identity, err := testidentity.NewTestIdentity(ctx) + require.NoError(t, err) + tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil) + require.NoError(t, err) + dialer := rpc.NewDefaultDialer(tlsOptions) + pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{ + Sources: []trust.Source{source}, + CachePath: ctx.File("trust-cache.json"), + }, db.Satellites()) + require.NoError(t, err) + err = pool.Refresh(ctx) + require.NoError(t, err) + + // should not return any error when node's clock is off no more than 30m + localtime := preflight.NewLocalTime(log, preflight.Config{ + LocalTimeCheck: true, + }, pool, dialer) + err = localtime.Check(ctx) + require.NoError(t, err) - group.Go(func() error { - return contactServer.Run(ctx) }) - // get mock server address - _, portStr, err := net.SplitHostPort(contactServer.Addr().String()) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - url := trust.SatelliteURL{ - ID: mockSatID.ID, - Host: "127.0.0.1", - Port: port, - } - require.NoError(t, err) + t.Run("More than 30m", func(t *testing.T) { + // register mock GetTime endpoint to mock server + var group errgroup.Group + defer ctx.Check(group.Wait) - // set up storagenode client - source, err := trust.NewStaticURLSource(url.String()) - require.NoError(t, err) + contactServer, err := server.New(log, mockSatTLSOptions, config) + require.NoError(t, err) + defer ctx.Check(contactServer.Close) - identity, err := testidentity.NewTestIdentity(ctx) - require.NoError(t, err) - tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil) - require.NoError(t, err) - dialer := rpc.NewDefaultDialer(tlsOptions) - pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{ - Sources: []trust.Source{source}, - CachePath: ctx.File("trust-cache.json"), - }, nil) - require.NoError(t, err) - err = pool.Refresh(ctx) - require.NoError(t, err) + err = pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{ + localTime: time.Now().Add(-31 * time.Minute), + }) + require.NoError(t, err) - // should return an error when node's clock is off by more than 30m with all trusted satellites - localtime := preflight.NewLocalTime(log, preflight.Config{ - LocalTimeCheck: true, - }, pool, dialer) - err = localtime.Check(ctx) - require.Error(t, err) + group.Go(func() error { + return contactServer.Run(ctx) + }) + + // get mock server address + _, portStr, err := net.SplitHostPort(contactServer.Addr().String()) + require.NoError(t, err) + port, err := strconv.Atoi(portStr) + require.NoError(t, err) + url := trust.SatelliteURL{ + ID: mockSatID.ID, + Host: "127.0.0.1", + Port: port, + } + require.NoError(t, err) + + // set up storagenode client + source, err := trust.NewStaticURLSource(url.String()) + require.NoError(t, err) + + identity, err := testidentity.NewTestIdentity(ctx) + require.NoError(t, err) + tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil) + require.NoError(t, err) + dialer := rpc.NewDefaultDialer(tlsOptions) + pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{ + Sources: []trust.Source{source}, + CachePath: ctx.File("trust-cache.json"), + }, db.Satellites()) + require.NoError(t, err) + err = pool.Refresh(ctx) + require.NoError(t, err) + + // should return an error when node's clock is off by more than 30m with all trusted satellites + localtime := preflight.NewLocalTime(log, preflight.Config{ + LocalTimeCheck: true, + }, pool, dialer) + err = localtime.Check(ctx) + require.Error(t, err) + }) }) } diff --git a/storagenode/satellites/satellites.go b/storagenode/satellites/satellites.go index 61a764c14..d5ef5838f 100644 --- a/storagenode/satellites/satellites.go +++ b/storagenode/satellites/satellites.go @@ -13,19 +13,21 @@ import ( // Status refers to the state of the relationship with a satellites. type Status = int +// It is important that the values/order of these Status constants are not changed +// because they are stored in the database. const ( // Unexpected status should not be used for sanity checking. Unexpected Status = 0 // Normal status reflects a lack of graceful exit. - Normal = 1 + Normal Status = 1 // Exiting reflects an active graceful exit. - Exiting = 2 + Exiting Status = 2 // ExitSucceeded reflects a graceful exit that succeeded. - ExitSucceeded = 3 + ExitSucceeded Status = 3 // ExitFailed reflects a graceful exit that failed. - ExitFailed = 4 + ExitFailed Status = 4 // Untrusted reflects a satellite that is not trusted. - Untrusted = 5 + Untrusted Status = 5 ) // ExitProgress contains the status of a graceful exit. @@ -36,14 +38,14 @@ type ExitProgress struct { StartingDiskUsage int64 BytesDeleted int64 CompletionReceipt []byte - Status int32 + Status Status } // Satellite contains the satellite and status. type Satellite struct { SatelliteID storj.NodeID AddedAt time.Time - Status int32 + Status Status } // DB works with satellite database. diff --git a/storagenode/trust/service.go b/storagenode/trust/service.go index 7db3e5560..695ec8a03 100644 --- a/storagenode/trust/service.go +++ b/storagenode/trust/service.go @@ -120,21 +120,6 @@ func (pool *Pool) Run(ctx context.Context) error { pool.log.Error("Failed to refresh", zap.Error(err)) return err } - - for _, trustedSatellite := range pool.satellites { - status := satellites.Normal - // for cases where a satellite was previously marked as untrusted, but is now trusted - // we reset the status back to normal - satellite, err := pool.satellitesDB.GetSatellite(ctx, trustedSatellite.url.ID) - if err == nil && !satellite.SatelliteID.IsZero() { - if satellite.Status != satellites.Untrusted { - status = satellites.Status(satellite.Status) - } - } - if err := pool.satellitesDB.SetAddressAndStatus(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address, status); err != nil { - return err - } - } } } @@ -229,7 +214,7 @@ func (pool *Pool) Refresh(ctx context.Context) error { } // remove trusted IDs that are no longer in the URL list - for id := range pool.satellites { + for id, info := range pool.satellites { if _, ok := trustedIDs[id]; !ok { pool.log.Debug("Satellite is no longer trusted", zap.String("id", id.String())) delete(pool.satellites, id) @@ -237,6 +222,21 @@ func (pool *Pool) Refresh(ctx context.Context) error { if err != nil { return err } + + continue + } + + // for cases where a satellite was previously marked as untrusted, but is now trusted + // we reset the status back to normal + status := satellites.Normal + dbSatellite, err := pool.satellitesDB.GetSatellite(ctx, info.url.ID) + if err == nil && !dbSatellite.SatelliteID.IsZero() { + if dbSatellite.Status != satellites.Untrusted { + status = dbSatellite.Status + } + } + if err := pool.satellitesDB.SetAddressAndStatus(ctx, info.url.ID, info.url.Address, status); err != nil { + return err } } diff --git a/storagenode/trust/service_test.go b/storagenode/trust/service_test.go index 584e44b24..8df7014a0 100644 --- a/storagenode/trust/service_test.go +++ b/storagenode/trust/service_test.go @@ -10,6 +10,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,6 +21,7 @@ import ( "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/satellites" "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" "storj.io/storj/storagenode/trust" ) @@ -33,7 +35,6 @@ func TestPoolRequiresCachePath(t *testing.T) { func TestPoolVerifySatelliteID(t *testing.T) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { pool, source, _ := newPoolTest(ctx, t, db) - defer ctx.Cleanup() id := testrand.NodeID() @@ -77,7 +78,6 @@ func TestPoolGetSignee(t *testing.T) { } pool, source, resolver := newPoolTest(ctx, t, db) - defer ctx.Cleanup() // ID is untrusted _, err := pool.GetSignee(context.Background(), id) @@ -122,7 +122,6 @@ func TestPoolGetSignee(t *testing.T) { func TestPoolGetSatellites(t *testing.T) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { pool, source, _ := newPoolTest(ctx, t, db) - defer ctx.Cleanup() id1 := testrand.NodeID() id2 := testrand.NodeID() @@ -152,10 +151,115 @@ func TestPoolGetSatellites(t *testing.T) { }) } +func TestPool_SatelliteDB_Status(t *testing.T) { + storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { + source := &fakeSource{} + + resolver := newFakeIdentityResolver() + + log := zaptest.NewLogger(t) + config := trust.Config{ + Sources: []trust.Source{source}, + CachePath: ctx.File("trust-cache.json"), + RefreshInterval: 0 * time.Second, + } + + pool, err := trust.NewPool(log, resolver, config, db.Satellites()) + require.NoError(t, err) + + id1 := testrand.NodeID() + id2 := testrand.NodeID() + + // Refresh the pool with the new trust entry + source.entries = []trust.Entry{ + { + SatelliteURL: trust.SatelliteURL{ + ID: id1, + Host: "foo.test", + Port: 7777, + }, + }, + { + SatelliteURL: trust.SatelliteURL{ + ID: id2, + Host: "bar.test", + Port: 7777, + }, + }, + } + + require.NoError(t, pool.Refresh(context.Background())) + + sats, err := db.Satellites().GetSatellites(ctx) + require.NoError(t, err) + require.Equal(t, 2, len(sats)) + require.Equal(t, satellites.Normal, sats[0].Status) + require.Equal(t, satellites.Normal, sats[1].Status) + + // Refresh the pool with the new trust entry + source.entries = []trust.Entry{ + { + SatelliteURL: trust.SatelliteURL{ + ID: id2, + Host: "bar.test", + Port: 7777, + }, + }, + } + require.NoError(t, pool.Refresh(context.Background())) + sats, err = db.Satellites().GetSatellites(ctx) + require.NoError(t, err) + require.Equal(t, 2, len(sats)) + + for i := 0; i < len(sats); i++ { + switch sats[i].SatelliteID { + case id1: + require.Equal(t, satellites.Untrusted, sats[i].Status) + case id2: + require.Equal(t, satellites.Normal, sats[i].Status) + default: + t.Fatal("unexpected satellite") + } + } + + expected := []storj.NodeID{id2} + actual := pool.GetSatellites(context.Background()) + assert.ElementsMatch(t, expected, actual) + + // test cases when the untrusted satellite is now trusted + source.entries = []trust.Entry{ + { + SatelliteURL: trust.SatelliteURL{ + ID: id1, + Host: "foo.test", + Port: 7777, + }, + }, + { + SatelliteURL: trust.SatelliteURL{ + ID: id2, + Host: "bar.test", + Port: 7777, + }, + }, + } + + require.NoError(t, pool.Refresh(context.Background())) + sats, err = db.Satellites().GetSatellites(ctx) + require.NoError(t, err) + require.Equal(t, 2, len(sats)) + require.Equal(t, satellites.Normal, sats[0].Status) + require.Equal(t, satellites.Normal, sats[1].Status) + + expected = []storj.NodeID{id1, id2} + actual = pool.GetSatellites(context.Background()) + assert.ElementsMatch(t, expected, actual) + }) +} + func TestPoolGetAddress(t *testing.T) { storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { pool, source, _ := newPoolTest(ctx, t, db) - defer ctx.Cleanup() id := testrand.NodeID()