Reduce network latency by initializing Rangers in parallel

This commit is contained in:
Kaloyan Raev 2018-04-26 15:51:49 +03:00 committed by JT Olds
parent 69239e9e17
commit acce2bfc08
2 changed files with 39 additions and 7 deletions

View File

@ -33,6 +33,12 @@ func main() {
}
}
type indexRangerError struct {
i int
rr ranger.Ranger
err error
}
func Main() error {
encKey := sha256.Sum256([]byte(*key))
fc, err := infectious.NewFEC(*rsk, *rsn)
@ -46,13 +52,24 @@ func Main() error {
if err != nil {
return err
}
// initialize http rangers in parallel to save from network latency
rrs := map[int]ranger.Ranger{}
for i := 0; i < 40; i++ {
url := fmt.Sprintf("http://localhost:%d", 10000+i)
rrs[i], err = ranger.HTTPRanger(url)
if err != nil {
result := make(chan indexRangerError, *rsn)
for i := 0; i < *rsn; i++ {
go func(i int) {
url := fmt.Sprintf("http://localhost:%d", 10000+i)
rr, err := ranger.HTTPRanger(url)
result <- indexRangerError{i, rr, err}
}(i)
}
// wait for all goroutines to finish and save result in rrs map
for i := 0; i < *rsn; i++ {
res := <-result
if res.err != nil {
// return on the first failure
return err
}
rrs[res.i] = res.rr
}
rr, err := eestream.Decode(rrs, es)
if err != nil {

View File

@ -144,6 +144,11 @@ func (dr *decodedRanger) Size() int64 {
return blocks * int64(dr.es.DecodedBlockSize())
}
type indexReadCloser struct {
i int
r io.ReadCloser
}
func (dr *decodedRanger) Range(offset, length int64) io.ReadCloser {
// offset and length might not be block-aligned. figure out which
// blocks contain this request
@ -151,11 +156,21 @@ func (dr *decodedRanger) Range(offset, length int64) io.ReadCloser {
offset, length, dr.es.DecodedBlockSize())
// go ask for ranges for all those block boundaries
// do it parallel to save from network latency
readers := make(map[int]io.ReadCloser, len(dr.rrs))
result := make(chan indexReadCloser, len(dr.rrs))
for i, rr := range dr.rrs {
readers[i] = rr.Range(
firstBlock*int64(dr.es.EncodedBlockSize()),
blockCount*int64(dr.es.EncodedBlockSize()))
go func(i int, rr ranger.Ranger) {
r := rr.Range(
firstBlock*int64(dr.es.EncodedBlockSize()),
blockCount*int64(dr.es.EncodedBlockSize()))
result <- indexReadCloser{i, r}
}(i, rr)
}
// wait for all goroutines to finish and save result in readers map
for range dr.rrs {
res := <-result
readers[res.i] = res.r
}
// decode from all those ranges
r := DecodeReaders(readers, dr.es)