Fix data race in eestream (#377)
This commit is contained in:
parent
4a176915ef
commit
bbcac47d2f
@ -201,19 +201,25 @@ func (er *encodedReader) fillBuffer() {
|
||||
// targeted reader buffer
|
||||
func (er *encodedReader) copyData(num int, copier <-chan block) {
|
||||
// close the respective buffer channel when this goroutine exits
|
||||
defer func() {
|
||||
if er.eps[num].ch != nil {
|
||||
close(er.eps[num].ch)
|
||||
}
|
||||
}()
|
||||
defer er.closeReaderChannel(num)
|
||||
// process the channel until closed
|
||||
for b := range copier {
|
||||
er.addToReader(b)
|
||||
}
|
||||
}
|
||||
|
||||
func (er *encodedReader) closeReaderChannel(num int) {
|
||||
// use mutex to avoid data race with checkSlowChannel
|
||||
er.mux.Lock()
|
||||
defer er.mux.Unlock()
|
||||
if !er.eps[num].closed {
|
||||
er.eps[num].closed = true
|
||||
close(er.eps[num].ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (er *encodedReader) addToReader(b block) {
|
||||
if er.eps[b.i].ch == nil {
|
||||
if er.eps[b.i].closed {
|
||||
// this channel is already closed for slowness - skip it
|
||||
return
|
||||
}
|
||||
@ -222,6 +228,7 @@ func (er *encodedReader) addToReader(b block) {
|
||||
timer := time.NewTimer(50 * time.Millisecond)
|
||||
defer timer.Stop()
|
||||
// add the encoded data to the respective reader buffer channel
|
||||
|
||||
select {
|
||||
case er.eps[b.i].ch <- b:
|
||||
return
|
||||
@ -241,7 +248,7 @@ func (er *encodedReader) checkSlowChannel(num int) (closed bool) {
|
||||
// check how many buffer channels are already empty
|
||||
ec := 0
|
||||
for i := range er.eps {
|
||||
if er.eps[i].ch != nil && len(er.eps[i].ch) == 0 {
|
||||
if !er.eps[i].closed && len(er.eps[i].ch) == 0 {
|
||||
ec++
|
||||
}
|
||||
}
|
||||
@ -250,8 +257,8 @@ func (er *encodedReader) checkSlowChannel(num int) (closed bool) {
|
||||
// canceled
|
||||
closed = ec >= er.rs.MinimumThreshold()
|
||||
if closed {
|
||||
er.eps[num].closed = true
|
||||
close(er.eps[num].ch)
|
||||
er.eps[num].ch = nil
|
||||
er.eps[num].cancel()
|
||||
}
|
||||
return closed
|
||||
@ -278,6 +285,7 @@ type encodedPiece struct {
|
||||
cancel context.CancelFunc
|
||||
er *encodedReader
|
||||
ch chan block
|
||||
closed bool
|
||||
outbuf []byte
|
||||
err error
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user