From ed0fa59f23f8b4353868ecc298d5053f3d7f2967 Mon Sep 17 00:00:00 2001 From: paul cannon Date: Mon, 21 Nov 2022 18:10:27 -0600 Subject: [PATCH] satellite/overlay: add SetNodeContained() method SetNodeContained() will change the contained flag in the nodes table, which will affect whether nodes are selected for new uploads. This flag _should_ correlate with whether or not a given node has any entries in the reverification queue. However, the reverification queue is intended to be 'safely partitionable' from the nodes table, so we can't enforce that characteristic transactionally. But this is ok; there are no dire consequences if they are out of sync. We will be adding a chore that updates the contained flag based on the contents of the reverification queue periodically, if something fails to set it directly when appropriate. Refs: https://github.com/storj/storj/issues/5231 Change-Id: I26460d8718dee63fd55d00a44568b2065fc8fe30 --- satellite/overlay/service.go | 13 ++++++ satellite/satellitedb/overlaycache.go | 28 +++++++++++++ satellite/satellitedb/overlaycache_test.go | 48 ++++++++++++++++++++++ 3 files changed, 89 insertions(+) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index c84b4c228..6c005c0fd 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -76,6 +76,8 @@ type DB interface { UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *InfoResponse) (stats *NodeDossier, err error) // UpdateCheckIn updates a single storagenode's check-in stats. UpdateCheckIn(ctx context.Context, node NodeCheckInInfo, timestamp time.Time, config NodeSelectionConfig) (err error) + // SetNodeContained updates the contained field for the node record. + SetNodeContained(ctx context.Context, node storj.NodeID, contained bool) (err error) // AllPieceCounts returns a map of node IDs to piece counts from the db. AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) @@ -603,6 +605,17 @@ func (service *Service) UpdateNodeInfo(ctx context.Context, node storj.NodeID, n return service.db.UpdateNodeInfo(ctx, node, nodeInfo) } +// SetNodeContained updates the contained field for the node record. If +// `contained` is true, the contained field in the record is set to the current +// database time, if it is not already set. If `contained` is false, the +// contained field in the record is set to NULL. All other fields are left +// alone. +func (service *Service) SetNodeContained(ctx context.Context, node storj.NodeID, contained bool) (err error) { + defer mon.Task()(&ctx)(&err) + + return service.db.SetNodeContained(ctx, node, contained) +} + // UpdateCheckIn updates a single storagenode's check-in info if needed. /* The check-in info is updated in the database if: diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 9c510e80a..7cb6cdc1a 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -1137,6 +1137,9 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier, if info.CountryCode != nil { node.CountryCode = location.ToCountryCode(*info.CountryCode) } + if info.Contained != nil { + node.Contained = true + } return node, nil } @@ -1426,6 +1429,31 @@ func (cache *overlaycache) UpdateCheckIn(ctx context.Context, node overlay.NodeC return nil } +// SetNodeContained updates the contained field for the node record. If +// `contained` is true, the contained field in the record is set to the current +// database time, if it is not already set. If `contained` is false, the +// contained field in the record is set to NULL. All other fields are left +// alone. +func (cache *overlaycache) SetNodeContained(ctx context.Context, nodeID storj.NodeID, contained bool) (err error) { + defer mon.Task()(&ctx)(&err) + + var query string + if contained { + // only update the timestamp if it's not already set + query = ` + UPDATE nodes SET contained = current_timestamp + WHERE id = $1 AND contained IS NULL + ` + } else { + query = ` + UPDATE nodes SET contained = NULL + WHERE id = $1 + ` + } + _, err = cache.db.DB.ExecContext(ctx, query, nodeID[:]) + return Error.Wrap(err) +} + var ( // ErrVetting is the error class for the following test methods. ErrVetting = errs.Class("vetting") diff --git a/satellite/satellitedb/overlaycache_test.go b/satellite/satellitedb/overlaycache_test.go index 09ac69a80..1417bc668 100644 --- a/satellite/satellitedb/overlaycache_test.go +++ b/satellite/satellitedb/overlaycache_test.go @@ -12,6 +12,7 @@ import ( "storj.io/common/pb" "storj.io/common/storj" "storj.io/common/testcontext" + "storj.io/common/testrand" "storj.io/storj/private/teststorj" "storj.io/storj/satellite" "storj.io/storj/satellite/overlay" @@ -136,3 +137,50 @@ func TestUpdateLastOfflineEmail(t *testing.T) { require.Equal(t, now.Truncate(time.Second), node1.LastOfflineEmail.Truncate(time.Second)) }) } + +func TestSetNodeContained(t *testing.T) { + satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { + cache := db.OverlayCache() + + nodeID := testrand.NodeID() + checkInInfo := overlay.NodeCheckInInfo{ + IsUp: true, + Address: &pb.NodeAddress{ + Address: "1.2.3.4", + }, + Version: &pb.NodeVersion{ + Version: "v0.0.0", + CommitHash: "", + Timestamp: time.Time{}, + Release: false, + }, + Operator: &pb.NodeOperator{ + Email: "offline@storj.test", + }, + } + + now := time.Now() + + // offline node should be selected + checkInInfo.NodeID = nodeID + require.NoError(t, cache.UpdateCheckIn(ctx, checkInInfo, now.Add(-24*time.Hour), overlay.NodeSelectionConfig{})) + + cacheInfo, err := cache.Get(ctx, nodeID) + require.NoError(t, err) + require.False(t, cacheInfo.Contained) + + err = cache.SetNodeContained(ctx, nodeID, true) + require.NoError(t, err) + + cacheInfo, err = cache.Get(ctx, nodeID) + require.NoError(t, err) + require.True(t, cacheInfo.Contained) + + err = cache.SetNodeContained(ctx, nodeID, false) + require.NoError(t, err) + + cacheInfo, err = cache.Get(ctx, nodeID) + require.NoError(t, err) + require.False(t, cacheInfo.Contained) + }) +}