1c56a4cf5c
* Decoder can tolerate readers with unexpected EOF * Return EOF if required number of inbufs are at EOF * Use existing randData() to generate random data for tests * Test case for io.ErrUnexpectedEOF * Add TransformReaderSize constructor
189 lines
5.0 KiB
Go
189 lines
5.0 KiB
Go
// Copyright (C) 2018 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package eestream
|
|
|
|
import (
|
|
"io"
|
|
"io/ioutil"
|
|
|
|
"storj.io/storj/internal/pkg/readcloser"
|
|
"storj.io/storj/pkg/ranger"
|
|
)
|
|
|
|
type decodedReader struct {
|
|
rs map[int]io.ReadCloser
|
|
es ErasureScheme
|
|
inbufs map[int][]byte
|
|
outbuf []byte
|
|
err error
|
|
}
|
|
|
|
type readerError struct {
|
|
i int // reader index in the map
|
|
err error
|
|
}
|
|
|
|
// DecodeReaders takes a map of readers and an ErasureScheme returning a
|
|
// combined Reader. The map, 'rs', must be a mapping of erasure piece numbers
|
|
// to erasure piece streams.
|
|
func DecodeReaders(rs map[int]io.ReadCloser, es ErasureScheme) io.ReadCloser {
|
|
dr := &decodedReader{
|
|
rs: rs,
|
|
es: es,
|
|
inbufs: make(map[int][]byte, len(rs)),
|
|
outbuf: make([]byte, 0, es.DecodedBlockSize()),
|
|
}
|
|
for i := range rs {
|
|
dr.inbufs[i] = make([]byte, es.EncodedBlockSize())
|
|
}
|
|
return dr
|
|
}
|
|
|
|
func (dr *decodedReader) Read(p []byte) (n int, err error) {
|
|
if len(dr.outbuf) <= 0 {
|
|
// if the output buffer is empty, let's fill it again
|
|
// if we've already had an error, fail
|
|
if dr.err != nil {
|
|
return 0, err
|
|
}
|
|
// we're going to kick off a bunch of goroutines. make a
|
|
// channel to catch those goroutine errors. importantly,
|
|
// the channel has a buffer size to contain all the errors
|
|
// even if we read none, so we can return without receiving
|
|
// every channel value
|
|
errs := make(chan readerError, len(dr.rs))
|
|
for i := range dr.rs {
|
|
go func(i int) {
|
|
// fill the buffer from the piece input
|
|
_, err := io.ReadFull(dr.rs[i], dr.inbufs[i])
|
|
errs <- readerError{i, err}
|
|
}(i)
|
|
}
|
|
// catch all the errors
|
|
inbufs := make(map[int][]byte, len(dr.inbufs))
|
|
eofbufs := 0
|
|
for range dr.rs {
|
|
re := <-errs
|
|
if re.err == nil {
|
|
// add inbuf for decoding only if no error
|
|
inbufs[re.i] = dr.inbufs[re.i]
|
|
} else if re.err == io.EOF {
|
|
// keep track of inbufs at EOF
|
|
eofbufs++
|
|
}
|
|
if eofbufs >= dr.es.RequiredCount() {
|
|
dr.err = io.EOF
|
|
return 0, dr.err
|
|
}
|
|
}
|
|
// we have all the input buffers, fill the decoded output buffer
|
|
dr.outbuf, err = dr.es.Decode(dr.outbuf, inbufs)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
// copy what data we have to the output
|
|
n = copy(p, dr.outbuf)
|
|
// slide the remaining bytes to the beginning
|
|
copy(dr.outbuf, dr.outbuf[n:])
|
|
// shrink the remaining buffer
|
|
dr.outbuf = dr.outbuf[:len(dr.outbuf)-n]
|
|
return n, nil
|
|
}
|
|
|
|
func (dr *decodedReader) Close() error {
|
|
var firstErr error
|
|
for _, c := range dr.rs {
|
|
err := c.Close()
|
|
if err != nil && firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
return firstErr
|
|
}
|
|
|
|
type decodedRanger struct {
|
|
es ErasureScheme
|
|
rrs map[int]ranger.Ranger
|
|
inSize int64
|
|
}
|
|
|
|
// Decode takes a map of Rangers and an ErasureSchema and returns a combined
|
|
// Ranger. The map, 'rrs', must be a mapping of erasure piece numbers
|
|
// to erasure piece rangers.
|
|
func Decode(rrs map[int]ranger.Ranger, es ErasureScheme) (
|
|
ranger.Ranger, error) {
|
|
size := int64(-1)
|
|
for _, rr := range rrs {
|
|
if size == -1 {
|
|
size = rr.Size()
|
|
} else {
|
|
if size != rr.Size() {
|
|
return nil, Error.New("decode failure: range reader sizes don't " +
|
|
"all match")
|
|
}
|
|
}
|
|
}
|
|
if size == -1 {
|
|
return ranger.ByteRanger(nil), nil
|
|
}
|
|
if size%int64(es.EncodedBlockSize()) != 0 {
|
|
return nil, Error.New("invalid erasure decoder and range reader combo. " +
|
|
"range reader size must be a multiple of erasure encoder block size")
|
|
}
|
|
if len(rrs) < es.RequiredCount() {
|
|
return nil, Error.New("not enough readers to reconstruct data!")
|
|
}
|
|
return &decodedRanger{
|
|
es: es,
|
|
rrs: rrs,
|
|
inSize: size,
|
|
}, nil
|
|
}
|
|
|
|
func (dr *decodedRanger) Size() int64 {
|
|
blocks := dr.inSize / int64(dr.es.EncodedBlockSize())
|
|
return blocks * int64(dr.es.DecodedBlockSize())
|
|
}
|
|
|
|
func (dr *decodedRanger) Range(offset, length int64) io.ReadCloser {
|
|
// offset and length might not be block-aligned. figure out which
|
|
// blocks contain this request
|
|
firstBlock, blockCount := calcEncompassingBlocks(
|
|
offset, length, dr.es.DecodedBlockSize())
|
|
|
|
// go ask for ranges for all those block boundaries
|
|
// do it parallel to save from network latency
|
|
readers := make(map[int]io.ReadCloser, len(dr.rrs))
|
|
type indexReadCloser struct {
|
|
i int
|
|
r io.ReadCloser
|
|
}
|
|
result := make(chan indexReadCloser, len(dr.rrs))
|
|
for i, rr := range dr.rrs {
|
|
go func(i int, rr ranger.Ranger) {
|
|
r := rr.Range(
|
|
firstBlock*int64(dr.es.EncodedBlockSize()),
|
|
blockCount*int64(dr.es.EncodedBlockSize()))
|
|
result <- indexReadCloser{i, r}
|
|
}(i, rr)
|
|
}
|
|
// wait for all goroutines to finish and save result in readers map
|
|
for range dr.rrs {
|
|
res := <-result
|
|
readers[res.i] = res.r
|
|
}
|
|
// decode from all those ranges
|
|
r := DecodeReaders(readers, dr.es)
|
|
// offset might start a few bytes in, potentially discard the initial bytes
|
|
_, err := io.CopyN(ioutil.Discard, r,
|
|
offset-firstBlock*int64(dr.es.DecodedBlockSize()))
|
|
if err != nil {
|
|
return readcloser.FatalReadCloser(Error.Wrap(err))
|
|
}
|
|
// length might not have included all of the blocks, limit what we return
|
|
return readcloser.LimitReadCloser(r, length)
|
|
}
|