From bbcac47d2fcf685ee6414ffc1f4cdaf0bde11257 Mon Sep 17 00:00:00 2001 From: Kaloyan Raev Date: Tue, 25 Sep 2018 14:39:14 +0300 Subject: [PATCH] Fix data race in eestream (#377) --- pkg/eestream/encode.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/eestream/encode.go b/pkg/eestream/encode.go index a9705c3a5..a9ff66a07 100644 --- a/pkg/eestream/encode.go +++ b/pkg/eestream/encode.go @@ -201,19 +201,25 @@ func (er *encodedReader) fillBuffer() { // targeted reader buffer func (er *encodedReader) copyData(num int, copier <-chan block) { // close the respective buffer channel when this goroutine exits - defer func() { - if er.eps[num].ch != nil { - close(er.eps[num].ch) - } - }() + defer er.closeReaderChannel(num) // process the channel until closed for b := range copier { er.addToReader(b) } } +func (er *encodedReader) closeReaderChannel(num int) { + // use mutex to avoid data race with checkSlowChannel + er.mux.Lock() + defer er.mux.Unlock() + if !er.eps[num].closed { + er.eps[num].closed = true + close(er.eps[num].ch) + } +} + func (er *encodedReader) addToReader(b block) { - if er.eps[b.i].ch == nil { + if er.eps[b.i].closed { // this channel is already closed for slowness - skip it return } @@ -222,6 +228,7 @@ func (er *encodedReader) addToReader(b block) { timer := time.NewTimer(50 * time.Millisecond) defer timer.Stop() // add the encoded data to the respective reader buffer channel + select { case er.eps[b.i].ch <- b: return @@ -241,7 +248,7 @@ func (er *encodedReader) checkSlowChannel(num int) (closed bool) { // check how many buffer channels are already empty ec := 0 for i := range er.eps { - if er.eps[i].ch != nil && len(er.eps[i].ch) == 0 { + if !er.eps[i].closed && len(er.eps[i].ch) == 0 { ec++ } } @@ -250,8 +257,8 @@ func (er *encodedReader) checkSlowChannel(num int) (closed bool) { // canceled closed = ec >= er.rs.MinimumThreshold() if closed { + er.eps[num].closed = true close(er.eps[num].ch) - er.eps[num].ch = nil er.eps[num].cancel() } return closed @@ -278,6 +285,7 @@ type encodedPiece struct { cancel context.CancelFunc er *encodedReader ch chan block + closed bool outbuf []byte err error }