merge develop into master #21

Merged
JakeHillion merged 149 commits from develop into master 2021-05-12 00:22:59 +01:00
13 changed files with 611 additions and 315 deletions
Showing only changes of commit 69eb4d4810 - Show all commits

View File

@ -5,14 +5,14 @@ name: default
steps:
- name: format
image: golang:1.15
image: golang:1.16
commands:
- bash -c "gofmt -l . | wc -l | cmp -s <(echo 0) || (gofmt -l . && exit 1)"
- name: install
image: golang:1.15
image: golang:1.16
environment:
GOPROXY: http://10.20.0.25:3142|direct
GOPROXY: http://containers.internal.hillion.co.uk:3142,direct
volumes:
- name: cache
path: /go
@ -20,7 +20,7 @@ steps:
- go test -i ./...
- name: test
image: golang:1.15
image: golang:1.16
volumes:
- name: cache
path: /go
@ -28,7 +28,7 @@ steps:
- go test ./...
- name: build (debian)
image: golang:1.15-buster
image: golang:1.16-buster
when:
event:
- push
@ -36,8 +36,9 @@ steps:
- name: cache
path: /go
commands:
- GOOS=linux GOARCH=amd64 go build -o linux_amd64
- GOOS=linux GOARCH=arm GOARM=7 go build -o linux_arm_v7
- GOOS=linux GOARCH=amd64 go build -o linux_amd64
- GOOS=linux GOARCH=arm GOARM=7 go build -o linux_arm_v7
- GOOS=freebsd GOARCH=arm64 go build -o freebsd_arm64_v8a
- name: upload
image: minio/mc
@ -49,9 +50,10 @@ steps:
SECRET_KEY:
from_secret: s3_secret_key
commands:
- mc alias set s3 http://10.20.0.25:3900 $${ACCESS_KEY} $${SECRET_KEY}
- mc alias set s3 https://s3.us-west-001.backblazeb2.com $${ACCESS_KEY} $${SECRET_KEY}
- mc cp linux_amd64 s3/dissertation/binaries/debian/${DRONE_BRANCH}_linux_amd64
- mc cp linux_arm_v7 s3/dissertation/binaries/debian/${DRONE_BRANCH}_linux_arm_v7
- mc cp freebsd_arm64_v8a s3/dissertation/binaries/debian/${DRONE_BRANCH}_freebsd_arm64_v8a
volumes:
- name: cache
@ -59,6 +61,6 @@ volumes:
---
kind: signature
hmac: a7c498332fbf43f422a68475c53daa0a65b7801004f09300e40275007dec9bee
hmac: de07a3ab113028b48c590c406f7ab8f74aeae49679287862168d912ec10e9920
...

View File

@ -70,11 +70,15 @@ func (c Configuration) Build() (*proxy.Proxy, error) {
}
func buildTcp(p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() proxy.MacVerifier) error {
var laddr func() string
if peer.LocalPort == 0 {
laddr = func() string { return fmt.Sprintf("%s:", peer.GetLocalHost()) }
} else {
laddr = func() string { return fmt.Sprintf("%s:%d", peer.GetLocalHost(), peer.LocalPort) }
}
if peer.RemoteHost != "" {
f, err := tcp.InitiateFlow(
func() string { return fmt.Sprintf("%s:", peer.GetLocalHost()) },
fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort),
)
f, err := tcp.InitiateFlow(laddr, fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort))
if err != nil {
return err
@ -86,7 +90,7 @@ func buildTcp(p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() p
return nil
}
err := tcp.NewListener(p, fmt.Sprintf("%s:%d", peer.GetLocalHost(), peer.LocalPort), v, g)
err := tcp.NewListener(p, laddr(), v, g)
if err != nil {
return err
}
@ -95,6 +99,13 @@ func buildTcp(p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() p
}
func buildUdp(p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() proxy.MacVerifier) error {
var laddr func() string
if peer.LocalPort == 0 {
laddr = func() string { return fmt.Sprintf("%s:", peer.GetLocalHost()) }
} else {
laddr = func() string { return fmt.Sprintf("%s:%d", peer.GetLocalHost(), peer.LocalPort) }
}
var c func() udp.Congestion
switch peer.Congestion {
case "None":
@ -107,7 +118,7 @@ func buildUdp(p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() p
if peer.RemoteHost != "" {
f, err := udp.InitiateFlow(
func() string { return fmt.Sprintf("%s:", peer.GetLocalHost()) },
laddr,
fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort),
v(),
g(),
@ -125,7 +136,7 @@ func buildUdp(p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() p
return nil
}
err := udp.NewListener(p, fmt.Sprintf("%s:%d", peer.GetLocalHost(), peer.LocalPort), v, g, c)
err := udp.NewListener(p, laddr(), v, g, c)
if err != nil {
return err
}

2
go.mod
View File

@ -3,8 +3,8 @@ module mpbl3p
go 1.15
require (
github.com/JakeHillion/taptun v0.0.0-20210320133200-cf0ef75b1bff
github.com/go-playground/validator/v10 v10.4.1
github.com/pkg/taptun v0.0.0-20160424131934-bbbd335672ab
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.4.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9

6
go.sum
View File

@ -1,3 +1,7 @@
github.com/JakeHillion/taptun v0.0.0-20160424131934-bbbd335672ab h1:/UB98lLA11PJZOqhdzqeITMTMz6eiTjti9Z9kYq1SWQ=
github.com/JakeHillion/taptun v0.0.0-20160424131934-bbbd335672ab/go.mod h1:8WBFCKR7ZdT+WVtgyiSPJf6gqXiNZUvfglN8vwkoyBE=
github.com/JakeHillion/taptun v0.0.0-20210320133200-cf0ef75b1bff h1:O+wiKpOHS2BidwDz6ZuR3dVQNsrD55raE9mY4yub6Wc=
github.com/JakeHillion/taptun v0.0.0-20210320133200-cf0ef75b1bff/go.mod h1:8WBFCKR7ZdT+WVtgyiSPJf6gqXiNZUvfglN8vwkoyBE=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
@ -14,8 +18,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/pkg/taptun v0.0.0-20160424131934-bbbd335672ab h1:dAXDRtXYxj4sTR5WeRuTFJGH18QMT6AUpUgRwedI6es=
github.com/pkg/taptun v0.0.0-20160424131934-bbbd335672ab/go.mod h1:N5a/Ll2ZNk5wjiLNW9LIiNtO9RNYcaYmcXSYKMYrlDg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=

View File

@ -1,7 +1,7 @@
package tun
import (
"github.com/pkg/taptun"
"github.com/JakeHillion/taptun"
"io"
"log"
"mpbl3p/proxy"

View File

@ -4,14 +4,10 @@ import "time"
type Congestion interface {
Sequence() uint32
ReceivedPacket(seq uint32)
ReceivedAck(uint32)
NextAck() uint32
ReceivedNack(uint32)
NextNack() uint32
ReceivedPacket(seq, nack, ack uint32)
AwaitEarlyUpdate(keepalive time.Duration) uint32
Reset()
}

View File

@ -2,56 +2,64 @@ package congestion
import (
"fmt"
"log"
"math"
"mpbl3p/utils"
"sort"
"sync"
"sync/atomic"
"time"
)
const RttExponentialFactor = 0.1
const RttLossDelay = 1.5
type NewReno struct {
sequence chan uint32
keepalive chan bool
sequence chan uint32
outboundTimes, inboundTimes map[uint32]time.Time
outboundTimesLock sync.Mutex
inboundTimesLock sync.RWMutex
inFlight []flightInfo
lastSent time.Time
inFlightMu sync.Mutex
ack, lastAck uint32
nack, lastNack uint32
awaitingAck sortableFlights
ack, nack uint32
lastAck, lastNack uint32
ackNackMu sync.Mutex
slowStart bool
rtt float64
windowSize int32
windowCount int32
inFlight int32
rttNanos float64
windowSize, windowCount uint32
slowStart bool
windowNotifier chan struct{}
}
ackNotifier chan struct{}
type flightInfo struct {
time time.Time
sequence uint32
}
lastSent time.Time
hasAcked bool
type sortableFlights []flightInfo
acksToSend utils.Uint32Heap
acksToSendLock sync.Mutex
func (f sortableFlights) Len() int {
return len(f)
}
func (f sortableFlights) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
func (f sortableFlights) Less(i, j int) bool {
return f[i].sequence < f[j].sequence
}
func (c *NewReno) String() string {
return fmt.Sprintf("{NewReno %t %d %d %d %d}", c.slowStart, c.windowSize, c.inFlight, c.lastAck, c.lastNack)
return fmt.Sprintf("{NewReno %t %d %d %d %d}", c.slowStart, c.windowSize, len(c.inFlight), c.lastAck, c.lastNack)
}
func NewNewReno() *NewReno {
c := NewReno{
sequence: make(chan uint32),
ackNotifier: make(chan struct{}),
sequence: make(chan uint32),
windowNotifier: make(chan struct{}),
outboundTimes: make(map[uint32]time.Time),
inboundTimes: make(map[uint32]time.Time),
windowSize: 8,
rtt: (1 * time.Millisecond).Seconds(),
windowSize: 1,
rttNanos: float64((10 * time.Millisecond).Nanoseconds()),
slowStart: true,
}
@ -71,161 +79,199 @@ func NewNewReno() *NewReno {
return &c
}
func (c *NewReno) Reset() {
c.outboundTimes = make(map[uint32]time.Time)
c.inboundTimes = make(map[uint32]time.Time)
c.windowSize = 8
c.rtt = (1 * time.Millisecond).Seconds()
c.slowStart = true
c.hasAcked = false
}
// It is assumed that ReceivedAck will only be called by one thread
func (c *NewReno) ReceivedAck(ack uint32) {
c.outboundTimesLock.Lock()
defer c.outboundTimesLock.Unlock()
log.Printf("ack received for %d", ack)
c.hasAcked = true
// RTT
// Update using an exponential average
rtt := time.Now().Sub(c.outboundTimes[ack]).Seconds()
delete(c.outboundTimes, ack)
c.rtt = c.rtt*(1-RttExponentialFactor) + rtt*RttExponentialFactor
// Free Window
// TODO: Check for freshness
// TODO: Don't expect an ACK per packet
atomic.AddInt32(&c.inFlight, -1)
select {
case c.ackNotifier <- struct{}{}:
default:
func (c *NewReno) ReceivedPacket(seq, nack, ack uint32) {
// decide what acks and nacks to send
if seq != 0 {
c.receivedSequence(seq)
}
// GROW
// CASE: exponential. increase window size by one per ack
// CASE: standard. increase window size by one per window of acks
if c.slowStart {
atomic.AddInt32(&c.windowSize, 1)
} else {
c.windowCount++
if c.windowCount == c.windowSize {
c.windowCount = 0
atomic.AddInt32(&c.windowSize, 1)
}
// decide how window size was affected
if nack != 0 {
c.receivedNack(nack)
}
}
// It is assumed that ReceivedNack will only be called by one thread
func (c *NewReno) ReceivedNack(nack uint32) {
log.Printf("nack received for %d", nack)
// TODO : Check for freshness
// End slow start
c.slowStart = false
if s := c.windowSize; s > 1 {
atomic.StoreInt32(&c.windowSize, s/2)
if ack != 0 {
c.receivedAck(ack)
}
}
func (c *NewReno) ReceivedPacket(seq uint32) {
log.Printf("seq received for %d", seq)
c.inboundTimes[seq] = time.Now()
c.acksToSendLock.Lock()
c.acksToSend.Insert(seq)
c.acksToSendLock.Unlock()
c.updateAckNack()
}
func (c *NewReno) updateAckNack() {
c.acksToSendLock.Lock()
defer c.acksToSendLock.Unlock()
c.inboundTimesLock.Lock()
defer c.inboundTimesLock.Unlock()
findAck := func(start uint32) uint32 {
ack := start
for len(c.acksToSend) > 0 {
if a, _ := c.acksToSend.Peek(); a == ack+1 {
ack, _ = c.acksToSend.Extract()
delete(c.inboundTimes, ack)
} else {
break
}
}
return ack
}
ack := findAck(c.ack)
if ack == c.ack {
// check if there is a nack to send
// decide this based on whether there have been 3RTTs between the offset packet
if len(c.acksToSend) > 0 {
nextAck, _ := c.acksToSend.Peek()
if time.Now().Sub(c.inboundTimes[nextAck]).Seconds() > c.rtt*3 {
atomic.StoreUint32(&c.nack, nextAck-1)
ack, _ = c.acksToSend.Extract()
ack = findAck(ack)
}
}
}
atomic.StoreUint32(&c.ack, ack)
}
func (c *NewReno) Sequence() uint32 {
c.outboundTimesLock.Lock()
defer c.outboundTimesLock.Unlock()
for c.inFlight >= c.windowSize {
<-c.ackNotifier
for len(c.inFlight) >= int(c.windowSize) {
<-c.windowNotifier
}
atomic.AddInt32(&c.inFlight, 1)
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
s := <-c.sequence
t := time.Now()
n := time.Now()
c.lastSent = n
c.outboundTimes[s] = n
c.inFlight = append(c.inFlight, flightInfo{
time: t,
sequence: s,
})
c.lastSent = t
return s
}
func (c *NewReno) NextAck() uint32 {
a := c.ack
c.lastAck = a
atomic.StoreUint32(&c.lastAck, a)
return a
}
func (c *NewReno) NextNack() uint32 {
n := c.nack
c.lastNack = n
atomic.StoreUint32(&c.lastNack, n)
return n
}
func (c *NewReno) AwaitEarlyUpdate(keepalive time.Duration) uint32 {
for {
rtt := time.Duration(math.Round(c.rtt * float64(time.Second)))
time.Sleep(rtt)
rtt := time.Duration(math.Round(c.rttNanos))
time.Sleep(rtt / 2)
c.updateAckNack()
c.checkNack()
// CASE 1: waiting ACKs or NACKs and no message sent in the last RTT
if ((c.lastAck != c.ack) || (c.lastNack != c.nack)) && time.Now().After(c.lastSent.Add(rtt)) {
return 0
// CASE 1: waiting ACKs or NACKs and no message sent in the last half-RTT
// this targets arrival in 0.5+0.5 ± 0.5 RTTs (1±0.5 RTTs)
if ((c.lastAck != c.ack) || (c.lastNack != c.nack)) && time.Now().After(c.lastSent.Add(rtt/2)) {
return 0 // no ack needed
}
// CASE 3: No message sent within the keepalive time
// CASE 2: No message sent within the keepalive time
if keepalive != 0 && time.Now().After(c.lastSent.Add(keepalive)) {
return c.Sequence()
return c.Sequence() // require an ack
}
}
}
func (c *NewReno) receivedSequence(seq uint32) {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
if seq < c.nack || seq < c.ack {
// packet received out of order has already been cumulatively NACKed
// or duplicate packet received and already ACKed
return
}
if seq != c.ack+1 && seq != c.nack+1 {
c.awaitingAck = append(c.awaitingAck, flightInfo{
time: time.Now(),
sequence: seq,
})
return // if this seq doesn't change the ack field, updateAck will still not do anything useful
}
sort.Sort(c.awaitingAck)
c.updateAck(seq)
}
func (c *NewReno) checkNack() {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
if len(c.awaitingAck) == 0 {
return
}
sort.Sort(c.awaitingAck)
lossThreshold := time.Duration(c.rttNanos * RttLossDelay)
if c.awaitingAck[0].time.Before(time.Now().Add(-lossThreshold)) {
// if the next packet sequence to ack was received more than an rttlossdelay ago
// mark the packet(s) blocking it as missing with a nack
// then update ack from the delayed packet
c.nack = c.awaitingAck[0].sequence - 1
c.updateAck(c.nack)
}
}
func (c *NewReno) updateAck(start uint32) {
a := start
var removed int
for _, e := range c.awaitingAck {
if e.sequence == a+1 {
a = e.sequence
removed++
} else {
break
}
}
c.ack = a
c.awaitingAck = c.awaitingAck[removed:]
}
func (c *NewReno) receivedNack(nack uint32) {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
// as both ack and nack are cumulative, inflight will always be ordered by seq
var i int
for i < len(c.inFlight) && c.inFlight[i].sequence <= nack {
i++
}
if i == 0 {
return
}
c.slowStart = false
c.inFlight = c.inFlight[i:]
for {
s := c.windowSize
if s == 1 || atomic.CompareAndSwapUint32(&c.windowSize, s, s/2) {
break
}
}
select {
case c.windowNotifier <- struct{}{}:
default:
}
}
func (c *NewReno) receivedAck(ack uint32) {
c.ackNackMu.Lock()
defer c.ackNackMu.Unlock()
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
// as both ack and nack are cumulative, inflight will always be ordered by seq
var i int
for i < len(c.inFlight) && c.inFlight[i].sequence <= ack {
rtt := float64(time.Now().Sub(c.inFlight[i].time).Nanoseconds())
c.rttNanos = c.rttNanos*(1-RttExponentialFactor) + rtt*RttExponentialFactor
i++
}
if i == 0 {
return
}
c.inFlight = c.inFlight[i:]
if c.slowStart {
atomic.AddUint32(&c.windowSize, uint32(i))
} else {
c.windowCount += uint32(i)
s := c.windowSize
if c.windowCount > s {
c.windowCount -= s
atomic.AddUint32(&c.windowSize, 1)
}
}
select {
case c.windowNotifier <- struct{}{}:
default:
}
}

View File

@ -0,0 +1,371 @@
package congestion
import (
"context"
"github.com/stretchr/testify/assert"
"sort"
"testing"
"time"
)
type congestionPacket struct {
seq, nack, ack uint32
}
type newRenoTest struct {
sideA, sideB *NewReno
aOutbound, bOutbound chan congestionPacket
aInbound, bInbound chan congestionPacket
halfRtt time.Duration
}
func newNewRenoTest(rtt time.Duration) *newRenoTest {
return &newRenoTest{
sideA: NewNewReno(),
sideB: NewNewReno(),
aOutbound: make(chan congestionPacket),
bOutbound: make(chan congestionPacket),
aInbound: make(chan congestionPacket),
bInbound: make(chan congestionPacket),
halfRtt: rtt / 2,
}
}
func (n *newRenoTest) Start(ctx context.Context) {
type packetWithTime struct {
t time.Time
p congestionPacket
}
go func() {
aOutboundDelayed := make(chan packetWithTime, 128)
bOutboundDelayed := make(chan packetWithTime, 128)
delayer := func(tp chan packetWithTime, cp chan congestionPacket) {
for {
select {
case p := <-tp:
s := p.t.Add(n.halfRtt).Sub(time.Now())
time.Sleep(s)
cp <- p.p
case <-ctx.Done():
return
}
}
}
go delayer(aOutboundDelayed, n.bInbound)
go delayer(bOutboundDelayed, n.aInbound)
for {
select {
case <-ctx.Done():
return
case p := <-n.aOutbound:
aOutboundDelayed <- packetWithTime{t: time.Now(), p: p}
case p := <-n.bOutbound:
bOutboundDelayed <- packetWithTime{t: time.Now(), p: p}
}
}
}()
}
func (n *newRenoTest) RunSideA(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case p := <-n.aInbound:
n.sideA.ReceivedPacket(p.seq, p.nack, p.ack)
}
}
}()
go func() {
for {
if ctx.Err() != nil {
return
}
seq := n.sideA.AwaitEarlyUpdate(500 * time.Millisecond)
if seq != 0 {
// skip keepalive
// required to ensure AwaitEarlyUpdate terminates
continue
}
p := congestionPacket{
seq: seq,
nack: n.sideA.NextNack(),
ack: n.sideA.NextAck(),
}
n.aOutbound <- p
}
}()
}
func (n *newRenoTest) RunSideB(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case p := <-n.bInbound:
n.sideB.ReceivedPacket(p.seq, p.nack, p.ack)
}
}
}()
go func() {
for {
if ctx.Err() != nil {
return
}
seq := n.sideB.AwaitEarlyUpdate(500 * time.Millisecond)
if seq != 0 {
// skip keepalive
// required to ensure AwaitEarlyUpdate terminates
continue
}
p := congestionPacket{
seq: seq,
nack: n.sideB.NextNack(),
ack: n.sideB.NextAck(),
}
n.bOutbound <- p
}
}()
}
func TestNewReno_Congestion(t *testing.T) {
t.Run("OneWay", func(t *testing.T) {
t.Run("Lossless", func(t *testing.T) {
// ASSIGN
rtt := 80 * time.Millisecond
numPackets := 50
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newNewRenoTest(rtt)
c.Start(ctx)
c.RunSideA(ctx)
c.RunSideB(ctx)
// ACT
for i := 0; i < numPackets; i++ {
// sleep to simulate preparing packet
time.Sleep(1 * time.Millisecond)
seq := c.sideA.Sequence()
c.aOutbound <- congestionPacket{
seq: seq,
nack: c.sideA.NextNack(),
ack: c.sideA.NextAck(),
}
}
// allow the systems to catch up before asserting
time.Sleep(rtt + 30*time.Millisecond)
// ASSERT
assert.Equal(t, uint32(0), c.sideA.nack)
assert.Equal(t, uint32(0), c.sideA.ack)
assert.Equal(t, uint32(0), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
t.Run("SequenceLoss", func(t *testing.T) {
// ASSIGN
rtt := 80 * time.Millisecond
numPackets := 50
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newNewRenoTest(rtt)
c.Start(ctx)
c.RunSideA(ctx)
c.RunSideB(ctx)
// ACT
for i := 0; i < numPackets; i++ {
// sleep to simulate preparing packet
time.Sleep(1 * time.Millisecond)
seq := c.sideA.Sequence()
if seq == 20 {
// Simulate packet loss of sequence 20
continue
}
c.aOutbound <- congestionPacket{
seq: seq,
nack: c.sideA.NextNack(),
ack: c.sideA.NextAck(),
}
}
time.Sleep(rtt + 30*time.Millisecond)
// ASSERT
assert.Equal(t, uint32(0), c.sideA.nack)
assert.Equal(t, uint32(0), c.sideA.ack)
assert.Equal(t, uint32(20), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
})
t.Run("TwoWay", func(t *testing.T) {
t.Run("Lossless", func(t *testing.T) {
// ASSIGN
rtt := 80 * time.Millisecond
numPackets := 50
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newNewRenoTest(rtt)
c.Start(ctx)
c.RunSideA(ctx)
c.RunSideB(ctx)
// ACT
done := make(chan struct{})
go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(1 * time.Millisecond)
seq := c.sideA.Sequence()
c.aOutbound <- congestionPacket{
seq: seq,
nack: c.sideA.NextNack(),
ack: c.sideA.NextAck(),
}
}
done <- struct{}{}
}()
go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(1 * time.Millisecond)
seq := c.sideB.Sequence()
c.bOutbound <- congestionPacket{
seq: seq,
nack: c.sideB.NextNack(),
ack: c.sideB.NextAck(),
}
}
done <- struct{}{}
}()
<-done
<-done
time.Sleep(rtt + 30*time.Millisecond)
// ASSERT
assert.Equal(t, uint32(0), c.sideA.nack)
assert.Equal(t, uint32(numPackets), c.sideA.ack)
assert.Equal(t, uint32(0), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
t.Run("SequenceLoss", func(t *testing.T) {
// ASSIGN
rtt := 80 * time.Millisecond
numPackets := 100
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := newNewRenoTest(rtt)
c.Start(ctx)
c.RunSideA(ctx)
c.RunSideB(ctx)
// ACT
done := make(chan struct{})
go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(1 * time.Millisecond)
seq := c.sideA.Sequence()
if seq == 9 {
// Simulate packet loss of sequence 9
continue
}
c.aOutbound <- congestionPacket{
seq: seq,
nack: c.sideA.NextNack(),
ack: c.sideA.NextAck(),
}
}
done <- struct{}{}
}()
go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(1 * time.Millisecond)
seq := c.sideB.Sequence()
if seq == 13 {
// Simulate packet loss of sequence 13
continue
}
c.bOutbound <- congestionPacket{
seq: seq,
nack: c.sideB.NextNack(),
ack: c.sideB.NextAck(),
}
}
done <- struct{}{}
}()
<-done
<-done
time.Sleep(rtt + 30*time.Millisecond)
// ASSERT
assert.Equal(t, uint32(13), c.sideA.nack)
assert.Equal(t, uint32(numPackets), c.sideA.ack)
assert.Equal(t, uint32(9), c.sideB.nack)
assert.Equal(t, uint32(numPackets), c.sideB.ack)
})
})
}
func TestSortableFlights_Less(t *testing.T) {
// ASSIGN
a := []flightInfo{{sequence: 0}, {sequence: 6}, {sequence: 3}, {sequence: 2}}
// ACT
sort.Sort(sortableFlights(a))
// ASSERT
assert.Equal(t, []flightInfo{{sequence: 0}, {sequence: 2}, {sequence: 3}, {sequence: 6}}, a)
}

View File

@ -34,10 +34,7 @@ func (c *None) String() string {
return fmt.Sprintf("{None}")
}
func (c *None) ReceivedPacket(uint32) {}
func (c *None) ReceivedAck(uint32) {}
func (c *None) ReceivedNack(uint32) {}
func (c *None) Reset() {}
func (c *None) ReceivedPacket(uint32, uint32, uint32) {}
func (c *None) NextNack() uint32 { return 0 }
func (c *None) NextAck() uint32 { return 0 }
func (c *None) AwaitEarlyUpdate(time.Duration) uint32 { select {} }

View File

@ -1,7 +1,6 @@
package udp
import (
"errors"
"fmt"
"log"
"mpbl3p/proxy"
@ -120,6 +119,7 @@ func (f *InitiatedFlow) Reconnect() error {
}
_ = f.sendPacket(p, f.g)
time.Sleep(1 * time.Second)
}
}()
@ -217,18 +217,8 @@ func (f *Flow) produceInternal(v proxy.MacVerifier, mustReturn bool) (proxy.Pack
return nil, err
}
// schedule an ack for this sequence number
if p.seq != 0 {
f.congestion.ReceivedPacket(p.seq)
}
// adjust our sending congestion control based on their acks
if p.ack != 0 {
f.congestion.ReceivedAck(p.ack)
}
// adjust our sending congestion control based on their nacks
if p.nack != 0 {
f.congestion.ReceivedNack(p.nack)
}
// adjust congestion control based on this packet's congestion header
f.congestion.ReceivedPacket(p.seq, p.nack, p.ack)
// 12 bytes for header + the MAC + a timestamp
if len(b) == 12+f.v.CodeLength()+8 {
@ -260,8 +250,7 @@ func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error {
}
func (f *Flow) earlyUpdateLoop(g proxy.MacGenerator, keepalive time.Duration) {
var err error
for !errors.Is(err, shared.ErrDeadConnection) {
for f.isAlive {
seq := f.congestion.AwaitEarlyUpdate(keepalive)
p := Packet{
ack: f.congestion.NextAck(),

View File

@ -32,4 +32,5 @@ function mpbl3p_udp.dissector(buffer, pinfo, tree)
end
end
DissectorTable.get("udp.port"):add(1234, mpbl3p_udp)
DissectorTable.get("udp.port"):add(4724, mpbl3p_udp)
DissectorTable.get("udp.port"):add(4725, mpbl3p_udp)

View File

@ -1,65 +0,0 @@
package utils
import "errors"
var ErrorEmptyHeap = errors.New("attempted to extract from empty heap")
// A MinHeap for Uint64
type Uint32Heap []uint32
func (h *Uint32Heap) swap(x, y int) {
(*h)[x] = (*h)[x] ^ (*h)[y]
(*h)[y] = (*h)[y] ^ (*h)[x]
(*h)[x] = (*h)[x] ^ (*h)[y]
}
func (h *Uint32Heap) Insert(new uint32) uint32 {
*h = append(*h, new)
child := len(*h) - 1
for child != 0 {
parent := (child - 1) / 2
if (*h)[parent] > (*h)[child] {
h.swap(parent, child)
} else {
break
}
child = parent
}
return (*h)[0]
}
func (h *Uint32Heap) Extract() (uint32, error) {
if len(*h) == 0 {
return 0, ErrorEmptyHeap
}
min := (*h)[0]
(*h)[0] = (*h)[len(*h)-1]
*h = (*h)[:len(*h)-1]
parent := 0
for {
left, right := parent*2+1, parent*2+2
if (left < len(*h) && (*h)[parent] > (*h)[left]) || (right < len(*h) && (*h)[parent] > (*h)[right]) {
if right < len(*h) && (*h)[left] > (*h)[right] {
h.swap(parent, right)
parent = right
} else {
h.swap(parent, left)
parent = left
}
} else {
return min, nil
}
}
}
func (h *Uint32Heap) Peek() (uint32, error) {
if len(*h) == 0 {
return 0, ErrorEmptyHeap
}
return (*h)[0], nil
}

View File

@ -1,54 +0,0 @@
package utils
import (
"fmt"
"github.com/stretchr/testify/assert"
"math/rand"
"testing"
"time"
)
func SlowHeapSort(in []uint32) []uint32 {
out := make([]uint32, len(in))
var heap Uint32Heap
for _, x := range in {
heap.Insert(x)
}
for i := range out {
var err error
out[i], err = heap.Extract()
if err != nil {
panic(err)
}
}
return out
}
func TestUint32Heap(t *testing.T) {
t.Run("EquivalentToMerge", func(t *testing.T) {
const ArrayLength = 50
sortedArray := make([]uint32, ArrayLength)
array := make([]uint32, ArrayLength)
for i := range array {
sortedArray[i] = uint32(i)
array[i] = uint32(i)
}
rand.Seed(time.Now().Unix())
for i := 0; i < 100; i++ {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
rand.Shuffle(50, func(i, j int) { array[i], array[j] = array[j], array[i] })
heapSorted := SlowHeapSort(array)
assert.Equal(t, sortedArray, heapSorted)
})
}
})
}