diff --git a/pkg/eestream/decode.go b/pkg/eestream/decode.go index 3e9c21d98..0c2a60a43 100644 --- a/pkg/eestream/decode.go +++ b/pkg/eestream/decode.go @@ -71,6 +71,9 @@ func DecodeReaders(ctx context.Context, rs map[int]io.ReadCloser, es ErasureSche } func (dr *decodedReader) Read(p []byte) (n int, err error) { + ctx := dr.ctx + defer mon.Task()(&ctx)(&err) + if len(dr.outbuf) == 0 { // if the output buffer is empty, let's fill it again // if we've already had an error, fail @@ -83,7 +86,7 @@ func (dr *decodedReader) Read(p []byte) (n int, err error) { return 0, dr.err } // read the input buffers of the next stripe - may also decode it - dr.outbuf, dr.err = dr.stripeReader.ReadStripe(context.TODO(), dr.currentStripe, dr.outbuf) + dr.outbuf, dr.err = dr.stripeReader.ReadStripe(ctx, dr.currentStripe, dr.outbuf) if dr.err != nil { return 0, dr.err } @@ -99,7 +102,9 @@ func (dr *decodedReader) Read(p []byte) (n int, err error) { return n, nil } -func (dr *decodedReader) Close() error { +func (dr *decodedReader) Close() (err error) { + ctx := dr.ctx + defer mon.Task()(&ctx)(&err) // cancel the context to terminate reader goroutines dr.cancel() errorThreshold := len(dr.readers) - dr.scheme.RequiredCount() diff --git a/pkg/eestream/encode.go b/pkg/eestream/encode.go index 066aca197..909f298a9 100644 --- a/pkg/eestream/encode.go +++ b/pkg/eestream/encode.go @@ -119,6 +119,7 @@ func (rs *RedundancyStrategy) OptimalThreshold() int { } type encodedReader struct { + ctx context.Context rs RedundancyStrategy pieces map[int]*encodedPiece } @@ -129,6 +130,7 @@ func EncodeReader(ctx context.Context, r io.Reader, rs RedundancyStrategy) (_ [] defer mon.Task()(&ctx)(&err) er := &encodedReader{ + ctx: ctx, rs: rs, pieces: make(map[int]*encodedPiece, rs.TotalCount()), } @@ -189,6 +191,8 @@ type encodedPiece struct { } func (ep *encodedPiece) Read(p []byte) (n int, err error) { + ctx := ep.er.ctx + defer mon.Task()(&ctx)(&err) if ep.err != nil { return 0, ep.err } @@ -218,7 +222,9 @@ func (ep *encodedPiece) Read(p []byte) (n int, err error) { return n, nil } -func (ep *encodedPiece) Close() error { +func (ep *encodedPiece) Close() (err error) { + ctx := ep.er.ctx + defer mon.Task()(&ctx)(&err) return ep.pipeReader.Close() }