satellite/nodeevents: take last_attempted into account when selecting
Previously, the node events chore selected the next batch solely by the earliest created_at. If that batch failed for any reason, it remained the next batch to be selected, so a persistent error left the chore stuck retrying the same batch over and over. GetNextBatch now orders by `last_attempted ASC NULLS FIRST, created_at ASC`. When a batch fails during Notify, its last_attempted is updated so the chore can move on to a different batch if one exists. Change-Id: Ia8458e05ac358d85b2f2c6d690f3d607d631be61
This commit is contained in:
parent
76c22fa572
commit
08c9d745f1
@ -96,6 +96,7 @@ func (chore *Chore) process(ctx context.Context) (n int, err error) {
|
||||
}
|
||||
|
||||
if err = chore.notifier.Notify(ctx, chore.satellite, batch); err != nil {
|
||||
err = errs.Combine(err, chore.db.UpdateLastAttempted(ctx, rowIDs, chore.nowFn()))
|
||||
return 0, err
|
||||
}
|
||||
|
||||
|
@ -9,9 +9,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/nodeevents"
|
||||
@ -34,6 +36,20 @@ func (tn *TestNotifier) Notify(ctx context.Context, satellite string, events []n
|
||||
return nil
|
||||
}
|
||||
|
||||
type ErrorNotifier struct {
|
||||
errCount int
|
||||
errID uuid.UUID
|
||||
}
|
||||
|
||||
func (errN *ErrorNotifier) Notify(ctx context.Context, satellite string, events []nodeevents.NodeEvent) error {
|
||||
if len(events) == 0 {
|
||||
return errs.New("This shouldn't happen")
|
||||
}
|
||||
errN.errCount++
|
||||
errN.errID = events[0].ID
|
||||
return errs.New("test error")
|
||||
}
|
||||
|
||||
func TestNodeEventsChore(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
||||
@ -99,3 +115,42 @@ func TestNodeEventsChore(t *testing.T) {
|
||||
require.True(t, foundEvent2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestNodeEventsChoreFailedNotify(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||
Reconfigure: testplanet.Reconfigure{
|
||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
||||
config.Overlay.SendNodeEmails = true
|
||||
config.NodeEvents.SelectionWaitPeriod = 5 * time.Minute
|
||||
},
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
node0 := planet.StorageNodes[0]
|
||||
|
||||
chore := sat.NodeEvents.Chore
|
||||
chore.Loop.Pause()
|
||||
|
||||
errN := &ErrorNotifier{}
|
||||
chore.SetNotifier(errN)
|
||||
|
||||
// DQ nodes. Should create a node events in nodeevents DB.
|
||||
require.NoError(t, sat.Overlay.Service.DisqualifyNode(ctx, node0.ID(), overlay.DisqualificationReasonUnknown))
|
||||
|
||||
// Now, set nowFn on chore to 5 minutes in the future to test that chore does notify for the events.
|
||||
futureTime := func() time.Time {
|
||||
return time.Now().Add(5 * time.Minute)
|
||||
}
|
||||
chore.SetNow(futureTime)
|
||||
|
||||
// Trigger chore and check that error occurred, that last_attempted has been updated, and email_sent is null
|
||||
chore.Loop.TriggerWait()
|
||||
require.Equal(t, 1, errN.errCount)
|
||||
|
||||
event, err := sat.DB.NodeEvents().GetByID(ctx, errN.errID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, event.LastAttempted)
|
||||
require.Nil(t, event.EmailSent)
|
||||
})
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ func (ne *nodeEvents) GetNextBatch(ctx context.Context, firstSeenBefore time.Tim
|
||||
FROM node_events
|
||||
WHERE created_at < $1
|
||||
AND email_sent is NULL
|
||||
ORDER BY created_at ASC
|
||||
ORDER BY last_attempted ASC NULLS FIRST, created_at ASC
|
||||
LIMIT 1
|
||||
) as t
|
||||
ON node_events.email = t.email
|
||||
|
@ -194,49 +194,50 @@ func TestNodeEventsGetNextBatch(t *testing.T) {
|
||||
|
||||
func TestNodeEventsGetNextBatchSelectionOrder(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
testID1 := teststorj.NodeIDFromString("test1")
|
||||
testID2 := teststorj.NodeIDFromString("test2")
|
||||
testEmail1 := "test1@storj.test"
|
||||
testEmail2 := "test2@storj.test"
|
||||
id0 := teststorj.NodeIDFromString("test0")
|
||||
id1 := teststorj.NodeIDFromString("test1")
|
||||
id2 := teststorj.NodeIDFromString("test2")
|
||||
id3 := teststorj.NodeIDFromString("test3")
|
||||
|
||||
email0 := "test0@storj.test"
|
||||
email1 := "test1@storj.test"
|
||||
email2 := "test2@storj.test"
|
||||
email3 := "test3@storj.test"
|
||||
|
||||
eventType := nodeevents.Disqualified
|
||||
|
||||
event1, err := db.NodeEvents().Insert(ctx, testEmail1, testID1, eventType)
|
||||
require.NoError(t, err)
|
||||
|
||||
// insert one event with same email and event type, but with different node ID. It should be selected.
|
||||
event2, err := db.NodeEvents().Insert(ctx, testEmail1, testID2, eventType)
|
||||
require.NoError(t, err)
|
||||
|
||||
// insert one event with same email and event type, but email_sent is not null. Should not be selected.
|
||||
event3, err := db.NodeEvents().Insert(ctx, testEmail1, testID1, eventType)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, db.NodeEvents().UpdateEmailSent(ctx, []uuid.UUID{event3.ID}, time.Now()))
|
||||
|
||||
// insert one event with same email, but different type. Should not be selected.
|
||||
_, err = db.NodeEvents().Insert(ctx, testEmail1, testID1, nodeevents.BelowMinVersion)
|
||||
require.NoError(t, err)
|
||||
|
||||
// insert one event with same event type, but different email. Should not be selected.
|
||||
_, err = db.NodeEvents().Insert(ctx, testEmail2, testID1, eventType)
|
||||
require.NoError(t, err)
|
||||
|
||||
batch, err := db.NodeEvents().GetNextBatch(ctx, time.Now())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, batch, 2)
|
||||
|
||||
var foundEvent1, foundEvent2 bool
|
||||
for _, ne := range batch {
|
||||
switch ne.NodeID {
|
||||
case event1.NodeID:
|
||||
foundEvent1 = true
|
||||
case event2.NodeID:
|
||||
foundEvent2 = true
|
||||
default:
|
||||
}
|
||||
// GetNextBatch orders by last_attempted ASC NULLS FIRST, then created_at ASC
|
||||
// expected selection order:
|
||||
// 1. insert0: last_attempted = nil, created_at = earliest
|
||||
// 2. insert3: last_attempted = nil, created_at = 4th earliest
|
||||
// 3. insert2: last_attempted != nil, created_at = 3rd earliest
|
||||
// 4. insert1: last_attempted later than insert2, created_at = 2nd earliest
|
||||
expectedOrder := []string{
|
||||
email0, email3, email2, email1,
|
||||
}
|
||||
|
||||
_, err := db.NodeEvents().Insert(ctx, email0, id0, eventType)
|
||||
require.NoError(t, err)
|
||||
|
||||
insert1, err := db.NodeEvents().Insert(ctx, email1, id1, eventType)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.NodeEvents().UpdateLastAttempted(ctx, []uuid.UUID{insert1.ID}, time.Now().Add(5*time.Minute)))
|
||||
|
||||
insert2, err := db.NodeEvents().Insert(ctx, email2, id2, eventType)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.NodeEvents().UpdateLastAttempted(ctx, []uuid.UUID{insert2.ID}, time.Now()))
|
||||
|
||||
_, err = db.NodeEvents().Insert(ctx, email3, id3, eventType)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
e, err := db.NodeEvents().GetNextBatch(ctx, time.Now())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedOrder[i], e[0].Email)
|
||||
|
||||
require.NoError(t, db.NodeEvents().UpdateEmailSent(ctx, []uuid.UUID{e[0].ID}, time.Now()))
|
||||
}
|
||||
require.True(t, foundEvent1)
|
||||
require.True(t, foundEvent2)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user