removes unused queue code, moves queue_test.go to repairqueue_test.go in satellitedb dir (#1783)
This commit is contained in:
parent
8fc5fe1d6f
commit
8d1f614662
@ -6,14 +6,11 @@ package queue
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/storage"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// RepairQueue implements queueing for segments that need repairing.
|
// RepairQueue implements queueing for segments that need repairing.
|
||||||
|
// Implementation can be found at satellite/satellitedb/repairqueue.go.
|
||||||
type RepairQueue interface {
|
type RepairQueue interface {
|
||||||
// Insert adds an injured segment.
|
// Insert adds an injured segment.
|
||||||
Insert(ctx context.Context, s *pb.InjuredSegment) error
|
Insert(ctx context.Context, s *pb.InjuredSegment) error
|
||||||
@ -24,65 +21,3 @@ type RepairQueue interface {
|
|||||||
// SelectN lists limit amount of injured segments.
|
// SelectN lists limit amount of injured segments.
|
||||||
SelectN(ctx context.Context, limit int) ([]pb.InjuredSegment, error)
|
SelectN(ctx context.Context, limit int) ([]pb.InjuredSegment, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Queue implements the RepairQueue interface
|
|
||||||
type Queue struct {
|
|
||||||
db storage.Queue
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewQueue returns a pointer to a new Queue instance with an initialized connection to Redis
|
|
||||||
func NewQueue(client storage.Queue) *Queue {
|
|
||||||
zap.L().Info("Initializing new data repair queue")
|
|
||||||
return &Queue{db: client}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enqueue adds a repair segment to the queue
|
|
||||||
func (q *Queue) Enqueue(ctx context.Context, qi *pb.InjuredSegment) error {
|
|
||||||
val, err := proto.Marshal(qi)
|
|
||||||
if err != nil {
|
|
||||||
return Error.New("error marshalling injured seg %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = q.db.Enqueue(val)
|
|
||||||
if err != nil {
|
|
||||||
return Error.New("error adding injured seg to queue %s", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dequeue returns the next repair segement and removes it from the queue
|
|
||||||
func (q *Queue) Dequeue(ctx context.Context) (pb.InjuredSegment, error) {
|
|
||||||
val, err := q.db.Dequeue()
|
|
||||||
if err != nil {
|
|
||||||
if storage.ErrEmptyQueue.Has(err) {
|
|
||||||
return pb.InjuredSegment{}, err
|
|
||||||
}
|
|
||||||
return pb.InjuredSegment{}, Error.New("error obtaining item from repair queue %s", err)
|
|
||||||
}
|
|
||||||
seg := &pb.InjuredSegment{}
|
|
||||||
err = proto.Unmarshal(val, seg)
|
|
||||||
if err != nil {
|
|
||||||
return pb.InjuredSegment{}, Error.New("error unmarshalling segment %s", err)
|
|
||||||
}
|
|
||||||
return *seg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Peekqueue returns upto 'limit' of the entries from the repair queue
|
|
||||||
func (q *Queue) Peekqueue(ctx context.Context, limit int) ([]pb.InjuredSegment, error) {
|
|
||||||
if limit < 0 || limit > storage.LookupLimit {
|
|
||||||
limit = storage.LookupLimit
|
|
||||||
}
|
|
||||||
result, err := q.db.Peekqueue(limit)
|
|
||||||
if err != nil {
|
|
||||||
return []pb.InjuredSegment{}, Error.New("error peeking into repair queue %s", err)
|
|
||||||
}
|
|
||||||
segs := make([]pb.InjuredSegment, 0)
|
|
||||||
for _, v := range result {
|
|
||||||
seg := &pb.InjuredSegment{}
|
|
||||||
if err = proto.Unmarshal(v, seg); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
segs = append(segs, *seg)
|
|
||||||
}
|
|
||||||
return segs, nil
|
|
||||||
}
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
// Copyright (C) 2019 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package queue_test
|
package satellitedb_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
@ -68,18 +68,6 @@ type KeyValueStore interface {
|
|||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
//Queue is an interface describing queue stores like redis
|
|
||||||
type Queue interface {
|
|
||||||
//Enqueue add a FIFO element
|
|
||||||
Enqueue(Value) error
|
|
||||||
//Dequeue removes a FIFO element, returning ErrEmptyQueue if empty
|
|
||||||
Dequeue() (Value, error)
|
|
||||||
//Peekqueue returns 'limit' elements from the queue
|
|
||||||
Peekqueue(limit int) ([]Value, error)
|
|
||||||
//Close closes the store
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// IterateOptions contains options for iterator
|
// IterateOptions contains options for iterator
|
||||||
type IterateOptions struct {
|
type IterateOptions struct {
|
||||||
// Prefix ensure
|
// Prefix ensure
|
||||||
|
@ -1,67 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package redis
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/go-redis/redis"
|
|
||||||
|
|
||||||
"storj.io/storj/storage"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Queue is the aliased entrypoint into Redis
|
|
||||||
type Queue Client
|
|
||||||
|
|
||||||
const queueKey = "queue"
|
|
||||||
|
|
||||||
// NewQueue returns a configured Client instance, verifying a successful connection to redis
|
|
||||||
func NewQueue(address, password string, db int) (*Queue, error) {
|
|
||||||
queue, err := NewClient(address, password, db)
|
|
||||||
return (*Queue)(queue), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewQueueFrom returns a configured Client instance from a redis address, verifying a successful connection to redis
|
|
||||||
func NewQueueFrom(address string) (*Queue, error) {
|
|
||||||
queue, err := NewClientFrom(address)
|
|
||||||
return (*Queue)(queue), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes a redis client
|
|
||||||
func (client *Queue) Close() error {
|
|
||||||
return client.db.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
//Enqueue add a FIFO element, for the storage.Queue interface
|
|
||||||
func (client *Queue) Enqueue(value storage.Value) error {
|
|
||||||
err := client.db.LPush(queueKey, []byte(value)).Err()
|
|
||||||
if err != nil {
|
|
||||||
return Error.New("enqueue error: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//Dequeue removes a FIFO element, for the storage.Queue interface
|
|
||||||
func (client *Queue) Dequeue() (storage.Value, error) {
|
|
||||||
out, err := client.db.RPop(queueKey).Bytes()
|
|
||||||
if err != nil {
|
|
||||||
if err == redis.Nil {
|
|
||||||
return nil, storage.ErrEmptyQueue.New("")
|
|
||||||
}
|
|
||||||
return nil, Error.New("dequeue error: %v", err)
|
|
||||||
}
|
|
||||||
return storage.Value(out), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Peekqueue returns upto 1000 entries in the queue without removing
|
|
||||||
func (client *Queue) Peekqueue(limit int) ([]storage.Value, error) {
|
|
||||||
cmd := client.db.LRange(queueKey, 0, int64(limit))
|
|
||||||
items, err := cmd.Result()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
result := make([]storage.Value, 0)
|
|
||||||
for _, v := range items {
|
|
||||||
result = append(result, storage.Value([]byte(v)))
|
|
||||||
}
|
|
||||||
return result, err
|
|
||||||
}
|
|
@ -1,26 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package redis
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"storj.io/storj/storage/redis/redisserver"
|
|
||||||
"storj.io/storj/storage/testsuite"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestQueue(t *testing.T) {
|
|
||||||
addr, cleanup, err := redisserver.Start()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
client, err := NewQueue(addr, "", 1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
testsuite.RunQueueTests(t, client)
|
|
||||||
}
|
|
@ -1,41 +0,0 @@
|
|||||||
// Copyright (C) 2019 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package testsuite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"storj.io/storj/storage"
|
|
||||||
)
|
|
||||||
|
|
||||||
// RunQueueTests runs common storage.Queue tests
|
|
||||||
func RunQueueTests(t *testing.T, q storage.Queue) {
|
|
||||||
t.Run("basic", func(t *testing.T) { testBasic(t, q) })
|
|
||||||
}
|
|
||||||
|
|
||||||
func testBasic(t *testing.T, q storage.Queue) {
|
|
||||||
err := q.Enqueue(storage.Value("hello world"))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = q.Enqueue(storage.Value("Привіт, світе"))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
err = q.Enqueue(storage.Value([]byte{0, 0, 0, 0, 255, 255, 255, 255}))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
list, err := q.Peekqueue(100)
|
|
||||||
assert.NotNil(t, list)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
out, err := q.Dequeue()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, out, storage.Value("hello world"))
|
|
||||||
out, err = q.Dequeue()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, out, storage.Value("Привіт, світе"))
|
|
||||||
out, err = q.Dequeue()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, out, storage.Value([]byte{0, 0, 0, 0, 255, 255, 255, 255}))
|
|
||||||
out, err = q.Dequeue()
|
|
||||||
assert.Nil(t, out)
|
|
||||||
assert.Error(t, err)
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user