eestream: better error reporting when downloads fail (#295)

* eestream: better error reporting when downloads fail

* Fix failing tests

* Make the check in hasEnoughShares a little bit cleaner
This commit is contained in:
JT Olio 2018-09-07 08:37:13 -06:00 committed by Brandon Iglesias
parent 0a2839b2d7
commit 1ea3f9f03b

View File

@ -4,7 +4,10 @@
package eestream
import (
"fmt"
"io"
"sort"
"strings"
"sync"
"github.com/vivint/infectious"
@ -17,7 +20,7 @@ type StripeReader struct {
bufs map[int]*PieceBuffer
inbufs [][]byte
inmap map[int][]byte
errmap map[int]bool
errmap map[int]error
}
// NewStripeReader creates a new StripeReader from the given readers, erasure
@ -35,7 +38,7 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int) *Strip
bufs: make(map[int]*PieceBuffer, es.TotalCount()),
inbufs: make([][]byte, es.TotalCount()),
inmap: make(map[int][]byte, es.TotalCount()),
errmap: make(map[int]bool, es.TotalCount()),
errmap: make(map[int]error, es.TotalCount()),
}
for i := 0; i < es.TotalCount(); i++ {
@ -86,7 +89,7 @@ func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
r.cond.L.Lock()
defer r.cond.L.Unlock()
for {
for r.pendingReaders() {
for r.readAvailableShares(num) == 0 {
r.cond.Wait()
}
@ -101,6 +104,8 @@ func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
return out, nil
}
}
// could not read enough shares to attempt a decode
return nil, r.combineErrs()
}
// readAvailableShares reads the available num-th erasure shares from the piece
@ -108,13 +113,13 @@ func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
// read.
func (r *StripeReader) readAvailableShares(num int64) (n int) {
for i := 0; i < len(r.bufs); i++ {
if r.inmap[i] != nil || r.errmap[i] {
if r.inmap[i] != nil || r.errmap[i] != nil {
continue
}
if r.bufs[i].HasShare(num) {
err := r.bufs[i].ReadShare(num, r.inbufs[i])
if err != nil {
r.errmap[i] = true
r.errmap[i] = err
} else {
r.inmap[i] = r.inbufs[i]
}
@ -124,11 +129,16 @@ func (r *StripeReader) readAvailableShares(num int64) (n int) {
return n
}
// pendingReaders checks if there are any pending readers to get a share from.
func (r *StripeReader) pendingReaders() bool {
return len(r.inmap)+len(r.errmap) < r.scheme.TotalCount()
}
// hasEnoughShares check if there are enough erasure shares read to attempt
// a decode.
func (r *StripeReader) hasEnoughShares() bool {
return len(r.inmap) >= r.scheme.RequiredCount()+1 ||
len(r.inmap)+len(r.errmap) >= r.scheme.TotalCount()
(len(r.inmap) == r.scheme.RequiredCount() && !r.pendingReaders())
}
// shouldWaitForMore checks the returned decode error if it makes sense to wait
@ -140,5 +150,21 @@ func (r *StripeReader) shouldWaitForMore(err error) bool {
return false
}
// check if there are more input buffers to wait for
return len(r.inmap)+len(r.errmap) < r.scheme.TotalCount()
return r.pendingReaders()
}
// combineErrs makes a useful error message from the errors in errmap.
// combineErrs always returns an error.
func (r *StripeReader) combineErrs() error {
if len(r.errmap) == 0 {
return Error.New("programmer error: no errors to combine")
}
errstrings := make([]string, 0, len(r.errmap))
for i, err := range r.errmap {
errstrings = append(errstrings,
fmt.Sprintf("\nerror retrieving piece %02d: %v", i, err))
}
sort.Strings(errstrings)
return Error.New("failed to download stripe: %s",
strings.Join(errstrings, ""))
}