satellite/overlay: insert DQ event into node events in overlay.DisqualifyNode
Also, return node email from overlaycache db DisqualifyNode to be used in node events insertion Change-Id: I41534cf01351c1690c3966a8055c5fe6fcf0d6a6
This commit is contained in:
parent
f42e5049c9
commit
74ddfab810
@ -376,7 +376,7 @@ func dqNodes(ctx *testcontext.Context, overlayDB overlay.DB, storageNodes []stor
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err := overlayDB.DisqualifyNode(ctx, n, time.Now().UTC(), overlay.DisqualificationReasonUnknown)
|
_, err := overlayDB.DisqualifyNode(ctx, n, time.Now().UTC(), overlay.DisqualificationReasonUnknown)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -407,7 +407,7 @@ func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
|
|||||||
satellite := planet.Satellites[0]
|
satellite := planet.Satellites[0]
|
||||||
exitingNode := planet.StorageNodes[0]
|
exitingNode := planet.StorageNodes[0]
|
||||||
|
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL())
|
conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL())
|
||||||
@ -450,7 +450,7 @@ func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !isDisqualified {
|
if !isDisqualified {
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -267,7 +267,7 @@ func BenchmarkNodeSelection(b *testing.B) {
|
|||||||
err := overlaydb.TestSuspendNodeUnknownAudit(ctx, nodeID, now)
|
err := overlaydb.TestSuspendNodeUnknownAudit(ctx, nodeID, now)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
case 1:
|
case 1:
|
||||||
err := overlaydb.DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := overlaydb.DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
case 2:
|
case 2:
|
||||||
err := overlaydb.UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
|
err := overlaydb.UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
|
||||||
|
@ -226,7 +226,7 @@ func TestDBDisqualifyNode(t *testing.T) {
|
|||||||
err := overlayDB.UpdateCheckIn(ctx, checkIn, now, overlay.NodeSelectionConfig{})
|
err := overlayDB.UpdateCheckIn(ctx, checkIn, now, overlay.NodeSelectionConfig{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = overlayDB.DisqualifyNode(ctx, testcase.NodeID, testcase.DisqualifiedAt, testcase.Reason)
|
_, err = overlayDB.DisqualifyNode(ctx, testcase.NodeID, testcase.DisqualifiedAt, testcase.Reason)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
info, err := overlayDB.Get(ctx, testcase.NodeID)
|
info, err := overlayDB.Get(ctx, testcase.NodeID)
|
||||||
|
@ -131,7 +131,7 @@ func TestDownloadSelectionCache_GetNodes(t *testing.T) {
|
|||||||
require.Len(t, nodes, nodeCount)
|
require.Len(t, nodes, nodeCount)
|
||||||
|
|
||||||
// disqualify one node
|
// disqualify one node
|
||||||
err = db.OverlayCache().DisqualifyNode(ctx, ids[0], time.Now(), overlay.DisqualificationReasonAuditFailure)
|
_, err = db.OverlayCache().DisqualifyNode(ctx, ids[0], time.Now(), overlay.DisqualificationReasonAuditFailure)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// suspend the other node
|
// suspend the other node
|
||||||
err = db.OverlayCache().TestSuspendNodeUnknownAudit(ctx, ids[1], time.Now())
|
err = db.OverlayCache().TestSuspendNodeUnknownAudit(ctx, ids[1], time.Now())
|
||||||
|
@ -96,7 +96,7 @@ type DB interface {
|
|||||||
GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error)
|
GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error)
|
||||||
|
|
||||||
// DisqualifyNode disqualifies a storage node.
|
// DisqualifyNode disqualifies a storage node.
|
||||||
DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason DisqualificationReason) (err error)
|
DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason DisqualificationReason) (email string, err error)
|
||||||
|
|
||||||
// DQNodesLastSeenBefore disqualifies a limited number of nodes where last_contact_success < cutoff except those already disqualified
|
// DQNodesLastSeenBefore disqualifies a limited number of nodes where last_contact_success < cutoff except those already disqualified
|
||||||
// or gracefully exited or where last_contact_success = '0001-01-01 00:00:00+00'.
|
// or gracefully exited or where last_contact_success = '0001-01-01 00:00:00+00'.
|
||||||
@ -703,7 +703,17 @@ func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context
|
|||||||
// DisqualifyNode disqualifies a storage node.
|
// DisqualifyNode disqualifies a storage node.
|
||||||
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, reason DisqualificationReason) (err error) {
|
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, reason DisqualificationReason) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
return service.db.DisqualifyNode(ctx, nodeID, time.Now().UTC(), reason)
|
email, err := service.db.DisqualifyNode(ctx, nodeID, time.Now().UTC(), reason)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if service.config.SendNodeEmails {
|
||||||
|
_, err = service.nodeEvents.Insert(ctx, email, nodeID, nodeevents.Disqualified)
|
||||||
|
if err != nil {
|
||||||
|
service.log.Error("could not insert node disqualified into node events")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectAllStorageNodesDownload returns a nodes that are ready for downloading.
|
// SelectAllStorageNodesDownload returns a nodes that are ready for downloading.
|
||||||
|
@ -454,7 +454,7 @@ func TestKnownReliable(t *testing.T) {
|
|||||||
oc := satellite.DB.OverlayCache()
|
oc := satellite.DB.OverlayCache()
|
||||||
|
|
||||||
// Disqualify storage node #0
|
// Disqualify storage node #0
|
||||||
err := oc.DisqualifyNode(ctx, planet.StorageNodes[0].ID(), time.Now().UTC(), overlay.DisqualificationReasonUnknown)
|
_, err := oc.DisqualifyNode(ctx, planet.StorageNodes[0].ID(), time.Now().UTC(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Stop storage node #1
|
// Stop storage node #1
|
||||||
@ -954,6 +954,28 @@ func TestUpdateReputationNodeEvents(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDisqualifyNodeEmails(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||||
|
Reconfigure: testplanet.Reconfigure{
|
||||||
|
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||||
|
config.Overlay.SendNodeEmails = true
|
||||||
|
config.Overlay.Node.OnlineWindow = 4 * time.Hour
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
service := planet.Satellites[0].Overlay.Service
|
||||||
|
node := planet.StorageNodes[0]
|
||||||
|
node.Contact.Chore.Pause(ctx)
|
||||||
|
|
||||||
|
require.NoError(t, service.DisqualifyNode(ctx, node.ID(), overlay.DisqualificationReasonUnknown))
|
||||||
|
|
||||||
|
ne, err := planet.Satellites[0].DB.NodeEvents().GetLatestByEmailAndEvent(ctx, node.Config.Operator.Email, nodeevents.Disqualified)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, node.ID(), ne.NodeID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestUpdateCheckInNodeEventOnline(t *testing.T) {
|
func TestUpdateCheckInNodeEventOnline(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
||||||
|
@ -76,7 +76,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if tt.disqualified {
|
if tt.disqualified {
|
||||||
err = cache.DisqualifyNode(ctx, tt.nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err = cache.DisqualifyNode(ctx, tt.nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
if tt.offline {
|
if tt.offline {
|
||||||
|
@ -140,7 +140,7 @@ func testDataRepair(t *testing.T, inMemoryRepair bool, hashAlgo pb.PieceHashAlgo
|
|||||||
|
|
||||||
for _, node := range planet.StorageNodes {
|
for _, node := range planet.StorageNodes {
|
||||||
if nodesToDisqualify[node.ID()] {
|
if nodesToDisqualify[node.ID()] {
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -307,7 +307,7 @@ func TestDataRepairPendingObject(t *testing.T) {
|
|||||||
|
|
||||||
for _, node := range planet.StorageNodes {
|
for _, node := range planet.StorageNodes {
|
||||||
if nodesToDisqualify[node.ID()] {
|
if nodesToDisqualify[node.ID()] {
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -1335,7 +1335,7 @@ func TestRepairExpiredSegment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for nodeID := range nodesToDQ {
|
for nodeID := range nodesToDQ {
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1420,7 +1420,7 @@ func TestRemoveDeletedSegmentFromQueue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for nodeID := range nodesToDQ {
|
for nodeID := range nodesToDQ {
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1683,7 +1683,7 @@ func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
|
|||||||
remotePieces := segment.Pieces
|
remotePieces := segment.Pieces
|
||||||
|
|
||||||
for i := 0; i < toDQ; i++ {
|
for i := 0; i < toDQ; i++ {
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1695,7 +1695,7 @@ func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
|
|||||||
// Disqualify nodes so that online nodes < minimum threshold
|
// Disqualify nodes so that online nodes < minimum threshold
|
||||||
// This will make the segment irreparable
|
// This will make the segment irreparable
|
||||||
for _, piece := range remotePieces {
|
for _, piece := range remotePieces {
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, piece.StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, piece.StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1876,7 +1876,7 @@ func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
|
|||||||
// disqualify and suspend nodes
|
// disqualify and suspend nodes
|
||||||
for i := 0; i < toDisqualify; i++ {
|
for i := 0; i < toDisqualify; i++ {
|
||||||
nodesToDisqualify[remotePieces[i].StorageNode] = true
|
nodesToDisqualify[remotePieces[i].StorageNode] = true
|
||||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
_, err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
for i := toDisqualify; i < toDisqualify+toSuspend; i++ {
|
for i := toDisqualify; i < toDisqualify+toSuspend; i++ {
|
||||||
|
@ -701,7 +701,7 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DisqualifyNode disqualifies a storage node.
|
// DisqualifyNode disqualifies a storage node.
|
||||||
func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error) {
|
func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (email string, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
updateFields := dbx.Node_Update_Fields{}
|
updateFields := dbx.Node_Update_Fields{}
|
||||||
updateFields.Disqualified = dbx.Node_Disqualified(disqualifiedAt.UTC())
|
updateFields.Disqualified = dbx.Node_Disqualified(disqualifiedAt.UTC())
|
||||||
@ -709,12 +709,12 @@ func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.Node
|
|||||||
|
|
||||||
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
if dbNode == nil {
|
if dbNode == nil {
|
||||||
return errs.New("unable to get node by ID: %v", nodeID)
|
return "", errs.New("unable to get node by ID: %v", nodeID)
|
||||||
}
|
}
|
||||||
return nil
|
return dbNode.Email, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestSuspendNodeUnknownAudit suspends a storage node for unknown audits.
|
// TestSuspendNodeUnknownAudit suspends a storage node for unknown audits.
|
||||||
|
Loading…
Reference in New Issue
Block a user