dissertation-2-code/udp/congestion/newreno.go
Jake Hillion d65e8d3571
Some checks failed
continuous-integration/drone/push Build is failing
udp testing
2020-11-26 22:10:37 +00:00

177 lines
3.4 KiB
Go

package congestion
import (
"math"
"mpbl3p/utils"
"sync/atomic"
"time"
)
// RttExponentialFactor is the weight given to each new round-trip sample when
// it is folded into the running RTT average; the previous average keeps the
// remaining 1-RttExponentialFactor.
const RttExponentialFactor = 0.1
// NewReno holds the state of one congestion-controlled flow: the sender side
// (sequence numbers, window and RTT estimate) and the receiver side (the ack
// and nack values to report back to the peer).
type NewReno struct {
// sequence delivers the next outbound sequence number; it is fed by the
// goroutine started in NewNewReno and drained by Sequence.
sequence chan uint32
// keepalive is not referenced by any method in this file.
keepalive chan bool
// outboundTimes maps an outbound sequence number to the time Sequence handed
// it out; ReceivedAck removes the entry when it takes the RTT sample.
outboundTimes map[uint32]time.Time
// inboundTimes maps a received sequence number to its arrival time, as
// recorded by ReceivedPacket.
inboundTimes map[uint32]time.Time
// ack is the acknowledgement computed by ReceivedPacket; lastAck is the value
// most recently returned by NextAck.
ack, lastAck uint32
// nack is the negative acknowledgement computed by ReceivedPacket; lastNack
// is the value most recently returned by NextNack.
nack, lastNack uint32
// slowStart selects window growth of one per ack; ReceivedNack clears it.
slowStart bool
// rtt is the smoothed round-trip time in seconds.
rtt float64
// windowSize is the number of packets that may be in flight before Sequence
// blocks.
windowSize int32
// windowCount counts acks towards the next window increase once slowStart is
// false.
windowCount int32
// inFlight is incremented by Sequence and decremented by ReceivedAck.
inFlight int32
// ackNotifier is signalled by ReceivedAck to wake a blocked Sequence call.
ackNotifier chan struct{}
// lastSent is the time of the most recent Sequence call.
lastSent time.Time
// acksToSend holds received sequence numbers that ack does not yet cover.
acksToSend utils.Uint32Heap
}
// NewNewReno returns a NewReno controller in slow start with a window of one
// packet and an initial RTT estimate of one millisecond.
//
// It starts a goroutine that feeds the sequence channel with 1, 2, 3, ...,
// skipping 0 both at start-up and after uint32 wrap-around. The goroutine has
// no stop signal and runs for the life of the process.
func NewNewReno() *NewReno {
	// ackNotifier has a one-slot buffer so that a notification raised by
	// ReceivedAck while Sequence is between its window check and its receive
	// is kept instead of dropped; Sequence re-checks the window after every
	// wake-up, so a stale token is harmless.
	c := NewReno{
		sequence:      make(chan uint32),
		ackNotifier:   make(chan struct{}, 1),
		outboundTimes: make(map[uint32]time.Time),
		inboundTimes:  make(map[uint32]time.Time),
		windowSize:    1,
		rtt:           (1 * time.Millisecond).Seconds(),
		slowStart:     true,
	}

	go func() {
		var s uint32
		for {
			if s == 0 {
				// Step past 0 rather than spinning on it: without the
				// increment this loop never reaches the send below.
				s++
				continue
			}
			c.sequence <- s
			s++
		}
	}()

	return &c
}
// ReceivedAck processes an acknowledgement for sequence number ack: it takes
// an RTT sample, frees one slot of the send window and grows the window.
//
// It is assumed that ReceivedAck will only be called by one thread.
func (c *NewReno) ReceivedAck(ack uint32) {
	// RTT: update using an exponential average. Only sample when a send time
	// is recorded for this ack; a missing entry would read as the zero time
	// and fold an enormous bogus sample into the estimate.
	if sent, ok := c.outboundTimes[ack]; ok {
		delete(c.outboundTimes, ack)
		sample := time.Since(sent).Seconds()
		c.rtt = c.rtt*(1-RttExponentialFactor) + sample*RttExponentialFactor
	}

	// Free window: release one in-flight slot and wake Sequence if it is
	// waiting. The send is non-blocking so this method never stalls.
	atomic.AddInt32(&c.inFlight, -1)
	select {
	case c.ackNotifier <- struct{}{}:
	default:
	}

	// Grow.
	// CASE: exponential. Increase window size by one per ack.
	if c.slowStart {
		atomic.AddInt32(&c.windowSize, 1)
		return
	}

	// CASE: standard. Increase window size by one per window of acks.
	// Compare with >= because ReceivedNack may have shrunk the window below
	// the current count; == would then never match and growth would stop.
	c.windowCount++
	if c.windowCount >= c.windowSize {
		c.windowCount = 0
		atomic.AddInt32(&c.windowSize, 1)
	}
}
// ReceivedNack reacts to a negative acknowledgement by leaving slow start and
// halving the window, never taking it below one packet. The nack value itself
// is not inspected.
//
// It is assumed that ReceivedNack will only be called by one thread.
func (c *NewReno) ReceivedNack(nack uint32) {
	c.slowStart = false

	current := c.windowSize
	if current <= 1 {
		// Already at the minimum window; nothing to halve.
		return
	}
	atomic.StoreInt32(&c.windowSize, current/2)
}
// ReceivedPacket records the arrival of the packet numbered seq and updates
// the ack (and, when a gap has persisted for more than three RTTs, the nack)
// that will next be reported to the peer.
//
// Arrival times are removed from inboundTimes as soon as the matching
// sequence number leaves the pending heap, so the map does not grow with
// every packet ever received.
func (c *NewReno) ReceivedPacket(seq uint32) {
	c.inboundTimes[seq] = time.Now()
	c.acksToSend.Insert(seq)

	// findAck pops sequence numbers off the heap for as long as each one is
	// exactly one more than the previous, starting after start, and returns
	// the last number reached (start itself if none followed).
	findAck := func(start uint32) uint32 {
		ack := start
		for len(c.acksToSend) > 0 {
			next, _ := c.acksToSend.Peek()
			if next != ack+1 {
				break
			}
			ack, _ = c.acksToSend.Extract()
			delete(c.inboundTimes, ack)
		}
		return ack
	}

	ack := findAck(c.ack)
	if ack == c.ack && len(c.acksToSend) > 0 {
		// No progress and something is waiting behind a gap: check if there
		// is a nack to send. Decide this based on whether more than 3 RTTs
		// have passed since the packet after the gap arrived.
		// NOTE(review): a heap entry whose arrival time was already removed
		// (e.g. a duplicate sequence number) reads as the zero time and is
		// treated as overdue immediately.
		nextAck, _ := c.acksToSend.Peek()
		if time.Since(c.inboundTimes[nextAck]).Seconds() > c.rtt*3 {
			atomic.StoreUint32(&c.nack, nextAck-1)
			ack, _ = c.acksToSend.Extract()
			delete(c.inboundTimes, ack)
			ack = findAck(ack)
		}
	}

	atomic.StoreUint32(&c.ack, ack)
}
// Sequence blocks until the send window has room, then claims a slot and
// returns the next outbound sequence number, recording its send time for the
// RTT sample taken in ReceivedAck.
//
// inFlight and windowSize are written atomically by the ack/nack handlers, so
// they are read atomically here as well.
// NOTE(review): outboundTimes and lastSent are still accessed without
// synchronisation here and in other methods; confirm callers serialise them.
func (c *NewReno) Sequence() uint32 {
	for atomic.LoadInt32(&c.inFlight) >= atomic.LoadInt32(&c.windowSize) {
		// Wait for ReceivedAck to signal a freed slot, then re-check.
		<-c.ackNotifier
	}
	atomic.AddInt32(&c.inFlight, 1)

	s := <-c.sequence

	n := time.Now()
	c.lastSent = n
	c.outboundTimes[s] = n

	return s
}
// NextAck returns the current ack value and remembers it in lastAck as the
// one most recently handed out. The ack is loaded atomically because
// ReceivedPacket stores it with atomic.StoreUint32.
func (c *NewReno) NextAck() uint32 {
	a := atomic.LoadUint32(&c.ack)
	c.lastAck = a
	return a
}
// NextNack returns the current nack value and remembers it in lastNack as the
// one most recently handed out. The nack is loaded atomically because
// ReceivedPacket stores it with atomic.StoreUint32.
func (c *NewReno) NextNack() uint32 {
	n := atomic.LoadUint32(&c.nack)
	c.lastNack = n
	return n
}
// AwaitEarlyUpdate blocks, polling once per estimated RTT, until one of its
// conditions for sending an update to the peer holds, and then returns.
// A keepalive of 0 disables the keepalive condition.
func (c *NewReno) AwaitEarlyUpdate(keepalive time.Duration) {
	for {
		rtt := time.Duration(math.Round(c.rtt * float64(time.Second)))
		time.Sleep(rtt)

		switch {
		// The unsigned difference lastAck-ack exceeds 5.
		// NOTE(review): with uint32 arithmetic this wraps to a very large
		// value whenever ack is ahead of lastAck, so it is true for any
		// advance of ack, not only for more than five; confirm the intent.
		case (c.lastAck - c.ack) > 5:
			return

		// A nack has not been handed out yet and nothing was sent in the
		// last RTT. The RTT condition applies to this case only, because &&
		// binds tighter than || in the original expression.
		case c.lastNack != c.nack && time.Now().After(c.lastSent.Add(rtt)):
			return

		// No message sent within the keepalive time.
		case keepalive != 0 && time.Now().After(c.lastSent.Add(keepalive)):
			return
		}
	}
}