satellite/contact; storagenode/preflight: add clock check on startup for storagenode
add config preflight.enabled-local-time Change-Id: I7b942c9bee063aae409ee6721ae9d079dff0144f
This commit is contained in:
parent
07c2824d94
commit
db8aee0806
@ -30,6 +30,7 @@ import (
|
||||
"storj.io/storj/storagenode/nodestats"
|
||||
"storj.io/storj/storagenode/orders"
|
||||
"storj.io/storj/storagenode/piecestore"
|
||||
"storj.io/storj/storagenode/preflight"
|
||||
"storj.io/storj/storagenode/retain"
|
||||
"storj.io/storj/storagenode/storagenodedb"
|
||||
"storj.io/storj/storagenode/trust"
|
||||
@ -83,6 +84,9 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
|
||||
},
|
||||
},
|
||||
},
|
||||
Preflight: preflight.Config{
|
||||
EnabledLocalTime: false,
|
||||
},
|
||||
Operator: storagenode.OperatorConfig{
|
||||
Email: prefix + "@mail.test",
|
||||
Wallet: "0x" + strings.Repeat("00", 20),
|
||||
|
@ -101,5 +101,16 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
|
||||
// GetTime returns current timestamp
|
||||
func (endpoint *Endpoint) GetTime(ctx context.Context, req *pb.GetTimeRequest) (_ *pb.GetTimeResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return nil, rpcstatus.Error(rpcstatus.Unimplemented, "not implemented")
|
||||
|
||||
peerID, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
endpoint.log.Info("failed to get node ID from context", zap.Error(err))
|
||||
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, errCheckInIdentity.New("failed to get ID from context: %v", err).Error())
|
||||
}
|
||||
|
||||
currentTimestamp := time.Now().UTC()
|
||||
endpoint.log.Debug("get system current time", zap.Stringer("timestamp", currentTimestamp), zap.Stringer("node id", peerID.ID))
|
||||
return &pb.GetTimeResponse{
|
||||
Timestamp: currentTimestamp,
|
||||
}, nil
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ import (
|
||||
"storj.io/storj/storagenode/orders"
|
||||
"storj.io/storj/storagenode/pieces"
|
||||
"storj.io/storj/storagenode/piecestore"
|
||||
"storj.io/storj/storagenode/preflight"
|
||||
"storj.io/storj/storagenode/reputation"
|
||||
"storj.io/storj/storagenode/retain"
|
||||
"storj.io/storj/storagenode/satellites"
|
||||
@ -81,8 +82,9 @@ type Config struct {
|
||||
|
||||
Server server.Config
|
||||
|
||||
Contact contact.Config
|
||||
Operator OperatorConfig
|
||||
Preflight preflight.Config
|
||||
Contact contact.Config
|
||||
Operator OperatorConfig
|
||||
|
||||
// TODO: flatten storage config and only keep the new one
|
||||
Storage piecestore.OldConfig
|
||||
@ -125,6 +127,10 @@ type Peer struct {
|
||||
// services and endpoints
|
||||
// TODO: similar grouping to satellite.Core
|
||||
|
||||
Preflight struct {
|
||||
LocalTime *preflight.LocalTime
|
||||
}
|
||||
|
||||
Contact struct {
|
||||
Service *contact.Service
|
||||
Chore *contact.Chore
|
||||
@ -213,6 +219,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
peer.Preflight.LocalTime = preflight.NewLocalTime(peer.Log.Named("preflight:localtime"), config.Preflight, peer.Storage2.Trust, peer.Dialer)
|
||||
}
|
||||
|
||||
{ // setup notification service.
|
||||
peer.Notifications.Service = notifications.NewService(peer.Log, peer.DB.Notifications())
|
||||
}
|
||||
@ -441,6 +451,11 @@ func (peer *Peer) Run(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := peer.Preflight.LocalTime.Check(ctx); err != nil {
|
||||
peer.Log.Fatal("failed preflight check", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
group.Go(func() error {
|
||||
|
15
storagenode/preflight/common.go
Normal file
15
storagenode/preflight/common.go
Normal file
@ -0,0 +1,15 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package preflight
|
||||
|
||||
import (
|
||||
"gopkg.in/spacemonkeygo/monkit.v2"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// Config for graceful exit
|
||||
type Config struct {
|
||||
EnabledLocalTime bool `help:"whether or not preflight check for local system clock is enabled on the satellite side. When disabling this feature, your storagenode may not setup correctly." default:"true"`
|
||||
}
|
140
storagenode/preflight/localtime.go
Normal file
140
storagenode/preflight/localtime.go
Normal file
@ -0,0 +1,140 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package preflight
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/rpc"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/storagenode/trust"
|
||||
)
|
||||
|
||||
// ErrClockOutOfSyncMinor is the error class for system clock is off by more than 1h
|
||||
var ErrClockOutOfSyncMinor = errs.Class("system clock is off")
|
||||
|
||||
// ErrClockOutOfSyncMajor is the error class for system clock is out of sync by more than 24h
|
||||
var ErrClockOutOfSyncMajor = errs.Class("system clock is out of sync")
|
||||
|
||||
// LocalTime checks local system clock against all trusted satellites.
|
||||
type LocalTime struct {
|
||||
log *zap.Logger
|
||||
config Config
|
||||
trust *trust.Pool
|
||||
dialer rpc.Dialer
|
||||
}
|
||||
|
||||
// NewLocalTime creates a new localtime instance.
|
||||
func NewLocalTime(log *zap.Logger, config Config, trust *trust.Pool, dialer rpc.Dialer) *LocalTime {
|
||||
return &LocalTime{
|
||||
log: log,
|
||||
config: config,
|
||||
trust: trust,
|
||||
dialer: dialer,
|
||||
}
|
||||
}
|
||||
|
||||
// Check compares local system clock with all trusted satellites' system clock.
|
||||
// it returns an error when local system clock is out of sync by more than 24h with all trusted satellites' clock.
|
||||
func (localTime *LocalTime) Check(ctx context.Context) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if !localTime.config.EnabledLocalTime {
|
||||
localTime.log.Debug("local system clock check is not enabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
localTime.log.Info("start checking local system clock with trusted satellites' system clock.")
|
||||
|
||||
group, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// get trusted satellites
|
||||
satellites := localTime.trust.GetSatellites(ctx)
|
||||
results := make([]error, len(satellites))
|
||||
for i, satellite := range satellites {
|
||||
i := i
|
||||
satellite := satellite
|
||||
group.Go(func() error {
|
||||
// get a current timestamp
|
||||
currentLocalTime := time.Now().UTC()
|
||||
satelliteTime, err := localTime.getSatelliteTime(ctx, satellite)
|
||||
if err != nil {
|
||||
localTime.log.Error("unable to get satellite system time", zap.Stringer("Satellite ID", satellite), zap.Error(err))
|
||||
results[i] = ErrClockOutOfSyncMajor.Wrap(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
err = localTime.checkSatelliteTime(ctx, satelliteTime.GetTimestamp(), currentLocalTime)
|
||||
if err != nil {
|
||||
localTime.log.Error("system clock is out of sync with satellite", zap.Stringer("Satellite ID", satellite), zap.Error(err))
|
||||
if ErrClockOutOfSyncMinor.Has(err) {
|
||||
return nil
|
||||
}
|
||||
results[i] = err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
_ = group.Wait()
|
||||
|
||||
errsCounter := 0
|
||||
for _, result := range results {
|
||||
if ErrClockOutOfSyncMajor.Has(result) {
|
||||
errsCounter++
|
||||
}
|
||||
}
|
||||
if errsCounter == len(satellites) {
|
||||
return ErrClockOutOfSyncMajor.New("system clock is out of sync with all trusted satellites")
|
||||
}
|
||||
|
||||
localTime.log.Info("local system clock is in sync with trusted satellites' system clock.")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (localTime *LocalTime) getSatelliteTime(ctx context.Context, satelliteID storj.NodeID) (_ *pb.GetTimeResponse, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
address, err := localTime.trust.GetAddress(ctx, satelliteID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := localTime.dialer.DialAddressID(ctx, address, satelliteID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, conn.Close())
|
||||
}()
|
||||
|
||||
resp, err := pb.NewDRPCNodeClient(conn.Raw()).GetTime(ctx, &pb.GetTimeRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (localTime *LocalTime) checkSatelliteTime(ctx context.Context, satelliteTime time.Time, systemTime time.Time) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
diff := math.Abs(satelliteTime.Sub(systemTime).Hours())
|
||||
// check to see if the timestamp received from satellites are off by more than 24h
|
||||
if diff > 24 {
|
||||
return ErrClockOutOfSyncMajor.New("clock off by %f", diff)
|
||||
}
|
||||
// check to see if the timestamp received from satellites are off by more than 1h
|
||||
if diff > 1 {
|
||||
return ErrClockOutOfSyncMinor.New("clock off by %f", diff)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
182
storagenode/preflight/localtime_test.go
Normal file
182
storagenode/preflight/localtime_test.go
Normal file
@ -0,0 +1,182 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package preflight_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/peertls/tlsopts"
|
||||
"storj.io/common/rpc"
|
||||
"storj.io/storj/pkg/server"
|
||||
|
||||
"storj.io/common/identity/testidentity"
|
||||
"storj.io/common/peertls/extensions"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/storagenode/preflight"
|
||||
"storj.io/storj/storagenode/trust"
|
||||
)
|
||||
|
||||
type mockServer struct {
|
||||
localTime time.Time
|
||||
pb.NodeServer
|
||||
}
|
||||
|
||||
func TestLocalTime_InSync(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
storagenode := planet.StorageNodes[0]
|
||||
err := storagenode.Preflight.LocalTime.Check(ctx)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLocalTime_OutOfSync(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
// set up mock satellite server configuration
|
||||
mockSatID, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
config := server.Config{
|
||||
Address: "127.0.0.1:0",
|
||||
PrivateAddress: "127.0.0.1:0",
|
||||
|
||||
Config: tlsopts.Config{
|
||||
PeerIDVersions: "*",
|
||||
Extensions: extensions.Config{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
mockSatTLSOptions, err := tlsopts.NewOptions(mockSatID, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("Less than 24h", func(t *testing.T) {
|
||||
// register mock GetTime endpoint to mock server
|
||||
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress, nil)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := contactServer.Close()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
|
||||
localTime: time.Now().UTC().Add(-2 * time.Hour),
|
||||
})
|
||||
|
||||
go func() {
|
||||
err := contactServer.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
// get mock server address
|
||||
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
|
||||
require.NoError(t, err)
|
||||
port, err := strconv.Atoi(portStr)
|
||||
require.NoError(t, err)
|
||||
url := trust.SatelliteURL{
|
||||
ID: mockSatID.ID,
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// set up storagenode client
|
||||
source, err := trust.NewStaticURLSource(url.String())
|
||||
require.NoError(t, err)
|
||||
|
||||
identity, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
|
||||
Sources: []trust.Source{source},
|
||||
CachePath: ctx.File("trust-cache.json"),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = pool.Refresh(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// should not return any error when node's clock is off no more than 24
|
||||
localtime := preflight.NewLocalTime(log, preflight.Config{
|
||||
EnabledLocalTime: true,
|
||||
}, pool, dialer)
|
||||
err = localtime.Check(ctx)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("More than 24h", func(t *testing.T) {
|
||||
// register mock GetTime endpoint to mock server
|
||||
contactServer, err := server.New(log, mockSatTLSOptions, config.Address, config.PrivateAddress, nil)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := contactServer.Close()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
pb.DRPCRegisterNode(contactServer.DRPC(), &mockServer{
|
||||
localTime: time.Now().UTC().Add(-25 * time.Hour),
|
||||
})
|
||||
|
||||
go func() {
|
||||
err := contactServer.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
// get mock server address
|
||||
_, portStr, err := net.SplitHostPort(contactServer.Addr().String())
|
||||
require.NoError(t, err)
|
||||
port, err := strconv.Atoi(portStr)
|
||||
require.NoError(t, err)
|
||||
url := trust.SatelliteURL{
|
||||
ID: mockSatID.ID,
|
||||
Host: "127.0.0.1",
|
||||
Port: port,
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// set up storagenode client
|
||||
source, err := trust.NewStaticURLSource(url.String())
|
||||
require.NoError(t, err)
|
||||
|
||||
identity, err := testidentity.NewTestIdentity(ctx)
|
||||
require.NoError(t, err)
|
||||
tlsOptions, err := tlsopts.NewOptions(identity, config.Config, nil)
|
||||
require.NoError(t, err)
|
||||
dialer := rpc.NewDefaultDialer(tlsOptions)
|
||||
pool, err := trust.NewPool(log, trust.Dialer(dialer), trust.Config{
|
||||
Sources: []trust.Source{source},
|
||||
CachePath: ctx.File("trust-cache.json"),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = pool.Refresh(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// should return an error when node's clock is off by more than 24h with all trusted satellites
|
||||
localtime := preflight.NewLocalTime(log, preflight.Config{
|
||||
EnabledLocalTime: true,
|
||||
}, pool, dialer)
|
||||
err = localtime.Check(ctx)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func (mock *mockServer) GetTime(ctx context.Context, req *pb.GetTimeRequest) (*pb.GetTimeResponse, error) {
|
||||
return &pb.GetTimeResponse{
|
||||
Timestamp: mock.localTime,
|
||||
}, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user