ECClient (#110)
* WIP ECClient * Get returns RangeCloser * Introduce RedundancyStrategy * Constructor takes max buffer memory * Remove unnecessary NopCloser wrapper * Added telemetry * Tests * Adapt to PSClient from master * Decode should report error if empty rrs map is passed * collectErrors helper * Move to /pkg/storage * Move to /pkg/storage/ec * Rename ecclient.go to client.go * Better logging * Rename ec.ECClient to ec.Client * Fix some test execution * Adopt Transport Client from master
This commit is contained in:
parent
0e8931a299
commit
d8f1ec1db6
@ -49,10 +49,10 @@ func Main() error {
|
||||
return err
|
||||
}
|
||||
// initialize http rangers in parallel to save from network latency
|
||||
rrs := map[int]ranger.Ranger{}
|
||||
rrs := map[int]ranger.RangeCloser{}
|
||||
type indexRangerError struct {
|
||||
i int
|
||||
rr ranger.Ranger
|
||||
rr ranger.RangeCloser
|
||||
err error
|
||||
}
|
||||
result := make(chan indexRangerError, *rsn)
|
||||
@ -60,7 +60,7 @@ func Main() error {
|
||||
go func(i int) {
|
||||
url := fmt.Sprintf("http://18.184.133.99:%d", 10000+i)
|
||||
rr, err := ranger.HTTPRanger(url)
|
||||
result <- indexRangerError{i, rr, err}
|
||||
result <- indexRangerError{i: i, rr: ranger.NopCloser(rr), err: err}
|
||||
}(i)
|
||||
}
|
||||
// wait for all goroutines to finish and save result in rrs map
|
||||
@ -72,11 +72,12 @@ func Main() error {
|
||||
}
|
||||
rrs[res.i] = res.rr
|
||||
}
|
||||
rr, err := eestream.Decode(rrs, es, 4*1024*1024)
|
||||
rc, err := eestream.Decode(rrs, es, 4*1024*1024)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rr, err = eestream.Transform(rr, decrypter)
|
||||
defer rc.Close()
|
||||
rr, err := eestream.Transform(rc, decrypter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ func Main() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rrs := map[int]ranger.Ranger{}
|
||||
rrs := map[int]ranger.RangeCloser{}
|
||||
for _, piece := range pieces {
|
||||
piecenum, err := strconv.Atoi(strings.TrimSuffix(piece.Name(), ".piece"))
|
||||
if err != nil {
|
||||
@ -71,14 +71,14 @@ func Main() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
rrs[piecenum] = r
|
||||
}
|
||||
rr, err := eestream.Decode(rrs, es, 4*1024*1024)
|
||||
rc, err := eestream.Decode(rrs, es, 4*1024*1024)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rr, err = eestream.Transform(rr, decrypter)
|
||||
defer rc.Close()
|
||||
rr, err := eestream.Transform(rc, decrypter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -48,6 +48,10 @@ func Main() error {
|
||||
return err
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, *pieceBlockSize)
|
||||
rs, err := eestream.NewRedundancyStrategy(es, 0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
encKey := sha256.Sum256([]byte(*key))
|
||||
var firstNonce [12]byte
|
||||
encrypter, err := eestream.NewAESGCMEncrypter(
|
||||
@ -55,9 +59,9 @@ func Main() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readers, err := eestream.EncodeReader(context.Background(), eestream.TransformReader(
|
||||
eestream.PadReader(os.Stdin, encrypter.InBlockSize()), encrypter, 0),
|
||||
es, 0, 0, 4*1024*1024)
|
||||
readers, err := eestream.EncodeReader(context.Background(),
|
||||
eestream.TransformReader(eestream.PadReader(os.Stdin,
|
||||
encrypter.InBlockSize()), encrypter, 0), rs, 4*1024*1024)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -50,9 +50,8 @@ func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser,
|
||||
return readcloser.FatalReadCloser(
|
||||
Error.New("expected size not a factor decoded block size"))
|
||||
}
|
||||
if mbm < 0 {
|
||||
return readcloser.FatalReadCloser(
|
||||
Error.New("negative max buffer memory"))
|
||||
if err := checkMBM(mbm); err != nil {
|
||||
return readcloser.FatalReadCloser(err)
|
||||
}
|
||||
chanSize := mbm / (len(rs) * es.EncodedBlockSize())
|
||||
if chanSize < 1 {
|
||||
@ -223,7 +222,7 @@ func (dr *decodedReader) readBlock(inbufs map[int][]byte) error {
|
||||
|
||||
type decodedRanger struct {
|
||||
es ErasureScheme
|
||||
rrs map[int]ranger.Ranger
|
||||
rrs map[int]ranger.RangeCloser
|
||||
inSize int64
|
||||
mbm int // max buffer memory
|
||||
}
|
||||
@ -234,9 +233,12 @@ type decodedRanger struct {
|
||||
// rrs is a map of erasure piece numbers to erasure piece rangers.
|
||||
// mbm is the maximum memory (in bytes) to be allocated for read buffers. If
|
||||
// set to 0, the minimum possible memory will be used.
|
||||
func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int) (ranger.Ranger, error) {
|
||||
if mbm < 0 {
|
||||
return nil, Error.New("negative max buffer memory")
|
||||
func Decode(rrs map[int]ranger.RangeCloser, es ErasureScheme, mbm int) (ranger.RangeCloser, error) {
|
||||
if err := checkMBM(mbm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(rrs) < es.RequiredCount() {
|
||||
return nil, Error.New("not enough readers to reconstruct data!")
|
||||
}
|
||||
size := int64(-1)
|
||||
for _, rr := range rrs {
|
||||
@ -244,21 +246,18 @@ func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int) (ranger.Ranger
|
||||
size = rr.Size()
|
||||
} else {
|
||||
if size != rr.Size() {
|
||||
return nil, Error.New("decode failure: range reader sizes don't " +
|
||||
"all match")
|
||||
return nil, Error.New(
|
||||
"decode failure: range reader sizes don't all match")
|
||||
}
|
||||
}
|
||||
}
|
||||
if size == -1 {
|
||||
return ranger.ByteRanger(nil), nil
|
||||
return ranger.NopCloser(ranger.ByteRanger(nil)), nil
|
||||
}
|
||||
if size%int64(es.EncodedBlockSize()) != 0 {
|
||||
return nil, Error.New("invalid erasure decoder and range reader combo. " +
|
||||
"range reader size must be a multiple of erasure encoder block size")
|
||||
}
|
||||
if len(rrs) < es.RequiredCount() {
|
||||
return nil, Error.New("not enough readers to reconstruct data!")
|
||||
}
|
||||
return &decodedRanger{
|
||||
es: es,
|
||||
rrs: rrs,
|
||||
@ -315,3 +314,20 @@ func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (io.Re
|
||||
// length might not have included all of the blocks, limit what we return
|
||||
return readcloser.LimitReadCloser(r, length), nil
|
||||
}
|
||||
|
||||
func (dr *decodedRanger) Close() error {
|
||||
errs := make(chan error, len(dr.rrs))
|
||||
for _, rr := range dr.rrs {
|
||||
go func(rr ranger.RangeCloser) {
|
||||
errs <- rr.Close()
|
||||
}(rr)
|
||||
}
|
||||
var first error
|
||||
for range dr.rrs {
|
||||
err := <-errs
|
||||
if err != nil && first == nil {
|
||||
first = Error.Wrap(err)
|
||||
}
|
||||
}
|
||||
return first
|
||||
}
|
||||
|
@ -39,13 +39,68 @@ type ErasureScheme interface {
|
||||
RequiredCount() int
|
||||
}
|
||||
|
||||
// RedundancyStrategy is an ErasureScheme with a minimum and optimum thresholds
|
||||
type RedundancyStrategy struct {
|
||||
ErasureScheme
|
||||
min int
|
||||
opt int
|
||||
}
|
||||
|
||||
// NewRedundancyStrategy from the given ErasureScheme, minimum and optimum
|
||||
// thresholds
|
||||
//
|
||||
// min is the minimum threshold. If set to 0, it will be reset to the
|
||||
// TotalCount of the ErasureScheme.
|
||||
// opt is the optimum threshold. If set to 0, it will be reset to the
|
||||
// TotalCount of the ErasureScheme.
|
||||
func NewRedundancyStrategy(es ErasureScheme, min, opt int) (RedundancyStrategy, error) {
|
||||
if min == 0 {
|
||||
min = es.TotalCount()
|
||||
}
|
||||
if opt == 0 {
|
||||
opt = es.TotalCount()
|
||||
}
|
||||
if min < 0 {
|
||||
return RedundancyStrategy{}, Error.New("negative minimum threshold")
|
||||
}
|
||||
if min > 0 && min < es.RequiredCount() {
|
||||
return RedundancyStrategy{}, Error.New("minimum threshold less than required count")
|
||||
}
|
||||
if min > es.TotalCount() {
|
||||
return RedundancyStrategy{}, Error.New("minimum threshold greater than total count")
|
||||
}
|
||||
if opt < 0 {
|
||||
return RedundancyStrategy{}, Error.New("negative optimum threshold")
|
||||
}
|
||||
if opt > 0 && opt < es.RequiredCount() {
|
||||
return RedundancyStrategy{}, Error.New("optimum threshold less than required count")
|
||||
}
|
||||
if opt > es.TotalCount() {
|
||||
return RedundancyStrategy{}, Error.New("optimum threshold greater than total count")
|
||||
}
|
||||
if min > opt {
|
||||
return RedundancyStrategy{}, Error.New("minimum threshold greater than optimum threshold")
|
||||
}
|
||||
return RedundancyStrategy{ErasureScheme: es, min: min, opt: opt}, nil
|
||||
}
|
||||
|
||||
// MinimumThreshold is the number of available erasure pieces below which
|
||||
// the data must be repaired to avoid loss
|
||||
func (rs *RedundancyStrategy) MinimumThreshold() int {
|
||||
return rs.min
|
||||
}
|
||||
|
||||
// OptimumThreshold is the number of available erasure pieces above which
|
||||
// there is no need for the data to be repaired
|
||||
func (rs *RedundancyStrategy) OptimumThreshold() int {
|
||||
return rs.opt
|
||||
}
|
||||
|
||||
type encodedReader struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
r io.Reader
|
||||
es ErasureScheme
|
||||
min int // minimum threshold
|
||||
opt int // optimum threshold
|
||||
rs RedundancyStrategy
|
||||
inbuf []byte
|
||||
eps map[int](*encodedPiece)
|
||||
mux sync.Mutex
|
||||
@ -53,94 +108,52 @@ type encodedReader struct {
|
||||
done int // number of readers done
|
||||
}
|
||||
|
||||
// EncodeReader takes a Reader and an ErasureScheme and return a slice of
|
||||
// EncodeReader takes a Reader and a RedundancyStrategy and returns a slice of
|
||||
// Readers.
|
||||
//
|
||||
// min is the minimum threshold - the minimum number of erasure pieces that
|
||||
// are completely stored. If set to 0, it will be reset to the TotalCount
|
||||
// of the ErasureScheme.
|
||||
// opt is the optimum threshold - the optimum number of erasure pieces that
|
||||
// are completely stored. If set to 0, it will be reset to the TotalCount
|
||||
// of the ErasureScheme.
|
||||
// mbm is the maximum memory (in bytes) to be allocated for read buffers. If
|
||||
// set to 0, the minimum possible memory will be used.
|
||||
//
|
||||
// When the minimum threshold is reached a timer will be started with another
|
||||
// 1.5x the amount of time that took so far. The Readers will be aborted as
|
||||
// soon as the timer expires or the optimum threshold is reached.
|
||||
func EncodeReader(ctx context.Context, r io.Reader, es ErasureScheme,
|
||||
min, opt, mbm int) ([]io.Reader, error) {
|
||||
if min == 0 {
|
||||
min = es.TotalCount()
|
||||
}
|
||||
if opt == 0 {
|
||||
opt = es.TotalCount()
|
||||
}
|
||||
if err := checkParams(es, min, opt, mbm); err != nil {
|
||||
func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy,
|
||||
mbm int) ([]io.Reader, error) {
|
||||
if err := checkMBM(mbm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
er := &encodedReader{
|
||||
r: r,
|
||||
es: es,
|
||||
min: min,
|
||||
opt: opt,
|
||||
inbuf: make([]byte, es.DecodedBlockSize()),
|
||||
eps: make(map[int](*encodedPiece), es.TotalCount()),
|
||||
rs: rs,
|
||||
inbuf: make([]byte, rs.DecodedBlockSize()),
|
||||
eps: make(map[int](*encodedPiece), rs.TotalCount()),
|
||||
start: time.Now(),
|
||||
}
|
||||
er.ctx, er.cancel = context.WithCancel(ctx)
|
||||
readers := make([]io.Reader, 0, es.TotalCount())
|
||||
for i := 0; i < es.TotalCount(); i++ {
|
||||
readers := make([]io.Reader, 0, rs.TotalCount())
|
||||
for i := 0; i < rs.TotalCount(); i++ {
|
||||
er.eps[i] = &encodedPiece{
|
||||
er: er,
|
||||
}
|
||||
er.eps[i].ctx, er.eps[i].cancel = context.WithCancel(er.ctx)
|
||||
readers = append(readers, er.eps[i])
|
||||
}
|
||||
chanSize := mbm / (es.TotalCount() * es.EncodedBlockSize())
|
||||
chanSize := mbm / (rs.TotalCount() * rs.EncodedBlockSize())
|
||||
if chanSize < 1 {
|
||||
chanSize = 1
|
||||
}
|
||||
for i := 0; i < es.TotalCount(); i++ {
|
||||
for i := 0; i < rs.TotalCount(); i++ {
|
||||
er.eps[i].ch = make(chan block, chanSize)
|
||||
}
|
||||
go er.fillBuffer()
|
||||
return readers, nil
|
||||
}
|
||||
|
||||
func checkParams(es ErasureScheme, min, opt, mbm int) error {
|
||||
if min < 0 {
|
||||
return Error.New("negative minimum threshold")
|
||||
}
|
||||
if min > 0 && min < es.RequiredCount() {
|
||||
return Error.New("minimum threshold less than required count")
|
||||
}
|
||||
if min > es.TotalCount() {
|
||||
return Error.New("minimum threshold greater than total count")
|
||||
}
|
||||
if opt < 0 {
|
||||
return Error.New("negative optimum threshold")
|
||||
}
|
||||
if opt > 0 && opt < es.RequiredCount() {
|
||||
return Error.New("optimum threshold less than required count")
|
||||
}
|
||||
if opt > es.TotalCount() {
|
||||
return Error.New("optimum threshold greater than total count")
|
||||
}
|
||||
if min > opt {
|
||||
return Error.New("minimum threshold greater than optimum threshold")
|
||||
}
|
||||
if mbm < 0 {
|
||||
return Error.New("negative max buffer memory")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (er *encodedReader) fillBuffer() {
|
||||
// these channels will synchronize the erasure encoder output with the
|
||||
// goroutines for adding the output to the reader buffers
|
||||
copiers := make(map[int]chan block, er.es.TotalCount())
|
||||
for i := 0; i < er.es.TotalCount(); i++ {
|
||||
copiers := make(map[int]chan block, er.rs.TotalCount())
|
||||
for i := 0; i < er.rs.TotalCount(); i++ {
|
||||
copiers[i] = make(chan block)
|
||||
// closing the channel will exit the next goroutine
|
||||
defer close(copiers[i])
|
||||
@ -157,7 +170,7 @@ func (er *encodedReader) fillBuffer() {
|
||||
}
|
||||
return
|
||||
}
|
||||
err = er.es.Encode(er.inbuf, func(num int, data []byte) {
|
||||
err = er.rs.Encode(er.inbuf, func(num int, data []byte) {
|
||||
b := block{
|
||||
i: num,
|
||||
num: blockNum,
|
||||
@ -228,7 +241,7 @@ func (er *encodedReader) checkSlowChannel(num int) (closed bool) {
|
||||
// check if more than the required buffer channels are empty, i.e. the
|
||||
// current channel is slow and should be closed and its context should be
|
||||
// canceled
|
||||
closed = ec >= er.min
|
||||
closed = ec >= er.rs.MinimumThreshold()
|
||||
if closed {
|
||||
close(er.eps[num].ch)
|
||||
er.eps[num].ch = nil
|
||||
@ -242,12 +255,12 @@ func (er *encodedReader) readerDone() {
|
||||
er.mux.Lock()
|
||||
defer er.mux.Unlock()
|
||||
er.done++
|
||||
if er.done == er.min {
|
||||
if er.done == er.rs.MinimumThreshold() {
|
||||
// minimum threshold reached, wait for 1.5x the duration and cancel
|
||||
// the context regardless if optimum threshold is reached
|
||||
time.AfterFunc(time.Since(er.start)*3/2, er.cancel)
|
||||
}
|
||||
if er.done == er.opt {
|
||||
if er.done == er.rs.OptimumThreshold() {
|
||||
// optimum threshold reached - cancel the context
|
||||
er.cancel()
|
||||
}
|
||||
@ -305,34 +318,24 @@ func (ep *encodedPiece) Read(p []byte) (n int, err error) {
|
||||
// interface.
|
||||
type EncodedRanger struct {
|
||||
rr ranger.Ranger
|
||||
es ErasureScheme
|
||||
min int // minimum threshold
|
||||
opt int // optimum threshold
|
||||
rs RedundancyStrategy
|
||||
mbm int // max buffer memory
|
||||
}
|
||||
|
||||
// NewEncodedRanger creates an EncodedRanger. See the comments for EncodeReader
|
||||
// about min, opt and mbm.
|
||||
func NewEncodedRanger(rr ranger.Ranger, es ErasureScheme, min, opt, mbm int) (*EncodedRanger, error) {
|
||||
if rr.Size()%int64(es.DecodedBlockSize()) != 0 {
|
||||
// NewEncodedRanger from the given Ranger and RedundancyStrategy. See the
|
||||
// comments for EncodeReader about the minumum and optimum thresholds, and the
|
||||
// max buffer memory.
|
||||
func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy, mbm int) (*EncodedRanger, error) {
|
||||
if rr.Size()%int64(rs.DecodedBlockSize()) != 0 {
|
||||
return nil, Error.New("invalid erasure encoder and range reader combo. " +
|
||||
"range reader size must be a multiple of erasure encoder block size")
|
||||
}
|
||||
if min == 0 {
|
||||
min = es.TotalCount()
|
||||
}
|
||||
if opt == 0 {
|
||||
opt = es.TotalCount()
|
||||
}
|
||||
err := checkParams(es, min, opt, mbm)
|
||||
if err != nil {
|
||||
if err := checkMBM(mbm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &EncodedRanger{
|
||||
es: es,
|
||||
rs: rs,
|
||||
rr: rr,
|
||||
min: min,
|
||||
opt: opt,
|
||||
mbm: mbm,
|
||||
}, nil
|
||||
}
|
||||
@ -340,8 +343,8 @@ func NewEncodedRanger(rr ranger.Ranger, es ErasureScheme, min, opt, mbm int) (*E
|
||||
// OutputSize is like Ranger.Size but returns the Size of the erasure encoded
|
||||
// pieces that come out.
|
||||
func (er *EncodedRanger) OutputSize() int64 {
|
||||
blocks := er.rr.Size() / int64(er.es.DecodedBlockSize())
|
||||
return blocks * int64(er.es.EncodedBlockSize())
|
||||
blocks := er.rr.Size() / int64(er.rs.DecodedBlockSize())
|
||||
return blocks * int64(er.rs.EncodedBlockSize())
|
||||
}
|
||||
|
||||
// Range is like Ranger.Range, but returns a slice of Readers
|
||||
@ -349,15 +352,15 @@ func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) ([]io.
|
||||
// the offset and length given may not be block-aligned, so let's figure
|
||||
// out which blocks contain the request.
|
||||
firstBlock, blockCount := calcEncompassingBlocks(
|
||||
offset, length, er.es.EncodedBlockSize())
|
||||
offset, length, er.rs.EncodedBlockSize())
|
||||
// okay, now let's encode the reader for the range containing the blocks
|
||||
r, err := er.rr.Range(ctx,
|
||||
firstBlock*int64(er.es.DecodedBlockSize()),
|
||||
blockCount*int64(er.es.DecodedBlockSize()))
|
||||
firstBlock*int64(er.rs.DecodedBlockSize()),
|
||||
blockCount*int64(er.rs.DecodedBlockSize()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
readers, err := EncodeReader(ctx, r, er.es, er.min, er.opt, er.mbm)
|
||||
readers, err := EncodeReader(ctx, r, er.rs, er.mbm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -365,7 +368,7 @@ func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) ([]io.
|
||||
// the offset might start a few bytes in, so we potentially have to
|
||||
// discard the beginning bytes
|
||||
_, err := io.CopyN(ioutil.Discard, r,
|
||||
offset-firstBlock*int64(er.es.EncodedBlockSize()))
|
||||
offset-firstBlock*int64(er.rs.EncodedBlockSize()))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -375,3 +378,10 @@ func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) ([]io.
|
||||
}
|
||||
return readers, nil
|
||||
}
|
||||
|
||||
func checkMBM(mbm int) error {
|
||||
if mbm < 0 {
|
||||
return Error.New("negative max buffer memory")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -29,8 +29,12 @@ func TestRS(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rs := NewRSScheme(fc, 8*1024)
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0, 0, 0)
|
||||
es := NewRSScheme(fc, 8*1024)
|
||||
rs, err := NewRedundancyStrategy(es, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -58,8 +62,12 @@ func TestRSUnexpectedEOF(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rs := NewRSScheme(fc, 8*1024)
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0, 0, 0)
|
||||
es := NewRSScheme(fc, 8*1024)
|
||||
rs, err := NewRedundancyStrategy(es, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -82,7 +90,11 @@ func TestRSRanger(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rs := NewRSScheme(fc, 8*1024)
|
||||
es := NewRSScheme(fc, 8*1024)
|
||||
rs, err := NewRedundancyStrategy(es, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
encKey := sha256.Sum256([]byte("the secret key"))
|
||||
var firstNonce [12]byte
|
||||
encrypter, err := NewAESGCMEncrypter(
|
||||
@ -91,8 +103,7 @@ func TestRSRanger(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
readers, err := EncodeReader(ctx, TransformReader(PadReader(ioutil.NopCloser(
|
||||
bytes.NewReader(data)), encrypter.InBlockSize()), encrypter, 0),
|
||||
rs, 0, 0, 0)
|
||||
bytes.NewReader(data)), encrypter.InBlockSize()), encrypter, 0), rs, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -100,17 +111,18 @@ func TestRSRanger(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rrs := map[int]ranger.Ranger{}
|
||||
rrs := map[int]ranger.RangeCloser{}
|
||||
for i, piece := range pieces {
|
||||
rrs[i] = ranger.ByteRanger(piece)
|
||||
rrs[i] = ranger.NopCloser(ranger.ByteRanger(piece))
|
||||
}
|
||||
decrypter, err := NewAESGCMDecrypter(
|
||||
&encKey, &firstNonce, rs.DecodedBlockSize())
|
||||
rr, err := Decode(rrs, rs, 0)
|
||||
rc, err := Decode(rrs, rs, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rr, err = Transform(rr, decrypter)
|
||||
defer rc.Close()
|
||||
rr, err := Transform(rc, decrypter)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -131,25 +143,51 @@ func TestRSRanger(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRSEncoderInputParams(t *testing.T) {
|
||||
func TestNewRedundancyStrategy(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
min int
|
||||
opt int
|
||||
expMin int
|
||||
expOpt int
|
||||
errString string
|
||||
}{
|
||||
{0, 0, 4, 4, ""},
|
||||
{-1, 0, 0, 0, "eestream error: negative minimum threshold"},
|
||||
{1, 0, 0, 0, "eestream error: minimum threshold less than required count"},
|
||||
{5, 0, 0, 0, "eestream error: minimum threshold greater than total count"},
|
||||
{0, -1, 0, 0, "eestream error: negative optimum threshold"},
|
||||
{0, 1, 0, 0, "eestream error: optimum threshold less than required count"},
|
||||
{0, 5, 0, 0, "eestream error: optimum threshold greater than total count"},
|
||||
{3, 4, 3, 4, ""},
|
||||
{0, 3, 0, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{4, 3, 0, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{4, 4, 4, 4, ""},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
es := NewRSScheme(fc, 8*1024)
|
||||
rs, err := NewRedundancyStrategy(es, tt.min, tt.opt)
|
||||
if tt.errString != "" {
|
||||
assert.EqualError(t, err, tt.errString, errTag)
|
||||
continue
|
||||
}
|
||||
assert.NoError(t, err, errTag)
|
||||
assert.Equal(t, tt.expMin, rs.MinimumThreshold(), errTag)
|
||||
assert.Equal(t, tt.expOpt, rs.OptimumThreshold(), errTag)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRSEncoderInputParams(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
mbm int
|
||||
errString string
|
||||
}{
|
||||
{0, 0, 0, ""},
|
||||
{-1, 0, 0, "eestream error: negative minimum threshold"},
|
||||
{1, 0, 0, "eestream error: minimum threshold less than required count"},
|
||||
{5, 0, 0, "eestream error: minimum threshold greater than total count"},
|
||||
{0, -1, 0, "eestream error: negative optimum threshold"},
|
||||
{0, 1, 0, "eestream error: optimum threshold less than required count"},
|
||||
{0, 5, 0, "eestream error: optimum threshold greater than total count"},
|
||||
{3, 4, 0, ""},
|
||||
{0, 3, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{4, 3, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{0, 0, -1, "eestream error: negative max buffer memory"},
|
||||
{4, 4, 1024, ""},
|
||||
{0, ""},
|
||||
{-1, "eestream error: negative max buffer memory"},
|
||||
{1024, ""},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
ctx := context.Background()
|
||||
@ -158,8 +196,12 @@ func TestRSEncoderInputParams(t *testing.T) {
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
rs := NewRSScheme(fc, 8*1024)
|
||||
_, err = EncodeReader(ctx, bytes.NewReader(data), rs, tt.min, tt.opt, tt.mbm)
|
||||
es := NewRSScheme(fc, 8*1024)
|
||||
rs, err := NewRedundancyStrategy(es, 0, 0)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
_, err = EncodeReader(ctx, bytes.NewReader(data), rs, tt.mbm)
|
||||
if tt.errString == "" {
|
||||
assert.NoError(t, err, errTag)
|
||||
} else {
|
||||
@ -170,23 +212,12 @@ func TestRSEncoderInputParams(t *testing.T) {
|
||||
|
||||
func TestRSRangerInputParams(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
min int
|
||||
opt int
|
||||
mbm int
|
||||
errString string
|
||||
}{
|
||||
{0, 0, 0, ""},
|
||||
{-1, 0, 0, "eestream error: negative minimum threshold"},
|
||||
{1, 0, 0, "eestream error: minimum threshold less than required count"},
|
||||
{5, 0, 0, "eestream error: minimum threshold greater than total count"},
|
||||
{0, -1, 0, "eestream error: negative optimum threshold"},
|
||||
{0, 1, 0, "eestream error: optimum threshold less than required count"},
|
||||
{0, 5, 0, "eestream error: optimum threshold greater than total count"},
|
||||
{3, 4, 0, ""},
|
||||
{0, 3, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{4, 3, 0, "eestream error: minimum threshold greater than optimum threshold"},
|
||||
{0, 0, -1, "eestream error: negative max buffer memory"},
|
||||
{4, 4, 1024, ""},
|
||||
{0, ""},
|
||||
{-1, "eestream error: negative max buffer memory"},
|
||||
{1024, ""},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
ctx := context.Background()
|
||||
@ -195,8 +226,12 @@ func TestRSRangerInputParams(t *testing.T) {
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
rs := NewRSScheme(fc, 8*1024)
|
||||
_, err = EncodeReader(ctx, bytes.NewReader(data), rs, tt.min, tt.opt, tt.mbm)
|
||||
es := NewRSScheme(fc, 8*1024)
|
||||
rs, err := NewRedundancyStrategy(es, 0, 0)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
_, err = EncodeReader(ctx, bytes.NewReader(data), rs, tt.mbm)
|
||||
if tt.errString == "" {
|
||||
assert.NoError(t, err, errTag)
|
||||
} else {
|
||||
@ -399,8 +434,12 @@ func testRSProblematic(t *testing.T, tt testCase, i int, fn problematicReadClose
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
return
|
||||
}
|
||||
rs := NewRSScheme(fc, tt.blockSize)
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0, 0, 3*1024)
|
||||
es := NewRSScheme(fc, tt.blockSize)
|
||||
rs, err := NewRedundancyStrategy(es, 0, 0)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
return
|
||||
}
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 3*1024)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
return
|
||||
}
|
||||
@ -471,8 +510,12 @@ func TestEncoderStalledReaders(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rs := NewRSScheme(fc, 1024)
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 35, 50, 0)
|
||||
es := NewRSScheme(fc, 1024)
|
||||
rs, err := NewRedundancyStrategy(es, 35, 50)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
readers, err := EncodeReader(ctx, bytes.NewReader(data), rs, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -236,6 +236,10 @@ func encryptFile(data io.ReadCloser, blockSize uint, bucket, object string) erro
|
||||
return err
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, *pieceBlockSize)
|
||||
rs, err := eestream.NewRedundancyStrategy(es, 0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
encKey := sha256.Sum256([]byte(*key))
|
||||
var firstNonce [12]byte
|
||||
encrypter, err := eestream.NewAESGCMEncrypter(
|
||||
@ -243,9 +247,9 @@ func encryptFile(data io.ReadCloser, blockSize uint, bucket, object string) erro
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readers, err := eestream.EncodeReader(context.Background(), eestream.TransformReader(
|
||||
eestream.PadReader(data, encrypter.InBlockSize()), encrypter, 0),
|
||||
es, 0, 0, 4*1024*1024)
|
||||
readers, err := eestream.EncodeReader(context.Background(),
|
||||
eestream.TransformReader(eestream.PadReader(data,
|
||||
encrypter.InBlockSize()), encrypter, 0), rs, 4*1024*1024)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
176
pkg/storage/ec/client.go
Normal file
176
pkg/storage/ec/client.go
Normal file
@ -0,0 +1,176 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package ecclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
monkit "gopkg.in/spacemonkeygo/monkit.v2"
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/piecestore/rpc/client"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
"storj.io/storj/pkg/transport"
|
||||
proto "storj.io/storj/protos/overlay"
|
||||
)
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// Client defines an interface for storing erasure coded data to piece store nodes
|
||||
type Client interface {
|
||||
Put(ctx context.Context, nodes []proto.Node, rs eestream.RedundancyStrategy,
|
||||
pieceID client.PieceID, data io.Reader, expiration time.Time) error
|
||||
Get(ctx context.Context, nodes []proto.Node, es eestream.ErasureScheme,
|
||||
pieceID client.PieceID, size int64) (ranger.RangeCloser, error)
|
||||
Delete(ctx context.Context, nodes []proto.Node, pieceID client.PieceID) error
|
||||
}
|
||||
|
||||
type dialer interface {
|
||||
dial(ctx context.Context, node proto.Node) (ps client.PSClient, err error)
|
||||
}
|
||||
|
||||
type defaultDialer struct {
|
||||
t transport.Client
|
||||
}
|
||||
|
||||
func (d *defaultDialer) dial(ctx context.Context, node proto.Node) (ps client.PSClient, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
c, err := d.t.DialNode(ctx, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client.NewPSClient(c), nil
|
||||
}
|
||||
|
||||
type ecClient struct {
|
||||
d dialer
|
||||
mbm int
|
||||
}
|
||||
|
||||
// NewClient from the given TransportClient and max buffer memory
|
||||
func NewClient(t transport.Client, mbm int) Client {
|
||||
return &ecClient{d: &defaultDialer{t: t}, mbm: mbm}
|
||||
}
|
||||
|
||||
func (ec *ecClient) Put(ctx context.Context, nodes []proto.Node, rs eestream.RedundancyStrategy,
|
||||
pieceID client.PieceID, data io.Reader, expiration time.Time) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if len(nodes) != rs.TotalCount() {
|
||||
return Error.New("number of nodes do not match total count of erasure scheme")
|
||||
}
|
||||
readers, err := eestream.EncodeReader(ctx, data, rs, ec.mbm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errs := make(chan error, len(readers))
|
||||
for i, n := range nodes {
|
||||
go func(i int, n proto.Node) {
|
||||
ps, err := ec.d.dial(ctx, n)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Failed putting piece %s to node %s: %v",
|
||||
pieceID, n.GetId(), err)
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
err = ps.Put(ctx, pieceID, readers[i], expiration)
|
||||
ps.CloseConn()
|
||||
if err != nil {
|
||||
zap.S().Errorf("Failed putting piece %s to node %s: %v",
|
||||
pieceID, n.GetId(), err)
|
||||
}
|
||||
errs <- err
|
||||
}(i, n)
|
||||
}
|
||||
allerrs := collectErrors(errs, len(readers))
|
||||
sc := len(readers) - len(allerrs)
|
||||
if sc < rs.MinimumThreshold() {
|
||||
return Error.New(
|
||||
"successful puts (%d) less than minimum threshold (%d)",
|
||||
sc, rs.MinimumThreshold())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ec *ecClient) Get(ctx context.Context, nodes []proto.Node, es eestream.ErasureScheme,
|
||||
pieceID client.PieceID, size int64) (rr ranger.RangeCloser, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if len(nodes) != es.TotalCount() {
|
||||
return nil, Error.New("number of nodes do not match total count of erasure scheme")
|
||||
}
|
||||
rrs := map[int]ranger.RangeCloser{}
|
||||
type rangerInfo struct {
|
||||
i int
|
||||
rr ranger.RangeCloser
|
||||
err error
|
||||
}
|
||||
ch := make(chan rangerInfo, len(nodes))
|
||||
for i, n := range nodes {
|
||||
go func(i int, n proto.Node) {
|
||||
ps, err := ec.d.dial(ctx, n)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Failed getting piece %s from node %s: %v",
|
||||
pieceID, n.GetId(), err)
|
||||
ch <- rangerInfo{i: i, rr: nil, err: err}
|
||||
return
|
||||
}
|
||||
rr, err := ps.Get(ctx, pieceID, size)
|
||||
// no ps.CloseConn() here, the connection will be closed by
|
||||
// the caller using RangeCloser.Close
|
||||
if err != nil {
|
||||
zap.S().Errorf("Failed getting piece %s from node %s: %v",
|
||||
pieceID, n.GetId(), err)
|
||||
}
|
||||
ch <- rangerInfo{i: i, rr: rr, err: err}
|
||||
}(i, n)
|
||||
}
|
||||
for range nodes {
|
||||
rri := <-ch
|
||||
if rri.err == nil {
|
||||
rrs[rri.i] = rri.rr
|
||||
}
|
||||
}
|
||||
return eestream.Decode(rrs, es, ec.mbm)
|
||||
}
|
||||
|
||||
func (ec *ecClient) Delete(ctx context.Context, nodes []proto.Node, pieceID client.PieceID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
errs := make(chan error, len(nodes))
|
||||
for _, n := range nodes {
|
||||
go func(n proto.Node) {
|
||||
ps, err := ec.d.dial(ctx, n)
|
||||
if err != nil {
|
||||
zap.S().Errorf("Failed deleting piece %s from node %s: %v",
|
||||
pieceID, n.GetId(), err)
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
err = ps.Delete(ctx, pieceID)
|
||||
ps.CloseConn()
|
||||
if err != nil {
|
||||
zap.S().Errorf("Failed deleting piece %s from node %s: %v",
|
||||
pieceID, n.GetId(), err)
|
||||
}
|
||||
errs <- err
|
||||
}(n)
|
||||
}
|
||||
allerrs := collectErrors(errs, len(nodes))
|
||||
if len(allerrs) > 0 && len(allerrs) == len(nodes) {
|
||||
return allerrs[0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func collectErrors(errs <-chan error, size int) []error {
|
||||
var result []error
|
||||
for i := 0; i < size; i++ {
|
||||
err := <-errs
|
||||
if err != nil {
|
||||
result = append(result, err)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
295
pkg/storage/ec/client_test.go
Normal file
295
pkg/storage/ec/client_test.go
Normal file
@ -0,0 +1,295 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package ecclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/vivint/infectious"
|
||||
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/piecestore/rpc/client"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
proto "storj.io/storj/protos/overlay"
|
||||
)
|
||||
|
||||
const (
|
||||
dialFailed = "dial failed"
|
||||
opFailed = "op failed"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrDialFailed = errors.New(dialFailed)
|
||||
ErrOpFailed = errors.New(opFailed)
|
||||
)
|
||||
|
||||
var (
|
||||
node0 = proto.Node{Id: "node-0"}
|
||||
node1 = proto.Node{Id: "node-1"}
|
||||
node2 = proto.Node{Id: "node-2"}
|
||||
node3 = proto.Node{Id: "node-3"}
|
||||
)
|
||||
|
||||
type mockDialer struct {
|
||||
m map[proto.Node]client.PSClient
|
||||
}
|
||||
|
||||
func (d *mockDialer) dial(ctx context.Context, node proto.Node) (
|
||||
ps client.PSClient, err error) {
|
||||
ps = d.m[node]
|
||||
if ps == nil {
|
||||
return nil, ErrDialFailed
|
||||
}
|
||||
return d.m[node], nil
|
||||
}
|
||||
|
||||
func TestNewECClient(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
tc := NewMockClient(ctrl)
|
||||
mbm := 1234
|
||||
|
||||
ec := NewClient(tc, mbm)
|
||||
assert.NotNil(t, ec)
|
||||
|
||||
ecc, ok := ec.(*ecClient)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, ecc.d)
|
||||
assert.Equal(t, mbm, ecc.mbm)
|
||||
|
||||
dd, ok := ecc.d.(*defaultDialer)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, dd.t)
|
||||
assert.Equal(t, dd.t, tc)
|
||||
}
|
||||
|
||||
func TestDefaultDialer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
for i, tt := range []struct {
|
||||
err error
|
||||
errString string
|
||||
}{
|
||||
{nil, ""},
|
||||
{ErrDialFailed, dialFailed},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
tc := NewMockClient(ctrl)
|
||||
tc.EXPECT().DialNode(gomock.Any(), node0).Return(nil, tt.err)
|
||||
|
||||
dd := defaultDialer{t: tc}
|
||||
_, err := dd.dial(ctx, node0)
|
||||
|
||||
if tt.errString != "" {
|
||||
assert.EqualError(t, err, tt.errString, errTag)
|
||||
} else {
|
||||
assert.NoError(t, err, errTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPut(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
for i, tt := range []struct {
|
||||
nodes []proto.Node
|
||||
min int
|
||||
mbm int
|
||||
errs []error
|
||||
errString string
|
||||
}{
|
||||
{[]proto.Node{}, 0, 0, []error{}, "ecclient error: " +
|
||||
"number of nodes do not match total count of erasure scheme"},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0, -1,
|
||||
[]error{nil, nil, nil, nil},
|
||||
"eestream error: negative max buffer memory"},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0, 0,
|
||||
[]error{nil, nil, nil, nil}, ""},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0, 0,
|
||||
[]error{nil, ErrDialFailed, nil, nil},
|
||||
"ecclient error: successful puts (3) less than minimum threshold (4)"},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0, 0,
|
||||
[]error{nil, ErrOpFailed, nil, nil},
|
||||
"ecclient error: successful puts (3) less than minimum threshold (4)"},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 2, 0,
|
||||
[]error{nil, ErrDialFailed, nil, nil}, ""},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 2, 0,
|
||||
[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed},
|
||||
"ecclient error: successful puts (1) less than minimum threshold (2)"},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
id := client.NewPieceID()
|
||||
size := 32 * 1024
|
||||
ttl := time.Now()
|
||||
|
||||
errs := make(map[proto.Node]error, len(tt.nodes))
|
||||
for i, n := range tt.nodes {
|
||||
errs[n] = tt.errs[i]
|
||||
}
|
||||
|
||||
m := make(map[proto.Node]client.PSClient, len(tt.nodes))
|
||||
for _, n := range tt.nodes {
|
||||
if errs[n] != ErrDialFailed && tt.mbm >= 0 {
|
||||
ps := NewMockPSClient(ctrl)
|
||||
gomock.InOrder(
|
||||
ps.EXPECT().Put(gomock.Any(), id, gomock.Any(), ttl).Return(errs[n]),
|
||||
ps.EXPECT().CloseConn().Return(nil),
|
||||
)
|
||||
m[n] = ps
|
||||
}
|
||||
}
|
||||
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, size/4)
|
||||
rs, err := eestream.NewRedundancyStrategy(es, tt.min, 0)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
r := io.LimitReader(rand.Reader, int64(size))
|
||||
ec := ecClient{d: &mockDialer{m: m}, mbm: tt.mbm}
|
||||
err = ec.Put(ctx, tt.nodes, rs, id, r, ttl)
|
||||
|
||||
if tt.errString != "" {
|
||||
assert.EqualError(t, err, tt.errString, errTag)
|
||||
} else {
|
||||
assert.NoError(t, err, errTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
for i, tt := range []struct {
|
||||
nodes []proto.Node
|
||||
mbm int
|
||||
errs []error
|
||||
errString string
|
||||
}{
|
||||
{[]proto.Node{}, 0, []error{}, "ecclient error: " +
|
||||
"number of nodes do not match total count of erasure scheme"},
|
||||
{[]proto.Node{node0, node1, node2, node3}, -1,
|
||||
[]error{nil, nil, nil, nil},
|
||||
"eestream error: negative max buffer memory"},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0,
|
||||
[]error{nil, nil, nil, nil}, ""},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0,
|
||||
[]error{nil, ErrDialFailed, nil, nil}, ""},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0,
|
||||
[]error{nil, ErrOpFailed, nil, nil}, ""},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0,
|
||||
[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed},
|
||||
"eestream error: not enough readers to reconstruct data!"},
|
||||
{[]proto.Node{node0, node1, node2, node3}, 0,
|
||||
[]error{ErrDialFailed, ErrOpFailed, ErrOpFailed, ErrDialFailed},
|
||||
"eestream error: not enough readers to reconstruct data!"},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
id := client.NewPieceID()
|
||||
size := 32 * 1024
|
||||
|
||||
errs := make(map[proto.Node]error, len(tt.nodes))
|
||||
for i, n := range tt.nodes {
|
||||
errs[n] = tt.errs[i]
|
||||
}
|
||||
|
||||
m := make(map[proto.Node]client.PSClient, len(tt.nodes))
|
||||
for _, n := range tt.nodes {
|
||||
if errs[n] != ErrDialFailed {
|
||||
ps := NewMockPSClient(ctrl)
|
||||
ps.EXPECT().Get(gomock.Any(), id, int64(size)).Return(
|
||||
ranger.NopCloser(ranger.ByteRanger(nil)), errs[n])
|
||||
m[n] = ps
|
||||
}
|
||||
}
|
||||
|
||||
fc, err := infectious.NewFEC(2, 4)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
es := eestream.NewRSScheme(fc, size/4)
|
||||
ec := ecClient{d: &mockDialer{m: m}, mbm: tt.mbm}
|
||||
rr, err := ec.Get(ctx, tt.nodes, es, id, int64(size))
|
||||
|
||||
if tt.errString != "" {
|
||||
assert.EqualError(t, err, tt.errString, errTag)
|
||||
} else {
|
||||
assert.NoError(t, err, errTag)
|
||||
assert.NotNil(t, rr, errTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelete(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
for i, tt := range []struct {
|
||||
nodes []proto.Node
|
||||
errs []error
|
||||
errString string
|
||||
}{
|
||||
{[]proto.Node{}, []error{}, ""},
|
||||
{[]proto.Node{node0}, []error{nil}, ""},
|
||||
{[]proto.Node{node0}, []error{ErrDialFailed}, dialFailed},
|
||||
{[]proto.Node{node0}, []error{ErrOpFailed}, opFailed},
|
||||
{[]proto.Node{node0, node1}, []error{nil, nil}, ""},
|
||||
{[]proto.Node{node0, node1}, []error{ErrDialFailed, nil}, ""},
|
||||
{[]proto.Node{node0, node1}, []error{nil, ErrOpFailed}, ""},
|
||||
{[]proto.Node{node0, node1}, []error{ErrDialFailed, ErrDialFailed}, dialFailed},
|
||||
{[]proto.Node{node0, node1}, []error{ErrOpFailed, ErrOpFailed}, opFailed},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
id := client.NewPieceID()
|
||||
|
||||
errs := make(map[proto.Node]error, len(tt.nodes))
|
||||
for i, n := range tt.nodes {
|
||||
errs[n] = tt.errs[i]
|
||||
}
|
||||
|
||||
m := make(map[proto.Node]client.PSClient, len(tt.nodes))
|
||||
for _, n := range tt.nodes {
|
||||
if errs[n] != ErrDialFailed {
|
||||
ps := NewMockPSClient(ctrl)
|
||||
gomock.InOrder(
|
||||
ps.EXPECT().Delete(gomock.Any(), id).Return(errs[n]),
|
||||
ps.EXPECT().CloseConn().Return(nil),
|
||||
)
|
||||
m[n] = ps
|
||||
}
|
||||
}
|
||||
|
||||
ec := ecClient{d: &mockDialer{m: m}}
|
||||
err := ec.Delete(ctx, tt.nodes, id)
|
||||
|
||||
if tt.errString != "" {
|
||||
assert.EqualError(t, err, tt.errString, errTag)
|
||||
} else {
|
||||
assert.NoError(t, err, errTag)
|
||||
}
|
||||
}
|
||||
}
|
11
pkg/storage/ec/common.go
Normal file
11
pkg/storage/ec/common.go
Normal file
@ -0,0 +1,11 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package ecclient
|
||||
|
||||
import (
|
||||
"github.com/zeebo/errs"
|
||||
)
|
||||
|
||||
// Error is the errs class of standard Ranger errors
|
||||
var Error = errs.Class("ecclient error")
|
102
pkg/storage/ec/psclient_mock_test.go
Normal file
102
pkg/storage/ec/psclient_mock_test.go
Normal file
@ -0,0 +1,102 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: storj.io/storj/pkg/piecestore/rpc/client (interfaces: PSClient)
|
||||
|
||||
// Package ecclient is a generated GoMock package.
|
||||
package ecclient
|
||||
|
||||
import (
|
||||
context "context"
|
||||
io "io"
|
||||
reflect "reflect"
|
||||
time "time"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
client "storj.io/storj/pkg/piecestore/rpc/client"
|
||||
ranger "storj.io/storj/pkg/ranger"
|
||||
piecestore "storj.io/storj/protos/piecestore"
|
||||
)
|
||||
|
||||
// MockPSClient is a mock of PSClient interface
|
||||
type MockPSClient struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockPSClientMockRecorder
|
||||
}
|
||||
|
||||
// MockPSClientMockRecorder is the mock recorder for MockPSClient
|
||||
type MockPSClientMockRecorder struct {
|
||||
mock *MockPSClient
|
||||
}
|
||||
|
||||
// NewMockPSClient creates a new mock instance
|
||||
func NewMockPSClient(ctrl *gomock.Controller) *MockPSClient {
|
||||
mock := &MockPSClient{ctrl: ctrl}
|
||||
mock.recorder = &MockPSClientMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockPSClient) EXPECT() *MockPSClientMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// CloseConn mocks base method
|
||||
func (m *MockPSClient) CloseConn() error {
|
||||
ret := m.ctrl.Call(m, "CloseConn")
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// CloseConn indicates an expected call of CloseConn
|
||||
func (mr *MockPSClientMockRecorder) CloseConn() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseConn", reflect.TypeOf((*MockPSClient)(nil).CloseConn))
|
||||
}
|
||||
|
||||
// Delete mocks base method
|
||||
func (m *MockPSClient) Delete(arg0 context.Context, arg1 client.PieceID) error {
|
||||
ret := m.ctrl.Call(m, "Delete", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Delete indicates an expected call of Delete
|
||||
func (mr *MockPSClientMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockPSClient)(nil).Delete), arg0, arg1)
|
||||
}
|
||||
|
||||
// Get mocks base method
|
||||
func (m *MockPSClient) Get(arg0 context.Context, arg1 client.PieceID, arg2 int64) (ranger.RangeCloser, error) {
|
||||
ret := m.ctrl.Call(m, "Get", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(ranger.RangeCloser)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Get indicates an expected call of Get
|
||||
func (mr *MockPSClientMockRecorder) Get(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPSClient)(nil).Get), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// Meta mocks base method
|
||||
func (m *MockPSClient) Meta(arg0 context.Context, arg1 client.PieceID) (*piecestore.PieceSummary, error) {
|
||||
ret := m.ctrl.Call(m, "Meta", arg0, arg1)
|
||||
ret0, _ := ret[0].(*piecestore.PieceSummary)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Meta indicates an expected call of Meta
|
||||
func (mr *MockPSClientMockRecorder) Meta(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockPSClient)(nil).Meta), arg0, arg1)
|
||||
}
|
||||
|
||||
// Put mocks base method
|
||||
func (m *MockPSClient) Put(arg0 context.Context, arg1 client.PieceID, arg2 io.Reader, arg3 time.Time) error {
|
||||
ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Put indicates an expected call of Put
|
||||
func (mr *MockPSClientMockRecorder) Put(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockPSClient)(nil).Put), arg0, arg1, arg2, arg3)
|
||||
}
|
62
pkg/storage/ec/transportclient_mock_test.go
Normal file
62
pkg/storage/ec/transportclient_mock_test.go
Normal file
@ -0,0 +1,62 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: storj.io/storj/pkg/transport (interfaces: Client)
|
||||
|
||||
// Package ecclient is a generated GoMock package.
|
||||
package ecclient
|
||||
|
||||
import (
|
||||
context "context"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
grpc "google.golang.org/grpc"
|
||||
reflect "reflect"
|
||||
overlay "storj.io/storj/protos/overlay"
|
||||
)
|
||||
|
||||
// MockClient is a mock of Client interface
|
||||
type MockClient struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockClientMockRecorder
|
||||
}
|
||||
|
||||
// MockClientMockRecorder is the mock recorder for MockClient
|
||||
type MockClientMockRecorder struct {
|
||||
mock *MockClient
|
||||
}
|
||||
|
||||
// NewMockClient creates a new mock instance
|
||||
func NewMockClient(ctrl *gomock.Controller) *MockClient {
|
||||
mock := &MockClient{ctrl: ctrl}
|
||||
mock.recorder = &MockClientMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockClient) EXPECT() *MockClientMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// DialNode mocks base method
|
||||
func (m *MockClient) DialNode(arg0 context.Context, arg1 overlay.Node) (*grpc.ClientConn, error) {
|
||||
ret := m.ctrl.Call(m, "DialNode", arg0, arg1)
|
||||
ret0, _ := ret[0].(*grpc.ClientConn)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// DialNode indicates an expected call of DialNode
|
||||
func (mr *MockClientMockRecorder) DialNode(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialNode", reflect.TypeOf((*MockClient)(nil).DialNode), arg0, arg1)
|
||||
}
|
||||
|
||||
// DialUnauthenticated mocks base method
|
||||
func (m *MockClient) DialUnauthenticated(arg0 context.Context, arg1 overlay.NodeAddress) (*grpc.ClientConn, error) {
|
||||
ret := m.ctrl.Call(m, "DialUnauthenticated", arg0, arg1)
|
||||
ret0, _ := ret[0].(*grpc.ClientConn)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// DialUnauthenticated indicates an expected call of DialUnauthenticated
|
||||
func (mr *MockClientMockRecorder) DialUnauthenticated(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DialUnauthenticated", reflect.TypeOf((*MockClient)(nil).DialUnauthenticated), arg0, arg1)
|
||||
}
|
@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
proto "storj.io/storj/protos/overlay"
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user