satellite/gracefulexit: wait for errgroup to return

credit to Yingrong

Change-Id: I538371040d4dcdf6e943c61e8454320fd57b7526
This commit is contained in:
paul cannon 2020-01-23 12:28:32 -06:00 committed by Yingrong Zhao
parent d8e3556a22
commit 8ce9ce7f0f

View File

@ -180,11 +180,20 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
pending := NewPendingMap()
var group errgroup.Group
group.Go(func() error {
incompleteLoop := sync2.NewCycle(endpoint.interval)
defer func() {
err2 := errs2.IgnoreCanceled(group.Wait())
if err2 != nil {
endpoint.log.Error("incompleteLoop gave error", zap.Error(err2))
}
}()
// we cancel this context in all situations where we want to exit the loop
ctx, cancel := context.WithCancel(ctx)
defer cancel()
group.Go(func() error {
incompleteLoop := sync2.NewCycle(endpoint.interval)
loopErr := incompleteLoop.Run(ctx, func(ctx context.Context) error {
if pending.Length() == 0 {
incomplete, err := endpoint.db.GetIncompleteNotFailed(ctx, nodeID, endpoint.config.EndpointBatchSize, 0)
@ -223,6 +232,7 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
for {
finishedPromise := pending.IsFinishedPromise()
finished, err := finishedPromise.Wait(ctx)
err = errs2.IgnoreCanceled(err)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@ -318,12 +328,6 @@ func (endpoint *Endpoint) doProcess(stream processStream) (err error) {
}
}
if err := group.Wait(); err != nil {
if !errs.Is(err, context.Canceled) {
return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
}
}
return nil
}