diff --git a/examples/eestream/serve-collected/main.go b/examples/eestream/serve-collected/main.go index c3cd6ea24..f2a9fe878 100644 --- a/examples/eestream/serve-collected/main.go +++ b/examples/eestream/serve-collected/main.go @@ -33,6 +33,12 @@ func main() { } } +type indexRangerError struct { + i int + rr ranger.Ranger + err error +} + func Main() error { encKey := sha256.Sum256([]byte(*key)) fc, err := infectious.NewFEC(*rsk, *rsn) @@ -46,13 +52,24 @@ func Main() error { if err != nil { return err } + // initialize http rangers in parallel to save from network latency rrs := map[int]ranger.Ranger{} - for i := 0; i < 40; i++ { - url := fmt.Sprintf("http://localhost:%d", 10000+i) - rrs[i], err = ranger.HTTPRanger(url) - if err != nil { + result := make(chan indexRangerError, *rsn) + for i := 0; i < *rsn; i++ { + go func(i int) { + url := fmt.Sprintf("http://localhost:%d", 10000+i) + rr, err := ranger.HTTPRanger(url) + result <- indexRangerError{i, rr, err} + }(i) + } + // wait for all goroutines to finish and save result in rrs map + for i := 0; i < *rsn; i++ { + res := <-result + if res.err != nil { + // return on the first failure return err } + rrs[res.i] = res.rr } rr, err := eestream.Decode(rrs, es) if err != nil { diff --git a/pkg/eestream/decode.go b/pkg/eestream/decode.go index 1dea94c98..d42a3a21c 100644 --- a/pkg/eestream/decode.go +++ b/pkg/eestream/decode.go @@ -144,6 +144,11 @@ func (dr *decodedRanger) Size() int64 { return blocks * int64(dr.es.DecodedBlockSize()) } +type indexReadCloser struct { + i int + r io.ReadCloser +} + func (dr *decodedRanger) Range(offset, length int64) io.ReadCloser { // offset and length might not be block-aligned. figure out which // blocks contain this request @@ -151,11 +156,21 @@ func (dr *decodedRanger) Range(offset, length int64) io.ReadCloser { offset, length, dr.es.DecodedBlockSize()) // go ask for ranges for all those block boundaries + // do it parallel to save from network latency readers := make(map[int]io.ReadCloser, len(dr.rrs)) + result := make(chan indexReadCloser, len(dr.rrs)) for i, rr := range dr.rrs { - readers[i] = rr.Range( - firstBlock*int64(dr.es.EncodedBlockSize()), - blockCount*int64(dr.es.EncodedBlockSize())) + go func(i int, rr ranger.Ranger) { + r := rr.Range( + firstBlock*int64(dr.es.EncodedBlockSize()), + blockCount*int64(dr.es.EncodedBlockSize())) + result <- indexReadCloser{i, r} + }(i, rr) + } + // wait for all goroutines to finish and save result in readers map + for range dr.rrs { + res := <-result + readers[res.i] = res.r } // decode from all those ranges r := DecodeReaders(readers, dr.es)