From 8bd3cafd4b0d3051cea1ed666881ddb0ad1d79a6 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Sat, 27 Feb 2021 21:41:43 +0000 Subject: [PATCH] Added loss testing TestNewReno_Congestion/TwoWay/SequenceLoss fails. This is due debugging. --- udp/congestion/newreno.go | 18 ++-- udp/congestion/newreno_test.go | 154 ++++++++++++++++++++++++++++++--- 2 files changed, 152 insertions(+), 20 deletions(-) diff --git a/udp/congestion/newreno.go b/udp/congestion/newreno.go index c1e3448..5694331 100644 --- a/udp/congestion/newreno.go +++ b/udp/congestion/newreno.go @@ -178,8 +178,11 @@ func (c *NewReno) checkNack() { sort.Sort(c.awaitingAck) - rtt := time.Duration(c.rttNanos * RttLossDelay) - if c.awaitingAck[0].time.Before(time.Now().Add(-rtt)) { + fmt.Printf("got here: seq to consider %d, %dms ago\n", c.awaitingAck[0].sequence, time.Now().Sub(c.awaitingAck[0].time).Milliseconds()) + + lossThreshold := time.Duration(c.rttNanos * RttLossDelay) + fmt.Printf("threshold time: %dms\n", lossThreshold.Milliseconds()) + if c.awaitingAck[0].time.Before(time.Now().Add(-lossThreshold)) { // if the next packet sequence to ack was received more than an rttlossdelay ago // mark the packet(s) blocking it as missing with a nack // then update ack from the delayed packet @@ -188,20 +191,21 @@ func (c *NewReno) checkNack() { } } -func (c *NewReno) updateAck(a uint32) { - var i int - var e flightInfo +func (c *NewReno) updateAck(start uint32) { + a := start - for i, e = range c.awaitingAck { + var removed int + for _, e := range c.awaitingAck { if e.sequence == a+1 { a = e.sequence + removed++ } else { break } } c.ack = a - c.awaitingAck = c.awaitingAck[i:] + c.awaitingAck = c.awaitingAck[removed:] } func (c *NewReno) receivedNack(nack uint32) { diff --git a/udp/congestion/newreno_test.go b/udp/congestion/newreno_test.go index 2595c68..e3f80a9 100644 --- a/udp/congestion/newreno_test.go +++ b/udp/congestion/newreno_test.go @@ -38,21 +38,40 @@ func newNewRenoTest(rtt time.Duration) *newRenoTest { } func (n *newRenoTest) Start(ctx context.Context) { + type packetWithTime struct { + t time.Time + p congestionPacket + } + go func() { + aOutboundDelayed := make(chan packetWithTime, 128) + bOutboundDelayed := make(chan packetWithTime, 128) + + delayer := func(tp chan packetWithTime, cp chan congestionPacket) { + for { + select { + case p := <-tp: + s := p.t.Add(n.halfRtt).Sub(time.Now()) + fmt.Printf("delayed packet for %dms\n", s.Milliseconds()) + time.Sleep(s) + cp <- p.p + case <-ctx.Done(): + return + } + } + } + + go delayer(aOutboundDelayed, n.bInbound) + go delayer(bOutboundDelayed, n.aInbound) + for { select { case <-ctx.Done(): return case p := <-n.aOutbound: - // deliver the packet at least half an halfRtt in the future (non-deterministic) - time.AfterFunc(n.halfRtt, func() { - n.bInbound <- p - }) + aOutboundDelayed <- packetWithTime{t: time.Now(), p: p} case p := <-n.bOutbound: - // deliver the packet at least half an halfRtt in the future (non-deterministic) - time.AfterFunc(n.halfRtt, func() { - n.aInbound <- p - }) + bOutboundDelayed <- packetWithTime{t: time.Now(), p: p} } } }() @@ -161,13 +180,55 @@ func TestNewReno_Congestion(t *testing.T) { assert.Equal(t, uint32(0), c.sideB.nack) assert.Equal(t, uint32(numPackets), c.sideB.ack) }) + + t.Run("SequenceLoss", func(t *testing.T) { + // ASSIGN + rtt := 80 * time.Millisecond + numPackets := 50 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := newNewRenoTest(rtt) + c.Start(ctx) + c.RunSideA(ctx) + c.RunSideB(ctx) + + // ACT + for i := 0; i < numPackets; i++ { + // sleep to simulate preparing packet + time.Sleep(1 * time.Millisecond) + seq := c.sideA.Sequence() + + if seq == 20 { + // Simulate packet loss of sequence 20 + continue + } + + c.aOutbound <- congestionPacket{ + seq: seq, + nack: c.sideA.NextNack(), + ack: c.sideA.NextAck(), + } + } + + time.Sleep(rtt + 30*time.Millisecond) + + // ASSERT + + assert.Equal(t, uint32(0), c.sideA.nack) + assert.Equal(t, uint32(0), c.sideA.ack) + + assert.Equal(t, uint32(20), c.sideB.nack) + assert.Equal(t, uint32(numPackets), c.sideB.ack) + }) }) t.Run("TwoWay", func(t *testing.T) { - t.Run("PredictsRtt", func(t *testing.T) { + t.Run("Lossless", func(t *testing.T) { // ASSIGN - rtt := 160 * time.Millisecond - numPackets := 100 + rtt := 80 * time.Millisecond + numPackets := 50 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -222,9 +283,76 @@ func TestNewReno_Congestion(t *testing.T) { assert.Equal(t, uint32(0), c.sideB.nack) assert.Equal(t, uint32(numPackets), c.sideB.ack) + }) - assert.InDelta(t, float64(rtt.Nanoseconds()), c.sideA.rttNanos, float64(8*time.Millisecond.Nanoseconds())) - assert.InDelta(t, float64(rtt.Nanoseconds()), c.sideB.rttNanos, float64(8*time.Millisecond.Nanoseconds())) + t.Run("SequenceLoss", func(t *testing.T) { + // ASSIGN + rtt := 80 * time.Millisecond + numPackets := 100 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := newNewRenoTest(rtt) + c.Start(ctx) + c.RunSideA(ctx) + c.RunSideB(ctx) + + // ACT + done := make(chan struct{}) + + go func() { + for i := 0; i < numPackets; i++ { + time.Sleep(1 * time.Millisecond) + seq := c.sideA.Sequence() + + if seq == 9 { + // Simulate packet loss of sequence 9 + continue + } + + c.aOutbound <- congestionPacket{ + seq: seq, + nack: c.sideA.NextNack(), + ack: c.sideA.NextAck(), + } + } + + done <- struct{}{} + }() + + go func() { + for i := 0; i < numPackets; i++ { + time.Sleep(1 * time.Millisecond) + seq := c.sideB.Sequence() + + if seq == 13 { + // Simulate packet loss of sequence 13 + continue + } + + c.bOutbound <- congestionPacket{ + seq: seq, + nack: c.sideB.NextNack(), + ack: c.sideB.NextAck(), + } + } + + done <- struct{}{} + }() + + <-done + <-done + + time.Sleep(rtt + 30*time.Millisecond) + + // ASSERT + + assert.Equal(t, uint32(13), c.sideA.nack) + assert.Equal(t, uint32(numPackets), c.sideA.ack) + + assert.Equal(t, uint32(9), c.sideB.nack) + assert.Equal(t, uint32(numPackets), c.sideB.ack) }) }) }