From b712fbcbb04b61da2d8e7107457742dcb8446e9f Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Wed, 2 Jan 2019 17:00:32 +0100 Subject: [PATCH] Fix 'empty queue' error when satellite starts (#939) --- pkg/datarepair/queue/queue.go | 2 +- pkg/datarepair/repairer/repairer.go | 2 +- satellite/satellitedb/repairqueue.go | 2 +- storage/common.go | 2 +- storage/redis/client_queue.go | 2 +- storage/testqueue/queue.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/datarepair/queue/queue.go b/pkg/datarepair/queue/queue.go index 34ccd0bbd..bf93f86b8 100644 --- a/pkg/datarepair/queue/queue.go +++ b/pkg/datarepair/queue/queue.go @@ -49,7 +49,7 @@ func (q *Queue) Enqueue(ctx context.Context, qi *pb.InjuredSegment) error { func (q *Queue) Dequeue(ctx context.Context) (pb.InjuredSegment, error) { val, err := q.db.Dequeue() if err != nil { - if err == storage.ErrEmptyQueue { + if storage.ErrEmptyQueue.Has(err) { return pb.InjuredSegment{}, err } return pb.InjuredSegment{}, Error.New("error obtaining item from repair queue %s", err) diff --git a/pkg/datarepair/repairer/repairer.go b/pkg/datarepair/repairer/repairer.go index a4d86262d..09b897f07 100644 --- a/pkg/datarepair/repairer/repairer.go +++ b/pkg/datarepair/repairer/repairer.go @@ -62,7 +62,7 @@ func (service *repairService) Run(ctx context.Context) (err error) { func (service *repairService) process(ctx context.Context) error { seg, err := service.queue.Dequeue(ctx) if err != nil { - if err == storage.ErrEmptyQueue { + if storage.ErrEmptyQueue.Has(err) { return nil } return err diff --git a/satellite/satellitedb/repairqueue.go b/satellite/satellitedb/repairqueue.go index f1a814adc..351e60b8d 100644 --- a/satellite/satellitedb/repairqueue.go +++ b/satellite/satellitedb/repairqueue.go @@ -42,7 +42,7 @@ func (r *repairQueue) Dequeue(ctx context.Context) (pb.InjuredSegment, error) { if err != nil { return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(err, tx.Rollback())) } else if res == nil { - return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(storage.ErrEmptyQueue, tx.Rollback())) + return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(storage.ErrEmptyQueue.New(""), tx.Rollback())) } deleted, err := tx.Delete_Injuredsegment_By_Id( diff --git a/storage/common.go b/storage/common.go index eec96fe3b..96b0c19b0 100644 --- a/storage/common.go +++ b/storage/common.go @@ -20,7 +20,7 @@ var ErrKeyNotFound = errs.Class("key not found") var ErrEmptyKey = errs.Class("empty key") // ErrEmptyQueue is returned when attempting to Dequeue from an empty queue -var ErrEmptyQueue = errors.New("empty queue") +var ErrEmptyQueue = errs.Class("empty queue") // ErrLimitExceeded is returned when request limit is exceeded var ErrLimitExceeded = errors.New("limit exceeded") diff --git a/storage/redis/client_queue.go b/storage/redis/client_queue.go index 5e20b3d76..e20efb370 100644 --- a/storage/redis/client_queue.go +++ b/storage/redis/client_queue.go @@ -45,7 +45,7 @@ func (client *Queue) Dequeue() (storage.Value, error) { out, err := client.db.RPop(queueKey).Bytes() if err != nil { if err == redis.Nil { - return nil, storage.ErrEmptyQueue + return nil, storage.ErrEmptyQueue.New("") } return nil, Error.New("dequeue error: %v", err) } diff --git a/storage/testqueue/queue.go b/storage/testqueue/queue.go index 008ab9f3a..1e47e0661 100644 --- a/storage/testqueue/queue.go +++ b/storage/testqueue/queue.go @@ -38,7 +38,7 @@ func (q *Queue) Dequeue() (storage.Value, error) { q.s.Remove(e) // Dequeue return e.Value.(storage.Value), nil } - return nil, storage.ErrEmptyQueue + return nil, storage.ErrEmptyQueue.New("") } //Peekqueue gets upto 'limit' entries from the list queue