pkg/eestream: plumb contexts through (#2187)

This commit is contained in:
JT Olio 2019-06-26 07:05:58 -06:00 committed by Egon Elbre
parent 7b66e0cd7c
commit 3925e84580
2 changed files with 14 additions and 3 deletions

View File

@ -71,6 +71,9 @@ func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser, es ErasureSche
}
func (dr *decodedReader) Read(p []byte) (n int, err error) {
ctx := dr.ctx
defer mon.Task()(&ctx)(&err)
if len(dr.outbuf) == 0 {
// if the output buffer is empty, let's fill it again
// if we've already had an error, fail
@ -83,7 +86,7 @@ func (dr *decodedReader) Read(p []byte) (n int, err error) {
return 0, dr.err
}
// read the input buffers of the next stripe - may also decode it
dr.outbuf, dr.err = dr.stripeReader.ReadStripe(context.TODO(), dr.currentStripe, dr.outbuf)
dr.outbuf, dr.err = dr.stripeReader.ReadStripe(ctx, dr.currentStripe, dr.outbuf)
if dr.err != nil {
return 0, dr.err
}
@ -99,7 +102,9 @@ func (dr *decodedReader) Read(p []byte) (n int, err error) {
return n, nil
}
func (dr *decodedReader) Close() error {
func (dr *decodedReader) Close() (err error) {
ctx := dr.ctx
defer mon.Task()(&ctx)(&err)
// cancel the context to terminate reader goroutines
dr.cancel()
errorThreshold := len(dr.readers) - dr.scheme.RequiredCount()

View File

@ -119,6 +119,7 @@ func (rs *RedundancyStrategy) OptimalThreshold() int {
}
type encodedReader struct {
ctx context.Context
rs RedundancyStrategy
pieces map[int]*encodedPiece
}
@ -129,6 +130,7 @@ func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy) (_ []
defer mon.Task()(&ctx)(&err)
er := &encodedReader{
ctx: ctx,
rs: rs,
pieces: make(map[int]*encodedPiece, rs.TotalCount()),
}
@ -189,6 +191,8 @@ type encodedPiece struct {
}
func (ep *encodedPiece) Read(p []byte) (n int, err error) {
ctx := ep.er.ctx
defer mon.Task()(&ctx)(&err)
if ep.err != nil {
return 0, ep.err
}
@ -218,7 +222,9 @@ func (ep *encodedPiece) Read(p []byte) (n int, err error) {
return n, nil
}
func (ep *encodedPiece) Close() error {
func (ep *encodedPiece) Close() (err error) {
ctx := ep.er.ctx
defer mon.Task()(&ctx)(&err)
return ep.pipeReader.Close()
}