satellite/overlay: Ignore unnecessary check-ins

This prevents the database from being contacted unnecessarily,
reducing load.

Change-Id: Ib2420f68a20636ec35eb3dd3df8e02bd5341b419
This commit is contained in:
Jeremy Wharton 2021-06-15 11:32:12 -05:00 committed by Ivan Fraixedes
parent 7815e647de
commit 8a070e7c25
6 changed files with 167 additions and 5 deletions

View File

@ -0,0 +1,108 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay_test
import (
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/overlay"
)
// TestCheckIn ensures that redundant node check-ins aren't sent to the database.
// This is verified by comparing the last contact time from the database with
// the time of the unnecessary check-in.
func TestCheckIn(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
wait := sat.Config.Overlay.NodeCheckInWaitPeriod
nodeID := testrand.NodeID()
oldInfo, err := sat.Overlay.Service.Get(ctx, nodeID)
require.Error(t, overlay.ErrNodeNotFound.New("%v", nodeID), err)
require.Nil(t, oldInfo)
nodeInfo := overlay.NodeCheckInInfo{
NodeID: nodeID,
Address: &pb.NodeAddress{Address: "127.0.1.0"},
LastNet: "127.0.1",
LastIPPort: "127.0.1.0:8080",
IsUp: true,
Operator: &pb.NodeOperator{
Wallet: "0x" + strings.Repeat("00", 20),
Email: "abc123@mail.test",
WalletFeatures: []string{},
},
Capacity: &pb.NodeCapacity{},
Version: &pb.NodeVersion{Version: "v1.0.0"},
}
now := time.Now()
lastFail := time.Time{}
// infoCheck sends a node check-in and gets the node's info.
// The last contact timestamp is compared to the expected timestamp.
infoCheck := func(testName string, checkTime time.Time, expectedLastSuccess time.Time, expectedLastFailure time.Time) {
require.NoErrorf(t, sat.Overlay.Service.UpdateCheckIn(ctx, nodeInfo, checkTime), testName)
oldInfo, err := sat.Overlay.Service.Get(ctx, nodeID)
require.NoErrorf(t, err, testName)
require.Equal(t, expectedLastSuccess.Truncate(time.Second).UTC(),
oldInfo.Reputation.LastContactSuccess.Truncate(time.Second).UTC(), testName)
require.Equal(t, expectedLastFailure.Truncate(time.Second).UTC(),
oldInfo.Reputation.LastContactFailure.Truncate(time.Second).UTC(), testName)
}
infoCheck("First check-in", now, now, lastFail)
infoCheck("Within wait period - no information changed", now.Add(wait-time.Minute), now, lastFail)
now = now.Add(wait + time.Minute)
infoCheck("After wait period - no information changed", now, now, lastFail)
now = now.Add(time.Second)
lastFail = now
nodeInfo.IsUp = false
infoCheck("Within wait period - node taken offline", now, now.Add(-time.Second), lastFail)
now = now.Add(time.Second)
nodeInfo.IsUp = true
infoCheck("Within wait period - node back online", now, now, lastFail)
now = now.Add(time.Second)
nodeInfo.Address.Address = "127.0.2.0"
infoCheck("Within wait period - changed: Address", now, now, lastFail)
now = now.Add(time.Second)
nodeInfo.Operator.Wallet = "0x" + strings.Repeat("11", 20)
infoCheck("Within wait period - changed: Wallet", now, now, lastFail)
now = now.Add(time.Second)
nodeInfo.LastNet = "127.0.2"
infoCheck("Within wait period - changed: LastNet", now, now, lastFail)
now = now.Add(time.Second)
nodeInfo.LastIPPort = "127.0.2.0:8080"
infoCheck("Within wait period - changed: LastIPPort", now, now, lastFail)
now = now.Add(time.Second)
nodeInfo.Version.Version = "v2.0.0"
infoCheck("Within wait period - changed: Version", now, now, lastFail)
now = now.Add(time.Second)
nodeInfo.Capacity.FreeDisk = 1
infoCheck("Within wait period - changed: FreeDisk", now, now, lastFail)
})
}

View File

@ -20,10 +20,11 @@ var (
// Config is a configuration for overlay service.
type Config struct {
Node NodeSelectionConfig
NodeSelectionCache UploadSelectionCacheConfig
UpdateStatsBatchSize int `help:"number of update requests to process per transaction" default:"100"`
AuditHistory AuditHistoryConfig
Node NodeSelectionConfig
NodeSelectionCache UploadSelectionCacheConfig
UpdateStatsBatchSize int `help:"number of update requests to process per transaction" default:"100"`
AuditHistory AuditHistoryConfig
NodeCheckInWaitPeriod time.Duration `help:"the amount of time to wait before accepting a redundant check-in from a node (unmodified info since last check-in)" default:"2h" testDefault:"30s"`
}
// AsOfSystemTimeConfig is a configuration struct to enable 'AS OF SYSTEM TIME' for CRDB queries.

View File

@ -38,6 +38,7 @@ func TestMinimumDiskSpace(t *testing.T) {
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.MinimumDiskSpace = 10 * memory.MB
config.Overlay.NodeSelectionCache.Staleness = -time.Hour
config.Overlay.NodeCheckInWaitPeriod = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {

View File

@ -487,7 +487,44 @@ func (service *Service) UpdateNodeInfo(ctx context.Context, node storj.NodeID, n
// UpdateCheckIn updates a single storagenode's check-in info.
func (service *Service) UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
return service.db.UpdateCheckIn(ctx, node, timestamp, service.config.Node)
oldInfo, err := service.Get(ctx, node.NodeID)
if err != nil && !ErrNodeNotFound.Has(err) {
return Error.New("failed to get node info from DB")
}
if oldInfo == nil {
return service.db.UpdateCheckIn(ctx, node, timestamp, service.config.Node)
}
lastUp, lastDown := oldInfo.Reputation.LastContactSuccess, oldInfo.Reputation.LastContactFailure
lastContact := lastUp
if lastContact.Before(lastDown) {
lastContact = lastDown
}
dbStale := lastContact.Add(service.config.NodeCheckInWaitPeriod).Before(timestamp) ||
(node.IsUp && lastUp.Before(lastDown)) || (!node.IsUp && lastDown.Before(lastUp))
addrChanged := ((node.Address == nil) != (oldInfo.Address == nil)) ||
(oldInfo.Address != nil && node.Address != nil && oldInfo.Address.Address != node.Address.Address)
walletChanged := (node.Operator == nil && oldInfo.Operator.Wallet != "") ||
(node.Operator != nil && oldInfo.Operator.Wallet != node.Operator.Wallet)
verChanged := (node.Version == nil && oldInfo.Version.Version != "") ||
(node.Version != nil && oldInfo.Version.Version != node.Version.Version)
spaceChanged := (node.Capacity == nil && oldInfo.Capacity.FreeDisk != 0) ||
(node.Capacity != nil && node.Capacity.FreeDisk != oldInfo.Capacity.FreeDisk)
if dbStale || addrChanged || walletChanged || verChanged || spaceChanged ||
oldInfo.LastNet != node.LastNet || oldInfo.LastIPPort != node.LastIPPort {
return service.db.UpdateCheckIn(ctx, node, timestamp, service.config.Node)
}
service.log.Debug("ignoring unnecessary check-in",
zap.String("node address", node.Address.Address),
zap.Stringer("Node ID", node.NodeID))
return nil
}
// GetMissingPieces returns the list of offline nodes.

View File

@ -484,6 +484,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# The length of time spanning a single audit window
# overlay.audit-history.window-size: 12h0m0s
# the amount of time to wait before accepting a redundant check-in from a node (unmodified info since last check-in)
# overlay.node-check-in-wait-period: 2h0m0s
# disable node cache
# overlay.node-selection-cache.disabled: false

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/identity/testidentity"
@ -17,6 +18,7 @@ import (
"storj.io/common/rpc/rpcpeer"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
)
func TestStoragenodeContactEndpoint(t *testing.T) {
@ -50,6 +52,11 @@ func TestStoragenodeContactEndpoint(t *testing.T) {
func TestNodeInfoUpdated(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.NodeCheckInWaitPeriod = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
node := planet.StorageNodes[0]
@ -81,6 +88,11 @@ func TestNodeInfoUpdated(t *testing.T) {
func TestServicePingSatellites(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.NodeCheckInWaitPeriod = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
node.Contact.Chore.Pause(ctx)