Jake Hillion
9a19ecd0d4
All checks were successful
continuous-integration/drone/push Build is passing
370 lines
7.1 KiB
Go
370 lines
7.1 KiB
Go
package congestion
|
|
|
|
import (
|
|
"context"
|
|
"github.com/stretchr/testify/assert"
|
|
"sort"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
type congestionPacket struct {
|
|
seq, nack, ack uint32
|
|
}
|
|
|
|
type newRenoTest struct {
|
|
sideA, sideB *NewReno
|
|
|
|
aOutbound, bOutbound chan congestionPacket
|
|
aInbound, bInbound chan congestionPacket
|
|
|
|
halfRtt time.Duration
|
|
}
|
|
|
|
func newNewRenoTest(rtt time.Duration) *newRenoTest {
|
|
return &newRenoTest{
|
|
sideA: NewNewReno(),
|
|
sideB: NewNewReno(),
|
|
|
|
aOutbound: make(chan congestionPacket),
|
|
bOutbound: make(chan congestionPacket),
|
|
|
|
aInbound: make(chan congestionPacket),
|
|
bInbound: make(chan congestionPacket),
|
|
|
|
halfRtt: rtt / 2,
|
|
}
|
|
}
|
|
|
|
func (n *newRenoTest) Start(ctx context.Context) {
|
|
type packetWithTime struct {
|
|
t time.Time
|
|
p congestionPacket
|
|
}
|
|
|
|
go func() {
|
|
aOutboundDelayed := make(chan packetWithTime, 128)
|
|
bOutboundDelayed := make(chan packetWithTime, 128)
|
|
|
|
delayer := func(tp chan packetWithTime, cp chan congestionPacket) {
|
|
for {
|
|
select {
|
|
case p := <-tp:
|
|
s := p.t.Add(n.halfRtt).Sub(time.Now())
|
|
time.Sleep(s)
|
|
cp <- p.p
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
go delayer(aOutboundDelayed, n.bInbound)
|
|
go delayer(bOutboundDelayed, n.aInbound)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case p := <-n.aOutbound:
|
|
aOutboundDelayed <- packetWithTime{t: time.Now(), p: p}
|
|
case p := <-n.bOutbound:
|
|
bOutboundDelayed <- packetWithTime{t: time.Now(), p: p}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (n *newRenoTest) RunSideA(ctx context.Context) {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case p := <-n.aInbound:
|
|
n.sideA.ReceivedPacket(p.seq, p.nack, p.ack)
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
seq, err := n.sideA.AwaitEarlyUpdate(ctx, 500*time.Millisecond)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if seq != 0 {
|
|
// skip keepalive
|
|
// required to ensure AwaitEarlyUpdate terminates
|
|
continue
|
|
}
|
|
p := congestionPacket{
|
|
seq: seq,
|
|
nack: n.sideA.NextNack(),
|
|
ack: n.sideA.NextAck(),
|
|
}
|
|
n.aOutbound <- p
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (n *newRenoTest) RunSideB(ctx context.Context) {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case p := <-n.bInbound:
|
|
n.sideB.ReceivedPacket(p.seq, p.nack, p.ack)
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
seq, err := n.sideB.AwaitEarlyUpdate(ctx, 500*time.Millisecond)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if seq != 0 {
|
|
// skip keepalive
|
|
// required to ensure AwaitEarlyUpdate terminates
|
|
continue
|
|
}
|
|
p := congestionPacket{
|
|
seq: seq,
|
|
nack: n.sideB.NextNack(),
|
|
ack: n.sideB.NextAck(),
|
|
}
|
|
n.bOutbound <- p
|
|
}
|
|
}()
|
|
}
|
|
|
|
func TestNewReno_Congestion(t *testing.T) {
|
|
t.Run("OneWay", func(t *testing.T) {
|
|
t.Run("Lossless", func(t *testing.T) {
|
|
// ASSIGN
|
|
rtt := 80 * time.Millisecond
|
|
numPackets := 50
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
c := newNewRenoTest(rtt)
|
|
c.Start(ctx)
|
|
c.RunSideA(ctx)
|
|
c.RunSideB(ctx)
|
|
|
|
// ACT
|
|
for i := 0; i < numPackets; i++ {
|
|
// sleep to simulate preparing packet
|
|
time.Sleep(1 * time.Millisecond)
|
|
seq, _ := c.sideA.Sequence(ctx)
|
|
|
|
c.aOutbound <- congestionPacket{
|
|
seq: seq,
|
|
nack: c.sideA.NextNack(),
|
|
ack: c.sideA.NextAck(),
|
|
}
|
|
}
|
|
|
|
// allow the systems to catch up before asserting
|
|
time.Sleep(rtt + 30*time.Millisecond)
|
|
|
|
// ASSERT
|
|
|
|
assert.Equal(t, uint32(0), c.sideA.nack)
|
|
assert.Equal(t, uint32(0), c.sideA.ack)
|
|
|
|
assert.Equal(t, uint32(0), c.sideB.nack)
|
|
assert.Equal(t, uint32(numPackets), c.sideB.ack)
|
|
})
|
|
|
|
t.Run("SequenceLoss", func(t *testing.T) {
|
|
// ASSIGN
|
|
rtt := 80 * time.Millisecond
|
|
numPackets := 50
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
c := newNewRenoTest(rtt)
|
|
c.Start(ctx)
|
|
c.RunSideA(ctx)
|
|
c.RunSideB(ctx)
|
|
|
|
// ACT
|
|
for i := 0; i < numPackets; i++ {
|
|
// sleep to simulate preparing packet
|
|
time.Sleep(1 * time.Millisecond)
|
|
seq, _ := c.sideA.Sequence(ctx)
|
|
|
|
if seq == 20 {
|
|
// Simulate packet loss of sequence 20
|
|
continue
|
|
}
|
|
|
|
c.aOutbound <- congestionPacket{
|
|
seq: seq,
|
|
nack: c.sideA.NextNack(),
|
|
ack: c.sideA.NextAck(),
|
|
}
|
|
}
|
|
|
|
time.Sleep(rtt + 30*time.Millisecond)
|
|
|
|
// ASSERT
|
|
|
|
assert.Equal(t, uint32(0), c.sideA.nack)
|
|
assert.Equal(t, uint32(0), c.sideA.ack)
|
|
|
|
assert.Equal(t, uint32(20), c.sideB.nack)
|
|
assert.Equal(t, uint32(numPackets), c.sideB.ack)
|
|
})
|
|
})
|
|
|
|
t.Run("TwoWay", func(t *testing.T) {
|
|
t.Run("Lossless", func(t *testing.T) {
|
|
// ASSIGN
|
|
rtt := 80 * time.Millisecond
|
|
numPackets := 50
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
c := newNewRenoTest(rtt)
|
|
c.Start(ctx)
|
|
c.RunSideA(ctx)
|
|
c.RunSideB(ctx)
|
|
|
|
// ACT
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
for i := 0; i < numPackets; i++ {
|
|
time.Sleep(1 * time.Millisecond)
|
|
seq, _ := c.sideA.Sequence(ctx)
|
|
|
|
c.aOutbound <- congestionPacket{
|
|
seq: seq,
|
|
nack: c.sideA.NextNack(),
|
|
ack: c.sideA.NextAck(),
|
|
}
|
|
}
|
|
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
go func() {
|
|
for i := 0; i < numPackets; i++ {
|
|
time.Sleep(1 * time.Millisecond)
|
|
seq, _ := c.sideB.Sequence(ctx)
|
|
|
|
c.bOutbound <- congestionPacket{
|
|
seq: seq,
|
|
nack: c.sideB.NextNack(),
|
|
ack: c.sideB.NextAck(),
|
|
}
|
|
}
|
|
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
<-done
|
|
<-done
|
|
|
|
time.Sleep(rtt + 30*time.Millisecond)
|
|
|
|
// ASSERT
|
|
|
|
assert.Equal(t, uint32(0), c.sideA.nack)
|
|
assert.Equal(t, uint32(numPackets), c.sideA.ack)
|
|
|
|
assert.Equal(t, uint32(0), c.sideB.nack)
|
|
assert.Equal(t, uint32(numPackets), c.sideB.ack)
|
|
})
|
|
|
|
t.Run("SequenceLoss", func(t *testing.T) {
|
|
// ASSIGN
|
|
rtt := 80 * time.Millisecond
|
|
numPackets := 100
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
c := newNewRenoTest(rtt)
|
|
c.Start(ctx)
|
|
c.RunSideA(ctx)
|
|
c.RunSideB(ctx)
|
|
|
|
// ACT
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
for i := 0; i < numPackets; i++ {
|
|
time.Sleep(1 * time.Millisecond)
|
|
seq, _ := c.sideA.Sequence(ctx)
|
|
|
|
if seq == 9 {
|
|
// Simulate packet loss of sequence 9
|
|
continue
|
|
}
|
|
|
|
c.aOutbound <- congestionPacket{
|
|
seq: seq,
|
|
nack: c.sideA.NextNack(),
|
|
ack: c.sideA.NextAck(),
|
|
}
|
|
}
|
|
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
go func() {
|
|
for i := 0; i < numPackets; i++ {
|
|
time.Sleep(1 * time.Millisecond)
|
|
seq, _ := c.sideB.Sequence(ctx)
|
|
|
|
if seq == 13 {
|
|
// Simulate packet loss of sequence 13
|
|
continue
|
|
}
|
|
|
|
c.bOutbound <- congestionPacket{
|
|
seq: seq,
|
|
nack: c.sideB.NextNack(),
|
|
ack: c.sideB.NextAck(),
|
|
}
|
|
}
|
|
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
<-done
|
|
<-done
|
|
|
|
time.Sleep(rtt + 30*time.Millisecond)
|
|
|
|
// ASSERT
|
|
|
|
assert.Equal(t, uint32(13), c.sideA.nack)
|
|
assert.Equal(t, uint32(numPackets), c.sideA.ack)
|
|
|
|
assert.Equal(t, uint32(9), c.sideB.nack)
|
|
assert.Equal(t, uint32(numPackets), c.sideB.ack)
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestSortableFlights_Less(t *testing.T) {
|
|
// ASSIGN
|
|
a := []flightInfo{{sequence: 0}, {sequence: 6}, {sequence: 3}, {sequence: 2}}
|
|
|
|
// ACT
|
|
sort.Sort(sortableFlights(a))
|
|
|
|
// ASSERT
|
|
assert.Equal(t, []flightInfo{{sequence: 0}, {sequence: 2}, {sequence: 3}, {sequence: 6}}, a)
|
|
}
|