storj/pkg/eestream/stripe.go

145 lines
3.5 KiB
Go
Raw Normal View History

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package eestream
import (
"io"
"sync"
"github.com/vivint/infectious"
)
// StripeReader can read and decodes stripes from a set of readers
type StripeReader struct {
scheme ErasureScheme
cond *sync.Cond
bufs map[int]*PieceBuffer
inbufs [][]byte
inmap map[int][]byte
errmap map[int]bool
}
// NewStripeReader creates a new StripeReader from the given readers, erasure
// scheme and max buffer memory.
func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int) *StripeReader {
bufSize := mbm / es.TotalCount()
bufSize -= bufSize % es.EncodedBlockSize()
if bufSize < es.EncodedBlockSize() {
bufSize = es.EncodedBlockSize()
}
r := &StripeReader{
scheme: es,
cond: sync.NewCond(&sync.Mutex{}),
bufs: make(map[int]*PieceBuffer, es.TotalCount()),
inbufs: make([][]byte, es.TotalCount()),
inmap: make(map[int][]byte, es.TotalCount()),
errmap: make(map[int]bool, es.TotalCount()),
}
for i := 0; i < es.TotalCount(); i++ {
r.inbufs[i] = make([]byte, es.EncodedBlockSize())
r.bufs[i] = NewPieceBuffer(make([]byte, bufSize), es.EncodedBlockSize(), r.cond)
}
// Kick off a goroutine each reader to be copied into a PieceBuffer.
for i, buf := range r.bufs {
go func(r io.Reader, buf *PieceBuffer) {
_, err := io.Copy(buf, r)
if err != nil {
buf.SetError(err)
return
}
buf.SetError(io.EOF)
}(rs[i], buf)
}
return r
}
// Close closes the StripeReader and all PieceBuffers.
func (r *StripeReader) Close() error {
errs := make(chan error, len(r.bufs))
for _, buf := range r.bufs {
go func(c io.Closer) {
errs <- c.Close()
}(buf)
}
var first error
for range r.bufs {
err := <-errs
if err != nil && first == nil {
first = Error.Wrap(err)
}
}
return first
}
// ReadStripe reads and decodes the num-th stripe and concatenates it to p. The
// return value is the updated byte slice.
func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
for i := range r.inmap {
delete(r.inmap, i)
}
r.cond.L.Lock()
defer r.cond.L.Unlock()
for {
for r.readAvailableShares(num) == 0 {
r.cond.Wait()
}
if r.hasEnoughShares() {
out, err := r.scheme.Decode(p, r.inmap)
if err != nil {
if r.shouldWaitForMore(err) {
continue
}
return nil, err
}
return out, nil
}
}
}
// readAvailableShares reads the available num-th erasure shares from the piece
// buffers without blocking. The return value n is the number of erasure shares
// read.
func (r *StripeReader) readAvailableShares(num int64) (n int) {
for i := 0; i < len(r.bufs); i++ {
if r.inmap[i] != nil || r.errmap[i] {
continue
}
if r.bufs[i].HasShare(num) {
err := r.bufs[i].ReadShare(num, r.inbufs[i])
if err != nil {
r.errmap[i] = true
} else {
r.inmap[i] = r.inbufs[i]
}
n++
}
}
return n
}
// hasEnoughShares check if there are enough erasure shares read to attempt
// a decode.
func (r *StripeReader) hasEnoughShares() bool {
return len(r.inmap) >= r.scheme.RequiredCount()+1 ||
len(r.inmap)+len(r.errmap) >= r.scheme.TotalCount()
}
// shouldWaitForMore checks the returned decode error if it makes sense to wait
// for more erasure shares to attempt an error correction.
func (r *StripeReader) shouldWaitForMore(err error) bool {
// check if the error is due to error detection
if !infectious.NotEnoughShares.Contains(err) &&
!infectious.TooManyErrors.Contains(err) {
return false
}
// check if there are more input buffers to wait for
return len(r.inmap)+len(r.errmap) < r.scheme.TotalCount()
}