dissertation-2-code/udp/congestion/newreno.go
Jake Hillion 4f35c9e721
Some checks failed
continuous-integration/drone/push Build is failing
refactored updateAck
2021-02-25 11:05:59 +00:00

269 lines
5.0 KiB
Go

package congestion
import (
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
"time"
)
const RttExponentialFactor = 0.1
const RttLossDelay = 1.5
type NewReno struct {
sequence chan uint32
inFlight []flightInfo
lastSent time.Time
inFlightMu sync.Mutex
awaitingAck sortableFlights
ack, nack uint32
lastAck, lastNack uint32
ackNackMu sync.Mutex
rttNanos float64
windowSize, windowCount uint32
slowStart bool
windowNotifier chan struct{}
}
type flightInfo struct {
time time.Time
sequence uint32
}
type sortableFlights []flightInfo
func (f sortableFlights) Len() int {
return len(f)
}
func (f sortableFlights) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func (f sortableFlights) Less(i, j int) bool {
return f[i].sequence < f[j].sequence
}
func (c *NewReno) String() string {
return fmt.Sprintf("{NewReno %t %d %d %d %d}", c.slowStart, c.windowSize, len(c.inFlight), c.lastAck, c.lastNack)
}
func NewNewReno() *NewReno {
c := NewReno{
sequence: make(chan uint32),
windowNotifier: make(chan struct{}),
windowSize: 1,
rttNanos: float64((10 * time.Millisecond).Nanoseconds()),
slowStart: true,
}
go func() {
var s uint32
for {
if s == 0 {
s++
continue
}
c.sequence <- s
s++
}
}()
return &c
}
func (c *NewReno) ReceivedPacket(seq, nack, ack uint32) {
if seq != 0 {
c.receivedSequence(seq)
}
if nack != 0 {
c.receivedNack(nack)
}
if ack != 0 {
c.receivedAck(ack)
}
}
func (c *NewReno) Sequence() uint32 {
for len(c.inFlight) >= int(c.windowSize) {
<-c.windowNotifier
}
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
s := <-c.sequence
t := time.Now()
c.inFlight = append(c.inFlight, flightInfo{
time: t,
sequence: s,
})
c.lastSent = t
return s
}
func (c *NewReno) NextAck() uint32 {
a := c.ack
atomic.StoreUint32(&c.lastAck, a)
return a
}
func (c *NewReno) NextNack() uint32 {
n := c.nack
atomic.StoreUint32(&c.lastNack, n)
return n
}
func (c *NewReno) AwaitEarlyUpdate(keepalive time.Duration) uint32 {
for {
rtt := time.Duration(math.Round(c.rttNanos))
time.Sleep(rtt / 2)
c.checkNack()
// CASE 1: waiting ACKs or NACKs and no message sent in the last half-RTT
// this targets arrival in 0.5+0.5 ± 0.5 RTTs (1±0.5 RTTs)
if ((c.lastAck != c.ack) || (c.lastNack != c.nack)) && time.Now().After(c.lastSent.Add(rtt/2)) {
return 0 // no ack needed
}
// CASE 2: No message sent within the keepalive time
if keepalive != 0 && time.Now().After(c.lastSent.Add(keepalive)) {
return c.Sequence() // require an ack
}
}
}
func (c *NewReno) receivedSequence(seq uint32) {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
if seq != c.ack+1 && seq != c.nack+1 {
if seq > c.ack && seq > c.nack {
c.awaitingAck = append(c.awaitingAck, flightInfo{
time: time.Now(),
sequence: seq,
})
} // else discard as it's already been cumulatively ACKed/NACKed
return // if this seq doesn't change the ack field, awaitingAck will be unchanged
}
c.updateAck(seq)
}
func (c *NewReno) checkNack() {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
if len(c.awaitingAck) == 0 {
return
}
sort.Sort(c.awaitingAck)
rtt := time.Duration(c.rttNanos * RttLossDelay)
if !c.awaitingAck[0].time.Before(time.Now().Add(-rtt)) {
return
}
c.nack = c.awaitingAck[0].sequence - 1
c.updateAck(c.nack)
}
func (c *NewReno) updateAck(a uint32) {
var i int
var e flightInfo
for i, e = range c.awaitingAck {
if a+1 == e.sequence {
a += 1
} else {
break
}
}
c.ack = a
c.awaitingAck = c.awaitingAck[i:]
}
// It is assumed that ReceivedNack will only be called by one thread
func (c *NewReno) receivedNack(nack uint32) {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
// as both ack and nack are cumulative, inflight will always be ordered by seq
var i int
for i < len(c.inFlight) && c.inFlight[i].sequence <= nack {
i++
}
c.slowStart = false
if i == 0 {
return
}
c.inFlight = c.inFlight[i-1:]
for i > 0 {
s := c.windowSize
if s > 1 && atomic.CompareAndSwapUint32(&c.windowSize, s, s/2) {
break
}
}
select {
case c.windowNotifier <- struct{}{}:
default:
}
}
// It is assumed that ReceivedAck will only be called by one thread
func (c *NewReno) receivedAck(ack uint32) {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
// as both ack and nack are cumulative, inflight will always be ordered by seq
var i int
for i < len(c.inFlight) && c.inFlight[i].sequence <= ack {
rtt := float64(time.Now().Sub(c.inFlight[i].time).Nanoseconds())
c.rttNanos = c.rttNanos*(1-RttExponentialFactor) + rtt*RttExponentialFactor
i++
}
if i == 0 {
return
}
c.inFlight = c.inFlight[i-1:]
if c.slowStart {
c.windowSize += uint32(i)
} else {
c.windowCount += uint32(i)
if c.windowCount > c.windowSize {
c.windowCount -= uint32(i)
atomic.AddUint32(&c.windowSize, 1)
}
}
select {
case c.windowNotifier <- struct{}{}:
default:
}
}