From e24262c2c9d7b2d48897d0de767f8012d95c444c Mon Sep 17 00:00:00 2001 From: Moby von Briesen Date: Fri, 4 Dec 2020 17:21:07 -0500 Subject: [PATCH] satellite/satellitedb: Do not consider nodes with offline_suspended as reputable. Nodes which are offline_suspended will no longer be considered for new uploads. The current threshold that enters a node into offline suspension is 0.6. Disqualification for offline suspension is still disabled. Change-Id: I0da9abf47167dd5bf6bb21e0bc2186e003e38d1a --- satellite/overlay/service.go | 4 +++ satellite/overlay/service_test.go | 11 +++++--- satellite/satellitedb/nodeselection.go | 1 + satellite/satellitedb/overlaycache.go | 36 ++++++++++++++++++++++++++ 4 files changed, 48 insertions(+), 4 deletions(-) diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 772110802..2e096d964 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -102,6 +102,10 @@ type DB interface { SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) // UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits. UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error) + // SuspendNodeOfflineAudit suspends a storage node for offline audits. + SuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) + // UnsuspendNodeOfflineAudit unsuspends a storage node for offline audits. + UnsuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID) (err error) // TestVetNode directly sets a node's vetted_at timestamp to make testing easier TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error) diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 902054d24..44ccfc0b1 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -463,11 +463,15 @@ func TestKnownReliable(t *testing.T) { require.NoError(t, err) require.False(t, service.IsOnline(node)) - // Suspend storage node #2 + // Suspend storage node #2 for unknown audits err = satellite.DB.OverlayCache().SuspendNodeUnknownAudit(ctx, planet.StorageNodes[2].ID(), time.Now()) require.NoError(t, err) - // Check that only storage nodes #3 and #4 are reliable + // Suspend storage node #3 for offline audits + err = satellite.DB.OverlayCache().SuspendNodeOfflineAudit(ctx, planet.StorageNodes[3].ID(), time.Now()) + require.NoError(t, err) + + // Check that only storage nodes #4 is reliable result, err := service.KnownReliable(ctx, []storj.NodeID{ planet.StorageNodes[0].ID(), planet.StorageNodes[1].ID(), @@ -476,11 +480,10 @@ func TestKnownReliable(t *testing.T) { planet.StorageNodes[4].ID(), }) require.NoError(t, err) - require.Len(t, result, 2) + require.Len(t, result, 1) // Sort the storage nodes for predictable checks expectedReliable := []storj.NodeURL{ - planet.StorageNodes[3].NodeURL(), planet.StorageNodes[4].NodeURL(), } sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) }) diff --git a/satellite/satellitedb/nodeselection.go b/satellite/satellitedb/nodeselection.go index 6c506c400..60bab44e6 100644 --- a/satellite/satellitedb/nodeselection.go +++ b/satellite/satellitedb/nodeselection.go @@ -181,6 +181,7 @@ func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria, var conds conditions conds.add(`disqualified IS NULL`) conds.add(`unknown_audit_suspended IS NULL`) + conds.add(`offline_suspended IS NULL`) conds.add(`exit_initiated_at IS NULL`) conds.add(`type = ?`, int(pb.NodeType_STORAGE)) diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index 6ce1a3edd..ea22a2349 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -63,6 +63,7 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele FROM nodes ` + asOf + ` WHERE disqualified IS NULL AND unknown_audit_suspended IS NULL + AND offline_suspended IS NULL AND exit_initiated_at IS NULL AND type = $1 AND free_disk >= $2 @@ -313,6 +314,7 @@ func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteri WHERE id = any($1::bytea[]) AND disqualified IS NULL AND unknown_audit_suspended IS NULL + AND offline_suspended IS NULL AND exit_finished_at IS NULL AND last_contact_success > $2 `), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-criteria.OnlineWindow), @@ -369,6 +371,7 @@ func (cache *overlaycache) knownReliable(ctx context.Context, onlineWindow time. WHERE id = any($1::bytea[]) AND disqualified IS NULL AND unknown_audit_suspended IS NULL + AND offline_suspended IS NULL AND exit_finished_at IS NULL AND last_contact_success > $2 `), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow), @@ -417,6 +420,7 @@ func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeC SELECT id FROM nodes `+asOf+` WHERE disqualified IS NULL AND unknown_audit_suspended IS NULL + AND offline_suspended IS NULL AND exit_finished_at IS NULL AND last_contact_success > ? `), time.Now().Add(-criteria.OnlineWindow)) @@ -746,6 +750,38 @@ func (cache *overlaycache) UnsuspendNodeUnknownAudit(ctx context.Context, nodeID return nil } +// SuspendNodeOfflineAudit suspends a storage node for offline audits. +func (cache *overlaycache) SuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) { + defer mon.Task()(&ctx)(&err) + updateFields := dbx.Node_Update_Fields{} + updateFields.OfflineSuspended = dbx.Node_OfflineSuspended(suspendedAt.UTC()) + + dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields) + if err != nil { + return err + } + if dbNode == nil { + return errs.New("unable to get node by ID: %v", nodeID) + } + return nil +} + +// UnsuspendNodeOfflineAudit unsuspends a storage node for offline audits. +func (cache *overlaycache) UnsuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID) (err error) { + defer mon.Task()(&ctx)(&err) + updateFields := dbx.Node_Update_Fields{} + updateFields.OfflineSuspended = dbx.Node_OfflineSuspended_Null() + + dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields) + if err != nil { + return err + } + if dbNode == nil { + return errs.New("unable to get node by ID: %v", nodeID) + } + return nil +} + // AllPieceCounts returns a map of node IDs to piece counts from the db. // NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned. func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int, err error) {