initial newreno rewrite
Some checks reported errors
continuous-integration/drone/push Build was killed

This commit is contained in:
Jake Hillion 2021-02-22 19:08:10 +00:00
parent 87c0b57502
commit a222db615e
3 changed files with 186 additions and 124 deletions

View File

@ -2,9 +2,8 @@ package congestion
import (
"fmt"
"log"
"math"
"mpbl3p/utils"
"sort"
"sync"
"sync/atomic"
"time"
@ -14,45 +13,53 @@ const RttExponentialFactor = 0.1
const RttLossDelay = 1.5
type NewReno struct {
sequence chan uint32
keepalive chan bool
sequence chan uint32
outboundTimes, inboundTimes map[uint32]time.Time
outboundTimesLock sync.Mutex
inboundTimesLock sync.RWMutex
inFlight []flightInfo
lastSent time.Time
inFlightMu sync.Mutex
ack, lastAck uint32
nack, lastNack uint32
awaitingAck sortableFlights
ack, nack uint32
lastAck, lastNack uint32
ackNackMu sync.Mutex
slowStart bool
rtt float64
windowSize int32
windowCount int32
inFlight int32
rttNanos float64
windowSize, windowCount uint32
slowStart bool
windowNotifier chan struct{}
}
ackNotifier chan struct{}
type flightInfo struct {
time time.Time
sequence uint32
}
lastSent time.Time
hasAcked bool
type sortableFlights []flightInfo
acksToSend utils.Uint32Heap
acksToSendLock sync.Mutex
func (f sortableFlights) Len() int {
return len(f)
}
func (f sortableFlights) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func (f sortableFlights) Less(i, j int) bool {
return f[i].sequence < f[j].sequence
}
func (c *NewReno) String() string {
return fmt.Sprintf("{NewReno %t %d %d %d %d}", c.slowStart, c.windowSize, c.inFlight, c.lastAck, c.lastNack)
return fmt.Sprintf("{NewReno %t %d %d %d %d}", c.slowStart, c.windowSize, len(c.inFlight), c.lastAck, c.lastNack)
}
func NewNewReno() *NewReno {
c := NewReno{
sequence: make(chan uint32),
ackNotifier: make(chan struct{}),
outboundTimes: make(map[uint32]time.Time),
inboundTimes: make(map[uint32]time.Time),
sequence: make(chan uint32),
windowNotifier: make(chan struct{}),
windowSize: 1,
rtt: (10 * time.Millisecond).Seconds(),
rttNanos: float64((10 * time.Millisecond).Nanoseconds()),
slowStart: true,
}
@ -73,47 +80,55 @@ func NewNewReno() *NewReno {
}
func (c *NewReno) ReceivedPacket(seq, nack, ack uint32) {
c.receivedSequence(seq)
c.receivedNack(nack)
c.receivedAck(ack)
if seq != 0 {
c.receivedSequence(seq)
}
if nack != 0 {
c.receivedNack(nack)
}
if ack != 0 {
c.receivedAck(ack)
}
}
func (c *NewReno) Sequence() uint32 {
c.outboundTimesLock.Lock()
defer c.outboundTimesLock.Unlock()
for c.inFlight >= c.windowSize {
<-c.ackNotifier
for len(c.inFlight) >= int(c.windowSize) {
<-c.windowNotifier
}
atomic.AddInt32(&c.inFlight, 1)
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
s := <-c.sequence
t := time.Now()
n := time.Now()
c.lastSent = n
c.outboundTimes[s] = n
c.inFlight = append(c.inFlight, flightInfo{
time: t,
sequence: s,
})
c.lastSent = t
return s
}
func (c *NewReno) NextAck() uint32 {
a := c.ack
c.lastAck = a
atomic.StoreUint32(&c.lastAck, a)
return a
}
func (c *NewReno) NextNack() uint32 {
n := c.nack
c.lastNack = n
atomic.StoreUint32(&c.lastNack, n)
return n
}
func (c *NewReno) AwaitEarlyUpdate(keepalive time.Duration) uint32 {
for {
rtt := time.Duration(math.Round(c.rtt) * float64(time.Second))
time.Sleep(rtt)
rtt := time.Duration(math.Round(c.rttNanos))
time.Sleep(rtt / 2)
c.updateAckNack()
c.checkNack()
// CASE 1: waiting ACKs or NACKs and no message sent in the last RTT
if ((c.lastAck != c.ack) || (c.lastNack != c.nack)) && time.Now().After(c.lastSent.Add(rtt)) {
@ -128,100 +143,139 @@ func (c *NewReno) AwaitEarlyUpdate(keepalive time.Duration) uint32 {
}
func (c *NewReno) receivedSequence(seq uint32) {
c.inboundTimes[seq] = time.Now()
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
c.acksToSendLock.Lock()
c.acksToSend.Insert(seq)
c.acksToSendLock.Unlock()
c.updateAckNack()
}
// It is assumed that ReceivedAck will only be called by one thread
func (c *NewReno) receivedAck(ack uint32) {
c.outboundTimesLock.Lock()
defer c.outboundTimesLock.Unlock()
log.Printf("ack received for %d", ack)
c.hasAcked = true
// RTT
// Update using an exponential average
rtt := time.Now().Sub(c.outboundTimes[ack]).Seconds()
delete(c.outboundTimes, ack)
c.rtt = c.rtt*(1-RttExponentialFactor) + rtt*RttExponentialFactor
// Free Window
// TODO: Check for freshness
// TODO: Don't expect an ACK per packet
atomic.AddInt32(&c.inFlight, -1)
select {
case c.ackNotifier <- struct{}{}:
default:
if !(c.ack != seq-1 && c.nack != seq-1) {
c.awaitingAck = append(c.awaitingAck, flightInfo{
time: time.Now(),
sequence: seq,
})
return // if this seq doesn't change the ack field, awaitingAck will be unchanged
}
// GROW
// CASE: exponential. increase window size by one per ack
// CASE: standard. increase window size by one per window of acks
if c.slowStart {
atomic.AddInt32(&c.windowSize, 1)
} else {
c.windowCount++
if c.windowCount == c.windowSize {
c.windowCount = 0
atomic.AddInt32(&c.windowSize, 1)
c.ack = seq
sort.Sort(c.awaitingAck)
a := c.ack
var i int
var e flightInfo
for i, e = range c.awaitingAck {
if a+1 == e.sequence {
a += 1
} else {
break
}
}
c.ack = a
c.awaitingAck = c.awaitingAck[i:]
}
func (c *NewReno) checkNack() {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
if len(c.awaitingAck) == 0 {
return
}
sort.Sort(c.awaitingAck)
rtt := time.Duration(c.rttNanos * RttLossDelay)
if !c.awaitingAck[0].time.Before(time.Now().Add(-rtt)) {
return
}
c.nack = c.awaitingAck[0].sequence - 1
a := c.ack
var i int
var e flightInfo
for i, e = range c.awaitingAck {
if a+1 == e.sequence {
a += 1
} else {
break
}
}
c.ack = a
c.awaitingAck = c.awaitingAck[i:]
}
// It is assumed that ReceivedNack will only be called by one thread
func (c *NewReno) receivedNack(nack uint32) {
log.Printf("nack received for %d", nack)
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
// TODO : Check for freshness
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
// End slow start
// as both ack and nack are cumulative, inflight will always be ordered by seq
var i int
for i < len(c.inFlight) && c.inFlight[i].sequence <= nack {
i++
}
c.inFlight = c.inFlight[i-1:]
c.slowStart = false
if s := c.windowSize; s > 1 {
atomic.StoreInt32(&c.windowSize, s/2)
}
}
func (c *NewReno) updateAckNack() {
c.acksToSendLock.Lock()
defer c.acksToSendLock.Unlock()
c.inboundTimesLock.Lock()
defer c.inboundTimesLock.Unlock()
findAck := func(start uint32) uint32 {
ack := start
for len(c.acksToSend) > 0 {
if a, _ := c.acksToSend.Peek(); a == ack+1 {
ack, _ = c.acksToSend.Extract()
delete(c.inboundTimes, ack)
} else {
break
}
}
return ack
if i == 0 {
return
}
ack := findAck(c.ack)
if ack == c.ack {
// check if there is a nack to send
// decide this based on whether there have been 3RTTs between the offset packet
if len(c.acksToSend) > 0 {
nextAck, _ := c.acksToSend.Peek()
if time.Now().Sub(c.inboundTimes[nextAck]).Seconds() > c.rtt*3 {
atomic.StoreUint32(&c.nack, nextAck-1)
ack, _ = c.acksToSend.Extract()
ack = findAck(ack)
}
for i > 0 {
s := c.windowSize
if s > 1 && atomic.CompareAndSwapUint32(&c.windowSize, s, s/2) {
break
}
}
atomic.StoreUint32(&c.ack, ack)
select {
case c.windowNotifier <- struct{}{}:
default:
}
}
// It is assumed that ReceivedAck will only be called by one thread
func (c *NewReno) receivedAck(ack uint32) {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
// as both ack and nack are cumulative, inflight will always be ordered by seq
var i int
for i < len(c.inFlight) && c.inFlight[i].sequence <= ack {
rtt := float64(time.Now().Sub(c.inFlight[i].time).Nanoseconds())
c.rttNanos = c.rttNanos*(1-RttExponentialFactor) + rtt*RttExponentialFactor
i++
}
if i == 0 {
return
}
c.inFlight = c.inFlight[i-1:]
if c.slowStart {
c.windowSize += uint32(i)
} else {
c.windowCount += uint32(i)
if c.windowCount > c.windowSize {
c.windowCount -= uint32(i)
atomic.AddUint32(&c.windowSize, 1)
}
}
select {
case c.windowNotifier <- struct{}{}:
default:
}
}

View File

@ -3,6 +3,8 @@ package congestion
import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"sort"
"testing"
"time"
)
@ -149,3 +151,11 @@ func TestNewReno_Congestion(t *testing.T) {
})
})
}
func TestSortableFlights_Less(t *testing.T) {
a := []flightInfo{{sequence: 0}, {sequence: 6}, {sequence: 3}, {sequence: 2}}
sort.Sort(sortableFlights(a))
assert.Equal(t, []flightInfo{{sequence: 0}, {sequence: 2}, {sequence: 3}, {sequence: 6}}, a)
}

View File

@ -1,7 +1,6 @@
package udp
import (
"errors"
"fmt"
"log"
"mpbl3p/proxy"
@ -250,8 +249,7 @@ func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error {
}
func (f *Flow) earlyUpdateLoop(g proxy.MacGenerator, keepalive time.Duration) {
var err error
for !errors.Is(err, shared.ErrDeadConnection) {
for f.isAlive {
seq := f.congestion.AwaitEarlyUpdate(keepalive)
p := Packet{
ack: f.congestion.NextAck(),