pkg structure and repair queue enqueue/dequeue (#397)
* pkg structure and repair queue implementation * adds zeebo * gets redis working with queue * modifies interface * changes re feedback * pr changes w encoding and enqueue dequeue modifications * test force error * concurrent enqueue/dequeue * refactor sequential to use only 1 slice * added token for time conflicts
This commit is contained in:
parent
ab64aa0b97
commit
54996e1edb
2
go.mod
2
go.mod
@ -21,7 +21,7 @@ require (
|
||||
github.com/eapache/go-resiliency v1.1.0 // indirect
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
|
||||
github.com/eapache/queue v1.1.0 // indirect
|
||||
github.com/eclipse/paho.mqtt.golang v1.1.1 // indirect
|
||||
github.com/eclipse/paho.mqtt.golang v1.1.1
|
||||
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
|
||||
github.com/fatih/color v1.7.0 // indirect
|
||||
github.com/fatih/structs v1.0.0 // indirect
|
||||
|
@ -1,42 +0,0 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package datarepair
|
||||
|
||||
import "storj.io/storj/pkg/pb"
|
||||
|
||||
//Queue implements the RepairQueue interface
|
||||
type Queue struct {
|
||||
//redis db of repair segments
|
||||
//offline nodes?
|
||||
}
|
||||
|
||||
//NewQueue creates a new data repair queue
|
||||
func NewQueue() {
|
||||
|
||||
}
|
||||
|
||||
//Add adds a repair segment to the queue
|
||||
func (q Queue) Add(qi *pb.InjuredSegment) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
//AddAll adds a slice of repair segements to the queue
|
||||
func (q Queue) AddAll(qis []*pb.InjuredSegment) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
//Remove removes a repair segment from the queue
|
||||
func (q Queue) Remove(qi *pb.InjuredSegment) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
//GetNext returns the next repair segement from the queue
|
||||
func (q Queue) GetNext() pb.InjuredSegment {
|
||||
return pb.InjuredSegment{}
|
||||
}
|
||||
|
||||
//GetSize returns the number of repair segements are in the queue
|
||||
func (q Queue) GetSize() int {
|
||||
return 0
|
||||
}
|
91
pkg/datarepair/queue/queue.go
Normal file
91
pkg/datarepair/queue/queue.go
Normal file
@ -0,0 +1,91 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
// RepairQueue is the interface for the data repair queue
|
||||
type RepairQueue interface {
|
||||
Enqueue(qi *pb.InjuredSegment) error
|
||||
Dequeue() (pb.InjuredSegment, error)
|
||||
}
|
||||
|
||||
// Queue implements the RepairQueue interface
|
||||
type Queue struct {
|
||||
mu sync.Mutex
|
||||
db storage.KeyValueStore
|
||||
}
|
||||
|
||||
var (
|
||||
queueError = errs.Class("data repair queue error")
|
||||
)
|
||||
|
||||
// NewQueue returns a pointer to a new Queue instance with an initialized connection to Redis
|
||||
func NewQueue(client storage.KeyValueStore) *Queue {
|
||||
return &Queue{
|
||||
mu: sync.Mutex{},
|
||||
db: client,
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue adds a repair segment to the queue
|
||||
func (q *Queue) Enqueue(qi *pb.InjuredSegment) error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
dateTime := make([]byte, binary.MaxVarintLen64)
|
||||
// TODO: this can cause conflicts when time is unstable or running on multiple computers
|
||||
// Append random 4 byte token to account for time conflicts
|
||||
binary.BigEndian.PutUint64(dateTime, uint64(time.Now().UnixNano()))
|
||||
const tokenSize = 4
|
||||
token := make([]byte, tokenSize)
|
||||
rand.Read(token)
|
||||
dateTime = append(dateTime, token...)
|
||||
val, err := proto.Marshal(qi)
|
||||
if err != nil {
|
||||
return queueError.New("error marshalling injured seg %s", err)
|
||||
}
|
||||
err = q.db.Put(dateTime, val)
|
||||
if err != nil {
|
||||
return queueError.New("error adding injured seg to queue %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Dequeue returns the next repair segement and removes it from the queue
|
||||
func (q *Queue) Dequeue() (pb.InjuredSegment, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
items, _, err := storage.ListV2(q.db, storage.ListOptions{IncludeValue: true, Limit: 1, Recursive: true})
|
||||
if err != nil {
|
||||
return pb.InjuredSegment{}, queueError.New("error getting first key %s", err)
|
||||
}
|
||||
if len(items) == 0 {
|
||||
return pb.InjuredSegment{}, queueError.New("empty database")
|
||||
}
|
||||
key := items[0].Key
|
||||
val := items[0].Value
|
||||
|
||||
seg := &pb.InjuredSegment{}
|
||||
err = proto.Unmarshal(val, seg)
|
||||
if err != nil {
|
||||
return pb.InjuredSegment{}, queueError.New("error unmarshalling segment %s", err)
|
||||
}
|
||||
err = q.db.Delete(key)
|
||||
if err != nil {
|
||||
return *seg, queueError.New("error removing injured seg %s", err)
|
||||
}
|
||||
return *seg, nil
|
||||
}
|
127
pkg/datarepair/queue/queue_test.go
Normal file
127
pkg/datarepair/queue/queue_test.go
Normal file
@ -0,0 +1,127 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/storage/teststore"
|
||||
)
|
||||
|
||||
func TestEnqueueDequeue(t *testing.T) {
|
||||
db := teststore.New()
|
||||
q := NewQueue(db)
|
||||
seg := &pb.InjuredSegment{
|
||||
Path: "abc",
|
||||
LostPieces: []int32{},
|
||||
}
|
||||
err := q.Enqueue(seg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
s, err := q.Dequeue()
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, proto.Equal(&s, seg))
|
||||
}
|
||||
|
||||
func TestDequeueEmptyQueue(t *testing.T) {
|
||||
db := teststore.New()
|
||||
q := NewQueue(db)
|
||||
s, err := q.Dequeue()
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, pb.InjuredSegment{}, s)
|
||||
}
|
||||
|
||||
func TestForceError(t *testing.T) {
|
||||
db := teststore.New()
|
||||
q := NewQueue(db)
|
||||
err := q.Enqueue(&pb.InjuredSegment{Path: "abc", LostPieces: []int32{}})
|
||||
assert.NoError(t, err)
|
||||
db.ForceError++
|
||||
item, err := q.Dequeue()
|
||||
assert.Equal(t, pb.InjuredSegment{}, item)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestSequential(t *testing.T) {
|
||||
db := teststore.New()
|
||||
q := NewQueue(db)
|
||||
const N = 100
|
||||
var addSegs []*pb.InjuredSegment
|
||||
for i := 0; i < N; i++ {
|
||||
seg := &pb.InjuredSegment{
|
||||
Path: strconv.Itoa(i),
|
||||
LostPieces: []int32{int32(i)},
|
||||
}
|
||||
err := q.Enqueue(seg)
|
||||
assert.NoError(t, err)
|
||||
addSegs = append(addSegs, seg)
|
||||
}
|
||||
for i := 0; i < N; i++ {
|
||||
dqSeg, err := q.Dequeue()
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, proto.Equal(addSegs[i], &dqSeg))
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallel(t *testing.T) {
|
||||
queue := NewQueue(teststore.New())
|
||||
const N = 100
|
||||
errs := make(chan error, N*2)
|
||||
entries := make(chan *pb.InjuredSegment, N*2)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(N)
|
||||
// Add to queue concurrently
|
||||
for i := 0; i < N; i++ {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
err := queue.Enqueue(&pb.InjuredSegment{
|
||||
Path: strconv.Itoa(i),
|
||||
LostPieces: []int32{int32(i)},
|
||||
})
|
||||
if err != nil {
|
||||
errs <- err
|
||||
}
|
||||
}(i)
|
||||
|
||||
}
|
||||
wg.Wait()
|
||||
wg.Add(N)
|
||||
// Remove from queue concurrently
|
||||
for i := 0; i < N; i++ {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
segment, err := queue.Dequeue()
|
||||
if err != nil {
|
||||
errs <- err
|
||||
}
|
||||
entries <- &segment
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
close(entries)
|
||||
|
||||
for err := range errs {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
var items []*pb.InjuredSegment
|
||||
for segment := range entries {
|
||||
items = append(items, segment)
|
||||
}
|
||||
|
||||
sort.Slice(items, func(i, k int) bool { return items[i].LostPieces[0] < items[k].LostPieces[0] })
|
||||
// check if the enqueued and dequeued elements match
|
||||
for i := 0; i < N; i++ {
|
||||
assert.Equal(t, items[i].LostPieces[0], int32(i))
|
||||
}
|
||||
}
|
@ -2,16 +2,3 @@
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package datarepair
|
||||
|
||||
import (
|
||||
"storj.io/storj/pkg/pb"
|
||||
)
|
||||
|
||||
//RepairQueue is the interface for the data repair queue
|
||||
type RepairQueue interface {
|
||||
Add(qi *pb.InjuredSegment) error
|
||||
AddAll(qis []*pb.InjuredSegment) error
|
||||
Remove(qi *pb.InjuredSegment) error
|
||||
GetNext() pb.InjuredSegment
|
||||
GetSize() int
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user