satellite/overlay: add SetNodeContained() method

SetNodeContained() will change the contained flag in the nodes table,
which will affect whether nodes are selected for new uploads. This flag
_should_ correlate with whether or not a given node has any entries in
the reverification queue. However, the reverification queue is intended
to be 'safely partitionable' from the nodes table, so we can't enforce
that characteristic transactionally. But this is ok; there are no dire
consequences if they are out of sync.

We will be adding a chore that updates the contained flag based on the
contents of the reverification queue periodically, if something fails
to set it directly when appropriate.

Refs: https://github.com/storj/storj/issues/5231
Change-Id: I26460d8718dee63fd55d00a44568b2065fc8fe30
This commit is contained in:
paul cannon 2022-11-21 18:10:27 -06:00 committed by Storj Robot
parent fba39b72b8
commit ed0fa59f23
3 changed files with 89 additions and 0 deletions

View File

@ -76,6 +76,8 @@ type DB interface {
UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *InfoResponse) (stats *NodeDossier, err error)
// UpdateCheckIn updates a single storagenode's check-in stats.
UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time, config NodeSelectionConfig) (err error)
// SetNodeContained updates the contained field for the node record.
SetNodeContained(ctx context.Context, node storj.NodeID, contained bool) (err error)
// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error)
@ -603,6 +605,17 @@ func (service *Service) UpdateNodeInfo(ctx context.Context, node storj.NodeID, n
return service.db.UpdateNodeInfo(ctx, node, nodeInfo)
}
// SetNodeContained updates the contained field for the node record. If
// `contained` is true, the contained field in the record is set to the current
// database time, if it is not already set. If `contained` is false, the
// contained field in the record is set to NULL. All other fields are left
// alone.
func (service *Service) SetNodeContained(ctx context.Context, node storj.NodeID, contained bool) (err error) {
defer mon.Task()(&ctx)(&err)
return service.db.SetNodeContained(ctx, node, contained)
}
// UpdateCheckIn updates a single storagenode's check-in info if needed.
/*
The check-in info is updated in the database if:

View File

@ -1137,6 +1137,9 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
if info.CountryCode != nil {
node.CountryCode = location.ToCountryCode(*info.CountryCode)
}
if info.Contained != nil {
node.Contained = true
}
return node, nil
}
@ -1426,6 +1429,31 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC
return nil
}
// SetNodeContained updates the contained field for the node record. If
// `contained` is true, the contained field in the record is set to the current
// database time, if it is not already set. If `contained` is false, the
// contained field in the record is set to NULL. All other fields are left
// alone.
func (cache *overlaycache) SetNodeContained(ctx context.Context, nodeID storj.NodeID, contained bool) (err error) {
defer mon.Task()(&ctx)(&err)
var query string
if contained {
// only update the timestamp if it's not already set
query = `
UPDATE nodes SET contained = current_timestamp
WHERE id = $1 AND contained IS NULL
`
} else {
query = `
UPDATE nodes SET contained = NULL
WHERE id = $1
`
}
_, err = cache.db.DB.ExecContext(ctx, query, nodeID[:])
return Error.Wrap(err)
}
var (
// ErrVetting is the error class for the following test methods.
ErrVetting = errs.Class("vetting")

View File

@ -12,6 +12,7 @@ import (
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/teststorj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
@ -136,3 +137,50 @@ func TestUpdateLastOfflineEmail(t *testing.T) {
require.Equal(t, now.Truncate(time.Second), node1.LastOfflineEmail.Truncate(time.Second))
})
}
func TestSetNodeContained(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
cache := db.OverlayCache()
nodeID := testrand.NodeID()
checkInInfo := overlay.NodeCheckInInfo{
IsUp: true,
Address: &pb.NodeAddress{
Address: "1.2.3.4",
},
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
Operator: &pb.NodeOperator{
Email: "offline@storj.test",
},
}
now := time.Now()
// offline node should be selected
checkInInfo.NodeID = nodeID
require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now.Add(-24*time.Hour), overlay.NodeSelectionConfig{}))
cacheInfo, err := cache.Get(ctx, nodeID)
require.NoError(t, err)
require.False(t, cacheInfo.Contained)
err = cache.SetNodeContained(ctx, nodeID, true)
require.NoError(t, err)
cacheInfo, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
require.True(t, cacheInfo.Contained)
err = cache.SetNodeContained(ctx, nodeID, false)
require.NoError(t, err)
cacheInfo, err = cache.Get(ctx, nodeID)
require.NoError(t, err)
require.False(t, cacheInfo.Contained)
})
}