diff --git a/satellite/nodeevents/chore.go b/satellite/nodeevents/chore.go
index 35b3648f2..039e3bbf0 100644
--- a/satellite/nodeevents/chore.go
+++ b/satellite/nodeevents/chore.go
@@ -96,6 +96,8 @@ func (chore *Chore) process(ctx context.Context) (n int, err error) {
 	}
 
 	if err = chore.notifier.Notify(ctx, chore.satellite, batch); err != nil {
+		// Record the failed attempt so GetNextBatch orders this batch after events that have not been tried yet.
+		err = errs.Combine(err, chore.db.UpdateLastAttempted(ctx, rowIDs, chore.nowFn()))
 		return 0, err
 	}
 
diff --git a/satellite/nodeevents/chore_test.go b/satellite/nodeevents/chore_test.go
index f6a03e175..9de884ca7 100644
--- a/satellite/nodeevents/chore_test.go
+++ b/satellite/nodeevents/chore_test.go
@@ -9,9 +9,11 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"github.com/zeebo/errs"
 	"go.uber.org/zap"
 
 	"storj.io/common/testcontext"
+	"storj.io/common/uuid"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite"
 	"storj.io/storj/satellite/nodeevents"
@@ -34,6 +36,22 @@ func (tn *TestNotifier) Notify(ctx context.Context, satellite string, events []n
 	return nil
 }
 
+// ErrorNotifier is a test notifier whose Notify always fails. It counts the failed calls and remembers the ID of the first event of the last batch it saw.
+type ErrorNotifier struct {
+	errCount int
+	errID    uuid.UUID
+}
+
+// Notify always returns an error. For a non-empty batch it first increments errCount and stores the ID of events[0] in errID.
+func (errN *ErrorNotifier) Notify(ctx context.Context, satellite string, events []nodeevents.NodeEvent) error {
+	if len(events) == 0 {
+		return errs.New("notify called with no events")
+	}
+	errN.errCount++
+	errN.errID = events[0].ID
+	return errs.New("test error")
+}
+
 func TestNodeEventsChore(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
@@ -99,3 +117,42 @@ func TestNodeEventsChore(t *testing.T) {
 		require.True(t, foundEvent2)
 	})
 }
+
+func TestNodeEventsChoreFailedNotify(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Overlay.SendNodeEmails = true
+				config.NodeEvents.SelectionWaitPeriod = 5 * time.Minute
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		sat := planet.Satellites[0]
+		node0 := planet.StorageNodes[0]
+
+		chore := sat.NodeEvents.Chore
+		chore.Loop.Pause()
+
+		errN := &ErrorNotifier{}
+		chore.SetNotifier(errN)
+
+		// DQ the node. Should create a node event in the nodeevents DB.
+		require.NoError(t, sat.Overlay.Service.DisqualifyNode(ctx, node0.ID(), overlay.DisqualificationReasonUnknown))
+
+		// Now, set nowFn on chore to 5 minutes in the future to test that chore does notify for the events.
+		futureTime := func() time.Time {
+			return time.Now().Add(5 * time.Minute)
+		}
+		chore.SetNow(futureTime)
+
+		// Trigger chore and check that error occurred, that last_attempted has been updated, and email_sent is null
+		chore.Loop.TriggerWait()
+		require.Equal(t, 1, errN.errCount)
+
+		event, err := sat.DB.NodeEvents().GetByID(ctx, errN.errID)
+		require.NoError(t, err)
+		require.NotNil(t, event.LastAttempted)
+		require.Nil(t, event.EmailSent)
+	})
+}
diff --git a/satellite/satellitedb/nodeevents.go b/satellite/satellitedb/nodeevents.go
index 4593a2007..c22749925 100644
--- a/satellite/satellitedb/nodeevents.go
+++ b/satellite/satellitedb/nodeevents.go
@@ -82,7 +82,7 @@ func (ne *nodeEvents) GetNextBatch(ctx context.Context, firstSeenBefore time.Tim
 			FROM node_events
 			WHERE created_at < $1
 			AND email_sent is NULL
-			ORDER BY created_at ASC
+			ORDER BY last_attempted ASC NULLS FIRST, created_at ASC
 			LIMIT 1
 		) as t
 		ON node_events.email = t.email
diff --git a/satellite/satellitedb/nodeevents_test.go b/satellite/satellitedb/nodeevents_test.go
index 46a2707fa..2010fe2b5 100644
--- a/satellite/satellitedb/nodeevents_test.go
+++ b/satellite/satellitedb/nodeevents_test.go
@@ -194,49 +194,49 @@ func TestNodeEventsGetNextBatch(t *testing.T) {
 
 func TestNodeEventsGetNextBatchSelectionOrder(t *testing.T) {
 	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
-		testID1 := teststorj.NodeIDFromString("test1")
-		testID2 := teststorj.NodeIDFromString("test2")
-		testEmail1 := "test1@storj.test"
-		testEmail2 := "test2@storj.test"
+		id0 := teststorj.NodeIDFromString("test0")
+		id1 := teststorj.NodeIDFromString("test1")
+		id2 := teststorj.NodeIDFromString("test2")
+		id3 := teststorj.NodeIDFromString("test3")
+
+		email0 := "test0@storj.test"
+		email1 := "test1@storj.test"
+		email2 := "test2@storj.test"
+		email3 := "test3@storj.test"
 		eventType := nodeevents.Disqualified
 
-		event1, err := db.NodeEvents().Insert(ctx, testEmail1, testID1, eventType)
-		require.NoError(t, err)
-
-		// insert one event with same email and event type, but with different node ID. It should be selected.
-		event2, err := db.NodeEvents().Insert(ctx, testEmail1, testID2, eventType)
-		require.NoError(t, err)
-
-		// insert one event with same email and event type, but email_sent is not null. Should not be selected.
-		event3, err := db.NodeEvents().Insert(ctx, testEmail1, testID1, eventType)
-		require.NoError(t, err)
-
-		require.NoError(t, db.NodeEvents().UpdateEmailSent(ctx, []uuid.UUID{event3.ID}, time.Now()))
-
-		// insert one event with same email, but different type. Should not be selected.
-		_, err = db.NodeEvents().Insert(ctx, testEmail1, testID1, nodeevents.BelowMinVersion)
-		require.NoError(t, err)
-
-		// insert one event with same event type, but different email. Should not be selected.
-		_, err = db.NodeEvents().Insert(ctx, testEmail2, testID1, eventType)
-		require.NoError(t, err)
-
-		batch, err := db.NodeEvents().GetNextBatch(ctx, time.Now())
-		require.NoError(t, err)
-		require.Len(t, batch, 2)
-
-		var foundEvent1, foundEvent2 bool
-		for _, ne := range batch {
-			switch ne.NodeID {
-			case event1.NodeID:
-				foundEvent1 = true
-			case event2.NodeID:
-				foundEvent2 = true
-			default:
-			}
+		// GetNextBatch orders by last_attempted asc (nulls first), then created_at asc
+		// expected selection order:
+		// 1. insert0: last_attempted = nil, created_at = earliest
+		// 2. insert3: last_attempted = nil, created_at = 3rd earliest
+		// 3. insert2: last_attempted != nil, created_at = 4th earliest
+		// 4. insert1: last_attempted later than insert2, created_at = 2nd earliest
+		expectedOrder := []string{
+			email0, email3, email2, email1,
+		}
+
+		_, err := db.NodeEvents().Insert(ctx, email0, id0, eventType)
+		require.NoError(t, err)
+
+		insert1, err := db.NodeEvents().Insert(ctx, email1, id1, eventType)
+		require.NoError(t, err)
+		require.NoError(t, db.NodeEvents().UpdateLastAttempted(ctx, []uuid.UUID{insert1.ID}, time.Now().Add(5*time.Minute)))
+
+		insert2, err := db.NodeEvents().Insert(ctx, email2, id2, eventType)
+		require.NoError(t, err)
+		require.NoError(t, db.NodeEvents().UpdateLastAttempted(ctx, []uuid.UUID{insert2.ID}, time.Now()))
+
+		_, err = db.NodeEvents().Insert(ctx, email3, id3, eventType)
+		require.NoError(t, err)
+
+		for i := 0; i < 4; i++ {
+			e, err := db.NodeEvents().GetNextBatch(ctx, time.Now())
+			require.NoError(t, err)
+			require.NotEmpty(t, e)
+			require.Equal(t, expectedOrder[i], e[0].Email)
+
+			require.NoError(t, db.NodeEvents().UpdateEmailSent(ctx, []uuid.UUID{e[0].ID}, time.Now()))
 		}
-		require.True(t, foundEvent1)
-		require.True(t, foundEvent2)
 	})
 }
 