Added loss testing
Some checks failed
continuous-integration/drone/push Build is failing

TestNewReno_Congestion/TwoWay/SequenceLoss fails.
This is due debugging.
This commit is contained in:
Jake Hillion 2021-02-27 21:41:43 +00:00
parent c1f7c325c6
commit 8bd3cafd4b
2 changed files with 152 additions and 20 deletions

View File

@ -178,8 +178,11 @@ func (c *NewReno) checkNack() {
sort.Sort(c.awaitingAck)
rtt := time.Duration(c.rttNanos * RttLossDelay)
if c.awaitingAck[0].time.Before(time.Now().Add(-rtt)) {
fmt.Printf("got here: seq to consider %d, %dms ago\n", c.awaitingAck[0].sequence, time.Now().Sub(c.awaitingAck[0].time).Milliseconds())
lossThreshold := time.Duration(c.rttNanos * RttLossDelay)
fmt.Printf("threshold time: %dms\n", lossThreshold.Milliseconds())
if c.awaitingAck[0].time.Before(time.Now().Add(-lossThreshold)) {
// if the next packet sequence to ack was received more than an rttlossdelay ago
// mark the packet(s) blocking it as missing with a nack
// then update ack from the delayed packet
@ -188,20 +191,21 @@ func (c *NewReno) checkNack() {
}
}
func (c *NewReno) updateAck(a uint32) {
var i int
var e flightInfo
func (c *NewReno) updateAck(start uint32) {
a := start
for i, e = range c.awaitingAck {
var removed int
for _, e := range c.awaitingAck {
if e.sequence == a+1 {
a = e.sequence
removed++
} else {
break
}
}
c.ack = a
c.awaitingAck = c.awaitingAck[i:]
c.awaitingAck = c.awaitingAck[removed:]
}
func (c *NewReno) receivedNack(nack uint32) {

View File

@ -38,21 +38,40 @@ func newNewRenoTest(rtt time.Duration) *newRenoTest {
}
func (n *newRenoTest) Start(ctx context.Context) {
type packetWithTime struct {
t time.Time
p congestionPacket
}
go func() {
aOutboundDelayed := make(chan packetWithTime, 128)
bOutboundDelayed := make(chan packetWithTime, 128)
delayer := func(tp chan packetWithTime, cp chan congestionPacket) {
for {
select {
case p := <-tp:
s := p.t.Add(n.halfRtt).Sub(time.Now())
fmt.Printf("delayed packet for %dms\n", s.Milliseconds())
time.Sleep(s)
cp <- p.p
case <-ctx.Done():
return
}
}
}
go delayer(aOutboundDelayed, n.bInbound)
go delayer(bOutboundDelayed, n.aInbound)
for {
select {
case <-ctx.Done():
return
case p := <-n.aOutbound:
// deliver the packet at least half an halfRtt in the future (non-deterministic)
time.AfterFunc(n.halfRtt, func() {
n.bInbound <- p
})
aOutboundDelayed <- packetWithTime{t: time.Now(), p: p}
case p := <-n.bOutbound:
// deliver the packet at least half an halfRtt in the future (non-deterministic)
time.AfterFunc(n.halfRtt, func() {
n.aInbound <- p
})
bOutboundDelayed <- packetWithTime{t: time.Now(), p: p}
}
}
}()
@ -161,13 +180,55 @@ func TestNewReno_Congestion(t *testing.T) {
assert.Equal(t, uint32(0), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
t.Run("SequenceLoss", func(t *testing.T) {
// ASSIGN
rtt := 80 * time.Millisecond
numPackets := 50
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newNewRenoTest(rtt)
c.Start(ctx)
c.RunSideA(ctx)
c.RunSideB(ctx)
// ACT
for i := 0; i < numPackets; i++ {
// sleep to simulate preparing packet
time.Sleep(1 * time.Millisecond)
seq := c.sideA.Sequence()
if seq == 20 {
// Simulate packet loss of sequence 20
continue
}
c.aOutbound <- congestionPacket{
seq: seq,
nack: c.sideA.NextNack(),
ack: c.sideA.NextAck(),
}
}
time.Sleep(rtt + 30*time.Millisecond)
// ASSERT
assert.Equal(t, uint32(0), c.sideA.nack)
assert.Equal(t, uint32(0), c.sideA.ack)
assert.Equal(t, uint32(20), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
})
t.Run("TwoWay", func(t *testing.T) {
t.Run("PredictsRtt", func(t *testing.T) {
t.Run("Lossless", func(t *testing.T) {
// ASSIGN
rtt := 160 * time.Millisecond
numPackets := 100
rtt := 80 * time.Millisecond
numPackets := 50
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -222,9 +283,76 @@ func TestNewReno_Congestion(t *testing.T) {
assert.Equal(t, uint32(0), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
assert.InDelta(t, float64(rtt.Nanoseconds()), c.sideA.rttNanos, float64(8*time.Millisecond.Nanoseconds()))
assert.InDelta(t, float64(rtt.Nanoseconds()), c.sideB.rttNanos, float64(8*time.Millisecond.Nanoseconds()))
t.Run("SequenceLoss", func(t *testing.T) {
// ASSIGN
rtt := 80 * time.Millisecond
numPackets := 100
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newNewRenoTest(rtt)
c.Start(ctx)
c.RunSideA(ctx)
c.RunSideB(ctx)
// ACT
done := make(chan struct{})
go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(1 * time.Millisecond)
seq := c.sideA.Sequence()
if seq == 9 {
// Simulate packet loss of sequence 9
continue
}
c.aOutbound <- congestionPacket{
seq: seq,
nack: c.sideA.NextNack(),
ack: c.sideA.NextAck(),
}
}
done <- struct{}{}
}()
go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(1 * time.Millisecond)
seq := c.sideB.Sequence()
if seq == 13 {
// Simulate packet loss of sequence 13
continue
}
c.bOutbound <- congestionPacket{
seq: seq,
nack: c.sideB.NextNack(),
ack: c.sideB.NextAck(),
}
}
done <- struct{}{}
}()
<-done
<-done
time.Sleep(rtt + 30*time.Millisecond)
// ASSERT
assert.Equal(t, uint32(13), c.sideA.nack)
assert.Equal(t, uint32(numPackets), c.sideA.ack)
assert.Equal(t, uint32(9), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
})
}