storj/uplink/eestream/decode.go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package eestream

import (
	"context"
	"io"
	"io/ioutil"
	"sync"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/storj/internal/errs2"
	"storj.io/storj/internal/readcloser"
	"storj.io/storj/pkg/encryption"
	"storj.io/storj/pkg/ranger"
)
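
// decodedReader joins the erasure piece streams back into a single decoded
// data stream. It implements io.ReadCloser on top of a StripeReader.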
type decodedReader struct {
	log             *zap.Logger
	ctx             context.Context
	cancel          context.CancelFunc
	readers         map[int]io.ReadCloser
	scheme          ErasureScheme
	stripeReader    *StripeReader
	outbuf          []byte
	err             error
	currentStripe   int64
	expectedStripes int64
	close           sync.Once
	closeErr        error
}

// DecodeReaders takes a map of readers and an ErasureScheme and returns a
// combined Reader.
//
// rs is a map of erasure piece numbers to erasure piece streams.
// expectedSize is the number of bytes expected to be returned by the Reader.
// mbm is the maximum memory (in bytes) to be allocated for read buffers. If
// set to 0, the minimum possible memory will be used.
// If forceErrorDetection is set to true, then k+1 pieces will always be
// required for decoding, so corrupted pieces can be detected.
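//
// A minimal usage sketch (ctx, es and pieces are assumed to exist and are
// illustrative only; expectedSize must be a multiple of es.StripeSize()):
//
//	rc := DecodeReaders(ctx, zap.NewNop(), pieces, es, expectedSize, 4*1024*1024, false)
//	defer func() { _ = rc.Close() }()
//	data, err := ioutil.ReadAll(rc)
//	if err != nil {
//		// handle decode error
//	}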
func DecodeReaders(ctx context.Context, log *zap.Logger, rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser {
	defer mon.Task()(&ctx)(nil)
	if expectedSize < 0 {
		return readcloser.FatalReadCloser(Error.New("negative expected size"))
	}
	if expectedSize%int64(es.StripeSize()) != 0 {
		return readcloser.FatalReadCloser(
			Error.New("expected size (%d) not a multiple of decoded block size (%d)",
				expectedSize, es.StripeSize()))
	}
	if err := checkMBM(mbm); err != nil {
		return readcloser.FatalReadCloser(err)
	}
	dr := &decodedReader{
		log:             log,
		readers:         rs,
		scheme:          es,
		stripeReader:    NewStripeReader(log, rs, es, mbm, forceErrorDetection),
		outbuf:          make([]byte, 0, es.StripeSize()),
		expectedStripes: expectedSize / int64(es.StripeSize()),
	}
	dr.ctx, dr.cancel = context.WithCancel(ctx)
	// Kick off a goroutine to watch for context cancelation.
	go func() {
		<-dr.ctx.Done()
		_ = dr.Close()
	}()
	return dr
}
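
// Read implements io.Reader. It serves the decoded data one stripe at a time,
// keeping any partially consumed stripe in dr.outbuf between calls.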
func (dr *decodedReader) Read(p []byte) (n int, err error) {
	ctx := dr.ctx
	defer mon.Task()(&ctx)(&err)
	if len(dr.outbuf) == 0 {
		// if the output buffer is empty, let's fill it again
		// if we've already had an error, fail
		if dr.err != nil {
			return 0, dr.err
		}
		// return EOF if the expected stripes were read
		if dr.currentStripe >= dr.expectedStripes {
			dr.err = io.EOF
			return 0, dr.err
		}
		// read the input buffers of the next stripe - may also decode it
		dr.outbuf, dr.err = dr.stripeReader.ReadStripe(ctx, dr.currentStripe, dr.outbuf)
		if dr.err != nil {
			return 0, dr.err
		}
		dr.currentStripe++
	}
	// copy what data we have to the output
	n = copy(p, dr.outbuf)
	// slide the remaining bytes to the beginning
	copy(dr.outbuf, dr.outbuf[n:])
	// shrink the remaining buffer
	dr.outbuf = dr.outbuf[:len(dr.outbuf)-n]
	return n, nil
}
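
// Close cancels the decoding context and closes all piece readers and the
// stripe reader exactly once, combining their errors.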
func (dr *decodedReader) Close() (err error) {
	ctx := dr.ctx
	defer mon.Task()(&ctx)(&err)
	// cancel the context to terminate reader goroutines
	dr.cancel()
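	// We can tolerate close errors from up to len(readers)-RequiredCount()
	// redundant pieces; if more close calls fail (the stripe reader included),
	// the combined error below is treated as fatal, otherwise it is only
	// logged at debug level.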
	errorThreshold := len(dr.readers) - dr.scheme.RequiredCount()
	var closeGroup errs2.Group
	// avoid double close of readers
	dr.close.Do(func() {
		for _, r := range dr.readers {
			closeGroup.Go(r.Close)
		}
		// close the stripe reader
		closeGroup.Go(dr.stripeReader.Close)
		allErrors := closeGroup.Wait()
		errorThreshold -= len(allErrors)
		dr.closeErr = errs.Combine(allErrors...)
	})
	// TODO: this is a workaround; we need to reorganize to return multiple errors or to split them into fatal and non-fatal
	if errorThreshold < 0 {
		return dr.closeErr
	}
	if dr.closeErr != nil {
		dr.log.Debug("decode close non fatal error: ", zap.Error(dr.closeErr))
	}
	return nil
}
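
// decodedRanger implements ranger.Ranger on top of a set of erasure piece
// rangers, decoding requested ranges on demand via DecodeReaders.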
type decodedRanger struct {
	log                 *zap.Logger
	es                  ErasureScheme
	rrs                 map[int]ranger.Ranger
	inSize              int64
	mbm                 int // max buffer memory
	forceErrorDetection bool
}

// Decode takes a map of Rangers and an ErasureScheme and returns a combined
// Ranger.
//
// rrs is a map of erasure piece numbers to erasure piece rangers.
// mbm is the maximum memory (in bytes) to be allocated for read buffers. If
// set to 0, the minimum possible memory will be used.
// If forceErrorDetection is set to true, then k+1 pieces will always be
// required for decoding, so corrupted pieces can be detected.
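//
// A minimal usage sketch (es, pieceRangers and ctx are assumed to exist and
// are illustrative only):
//
//	rr, err := Decode(zap.NewNop(), pieceRangers, es, 4*1024*1024, false)
//	if err != nil {
//		// handle error (e.g. not enough pieces or mismatched sizes)
//	}
//	rc, err := rr.Range(ctx, 0, rr.Size())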
func Decode(log *zap.Logger, rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error) {
	if err := checkMBM(mbm); err != nil {
		return nil, err
	}
	if len(rrs) < es.RequiredCount() {
		return nil, Error.New("not enough readers to reconstruct data!")
	}
	size := int64(-1)
	for _, rr := range rrs {
		if size == -1 {
			size = rr.Size()
		} else if size != rr.Size() {
			return nil, Error.New(
				"decode failure: range reader sizes don't all match")
		}
	}
	if size == -1 {
		return ranger.ByteRanger(nil), nil
	}
	if size%int64(es.ErasureShareSize()) != 0 {
		return nil, Error.New("invalid erasure decoder and range reader combo. "+
			"range reader size (%d) must be a multiple of erasure encoder block size (%d)",
			size, es.ErasureShareSize())
	}
	return &decodedRanger{
		log:                 log,
		es:                  es,
		rrs:                 rrs,
		inSize:              size,
		mbm:                 mbm,
		forceErrorDetection: forceErrorDetection,
	}, nil
}
func (dr *decodedRanger) Size() int64 {
	blocks := dr.inSize / int64(dr.es.ErasureShareSize())
	return blocks * int64(dr.es.StripeSize())
}
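
// Range implements ranger.Ranger. It requests the block-aligned byte ranges
// covering [offset, offset+length) from every piece ranger in parallel,
// decodes them with DecodeReaders, and trims the result to the requested range.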
func (dr *decodedRanger) Range(ctx context.Context, offset, length int64) (_ io.ReadCloser, err error) {
	defer mon.Task()(&ctx)(&err)
	// offset and length might not be block-aligned. figure out which
	// blocks contain this request
	firstBlock, blockCount := encryption.CalcEncompassingBlocks(offset, length, dr.es.StripeSize())
	// go ask for ranges for all those block boundaries
	// do it in parallel to hide network latency
	readers := make(map[int]io.ReadCloser, len(dr.rrs))
	type indexReadCloser struct {
		i   int
		r   io.ReadCloser
		err error
	}
	result := make(chan indexReadCloser, len(dr.rrs))
	for i, rr := range dr.rrs {
		go func(i int, rr ranger.Ranger) {
			r, err := rr.Range(ctx,
				firstBlock*int64(dr.es.ErasureShareSize()),
				blockCount*int64(dr.es.ErasureShareSize()))
			result <- indexReadCloser{i: i, r: r, err: err}
		}(i, rr)
	}
	// wait for all goroutines to finish and save the results in the readers map
	for range dr.rrs {
		res := <-result
		if res.err != nil {
			readers[res.i] = readcloser.FatalReadCloser(res.err)
		} else {
			readers[res.i] = res.r
		}
	}
	// decode from all those ranges
	r := DecodeReaders(ctx, dr.log, readers, dr.es, blockCount*int64(dr.es.StripeSize()), dr.mbm, dr.forceErrorDetection)
	// offset might start a few bytes in, potentially discard the initial bytes
	_, err = io.CopyN(ioutil.Discard, r, offset-firstBlock*int64(dr.es.StripeSize()))
	if err != nil {
		return nil, Error.Wrap(err)
	}
	// length might not have included all of the blocks, limit what we return
	return readcloser.LimitReadCloser(r, length), nil
}

func checkMBM(mbm int) error {
	if mbm < 0 {
		return Error.New("negative max buffer memory")
	}
	return nil
}