satellite/satellitedb: Do not consider nodes with offline_suspended as reputable.
Nodes that are offline_suspended will no longer be considered for new uploads. The current threshold at which a node enters offline suspension is 0.6. Disqualification for offline suspension is still disabled.

Change-Id: I0da9abf47167dd5bf6bb21e0bc2186e003e38d1a
This commit is contained in:
parent
ca12e98d5d
commit
e24262c2c9
@ -102,6 +102,10 @@ type DB interface {
|
|||||||
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
|
SuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
|
||||||
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
|
// UnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
|
||||||
UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
|
UnsuspendNodeUnknownAudit(ctx context.Context, nodeID storj.NodeID) (err error)
|
||||||
|
// SuspendNodeOfflineAudit suspends a storage node for offline audits.
|
||||||
|
SuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error)
|
||||||
|
// UnsuspendNodeOfflineAudit unsuspends a storage node for offline audits.
|
||||||
|
UnsuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID) (err error)
|
||||||
|
|
||||||
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier
|
// TestVetNode directly sets a node's vetted_at timestamp to make testing easier
|
||||||
TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error)
|
TestVetNode(ctx context.Context, nodeID storj.NodeID) (vettedTime *time.Time, err error)
|
||||||
|
@ -463,11 +463,15 @@ func TestKnownReliable(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, service.IsOnline(node))
|
require.False(t, service.IsOnline(node))
|
||||||
|
|
||||||
// Suspend storage node #2
|
// Suspend storage node #2 for unknown audits
|
||||||
err = satellite.DB.OverlayCache().SuspendNodeUnknownAudit(ctx, planet.StorageNodes[2].ID(), time.Now())
|
err = satellite.DB.OverlayCache().SuspendNodeUnknownAudit(ctx, planet.StorageNodes[2].ID(), time.Now())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Check that only storage nodes #3 and #4 are reliable
|
// Suspend storage node #3 for offline audits
|
||||||
|
err = satellite.DB.OverlayCache().SuspendNodeOfflineAudit(ctx, planet.StorageNodes[3].ID(), time.Now())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Check that only storage node #4 is reliable
|
||||||
result, err := service.KnownReliable(ctx, []storj.NodeID{
|
result, err := service.KnownReliable(ctx, []storj.NodeID{
|
||||||
planet.StorageNodes[0].ID(),
|
planet.StorageNodes[0].ID(),
|
||||||
planet.StorageNodes[1].ID(),
|
planet.StorageNodes[1].ID(),
|
||||||
@ -476,11 +480,10 @@ func TestKnownReliable(t *testing.T) {
|
|||||||
planet.StorageNodes[4].ID(),
|
planet.StorageNodes[4].ID(),
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, result, 2)
|
require.Len(t, result, 1)
|
||||||
|
|
||||||
// Sort the storage nodes for predictable checks
|
// Sort the storage nodes for predictable checks
|
||||||
expectedReliable := []storj.NodeURL{
|
expectedReliable := []storj.NodeURL{
|
||||||
planet.StorageNodes[3].NodeURL(),
|
|
||||||
planet.StorageNodes[4].NodeURL(),
|
planet.StorageNodes[4].NodeURL(),
|
||||||
}
|
}
|
||||||
sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) })
|
sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].ID.Less(expectedReliable[j].ID) })
|
||||||
|
@ -181,6 +181,7 @@ func nodeSelectionCondition(ctx context.Context, criteria *overlay.NodeCriteria,
|
|||||||
var conds conditions
|
var conds conditions
|
||||||
conds.add(`disqualified IS NULL`)
|
conds.add(`disqualified IS NULL`)
|
||||||
conds.add(`unknown_audit_suspended IS NULL`)
|
conds.add(`unknown_audit_suspended IS NULL`)
|
||||||
|
conds.add(`offline_suspended IS NULL`)
|
||||||
conds.add(`exit_initiated_at IS NULL`)
|
conds.add(`exit_initiated_at IS NULL`)
|
||||||
|
|
||||||
conds.add(`type = ?`, int(pb.NodeType_STORAGE))
|
conds.add(`type = ?`, int(pb.NodeType_STORAGE))
|
||||||
|
@ -63,6 +63,7 @@ func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, sele
|
|||||||
FROM nodes ` + asOf + `
|
FROM nodes ` + asOf + `
|
||||||
WHERE disqualified IS NULL
|
WHERE disqualified IS NULL
|
||||||
AND unknown_audit_suspended IS NULL
|
AND unknown_audit_suspended IS NULL
|
||||||
|
AND offline_suspended IS NULL
|
||||||
AND exit_initiated_at IS NULL
|
AND exit_initiated_at IS NULL
|
||||||
AND type = $1
|
AND type = $1
|
||||||
AND free_disk >= $2
|
AND free_disk >= $2
|
||||||
@ -313,6 +314,7 @@ func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteri
|
|||||||
WHERE id = any($1::bytea[])
|
WHERE id = any($1::bytea[])
|
||||||
AND disqualified IS NULL
|
AND disqualified IS NULL
|
||||||
AND unknown_audit_suspended IS NULL
|
AND unknown_audit_suspended IS NULL
|
||||||
|
AND offline_suspended IS NULL
|
||||||
AND exit_finished_at IS NULL
|
AND exit_finished_at IS NULL
|
||||||
AND last_contact_success > $2
|
AND last_contact_success > $2
|
||||||
`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-criteria.OnlineWindow),
|
`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-criteria.OnlineWindow),
|
||||||
@ -369,6 +371,7 @@ func (cache *overlaycache) knownReliable(ctx context.Context, onlineWindow time.
|
|||||||
WHERE id = any($1::bytea[])
|
WHERE id = any($1::bytea[])
|
||||||
AND disqualified IS NULL
|
AND disqualified IS NULL
|
||||||
AND unknown_audit_suspended IS NULL
|
AND unknown_audit_suspended IS NULL
|
||||||
|
AND offline_suspended IS NULL
|
||||||
AND exit_finished_at IS NULL
|
AND exit_finished_at IS NULL
|
||||||
AND last_contact_success > $2
|
AND last_contact_success > $2
|
||||||
`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
|
`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
|
||||||
@ -417,6 +420,7 @@ func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeC
|
|||||||
SELECT id FROM nodes `+asOf+`
|
SELECT id FROM nodes `+asOf+`
|
||||||
WHERE disqualified IS NULL
|
WHERE disqualified IS NULL
|
||||||
AND unknown_audit_suspended IS NULL
|
AND unknown_audit_suspended IS NULL
|
||||||
|
AND offline_suspended IS NULL
|
||||||
AND exit_finished_at IS NULL
|
AND exit_finished_at IS NULL
|
||||||
AND last_contact_success > ?
|
AND last_contact_success > ?
|
||||||
`), time.Now().Add(-criteria.OnlineWindow))
|
`), time.Now().Add(-criteria.OnlineWindow))
|
||||||
@ -746,6 +750,38 @@ func (cache *overlaycache) UnsuspendNodeUnknownAudit(ctx context.Context, nodeID
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SuspendNodeOfflineAudit suspends a storage node for offline audits.
//
// It sets the node's OfflineSuspended field to suspendedAt (converted to UTC)
// by updating the node row identified by nodeID. The update error is returned
// as-is; if the update yields no node, an error naming nodeID is returned.
|
||||||
|
func (cache *overlaycache) SuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID, suspendedAt time.Time) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
updateFields := dbx.Node_Update_Fields{}
|
||||||
|
updateFields.OfflineSuspended = dbx.Node_OfflineSuspended(suspendedAt.UTC())
|
||||||
|
|
||||||
|
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// No error but no node returned: report it as a failed lookup for nodeID.
if dbNode == nil {
|
||||||
|
return errs.New("unable to get node by ID: %v", nodeID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnsuspendNodeOfflineAudit unsuspends a storage node for offline audits.
//
// It resets the node's OfflineSuspended field to its null value by updating
// the node row identified by nodeID. The update error is returned as-is; if
// the update yields no node, an error naming nodeID is returned.
|
||||||
|
func (cache *overlaycache) UnsuspendNodeOfflineAudit(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||||||
|
defer mon.Task()(&ctx)(&err)
|
||||||
|
updateFields := dbx.Node_Update_Fields{}
|
||||||
|
updateFields.OfflineSuspended = dbx.Node_OfflineSuspended_Null()
|
||||||
|
|
||||||
|
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// No error but no node returned: report it as a failed lookup for nodeID.
if dbNode == nil {
|
||||||
|
return errs.New("unable to get node by ID: %v", nodeID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// AllPieceCounts returns a map of node IDs to piece counts from the db.
|
// AllPieceCounts returns a map of node IDs to piece counts from the db.
|
||||||
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
|
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
|
||||||
func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int, err error) {
|
func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int, err error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user