udp testing
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Jake Hillion 2020-11-26 22:10:37 +00:00
parent 787f80dc90
commit d65e8d3571
10 changed files with 312 additions and 86 deletions

View File

@ -13,7 +13,14 @@ func main() {
log.Println("loading config...") log.Println("loading config...")
c, err := config.LoadConfig("config.ini") var configLoc string
if v, ok := os.LookupEnv("CONFIG_LOC"); ok {
configLoc = v
} else {
configLoc = "config.ini"
}
c, err := config.LoadConfig(configLoc)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -75,13 +75,13 @@ func (p Proxy) AddConsumer(c Consumer) {
if reconnectable { if reconnectable {
var err error var err error
for once := true; err != nil || once; once = false { for once := true; err != nil || once; once = false {
log.Printf("attempting to connect `%v`\n", c) log.Printf("attempting to connect consumer `%v`\n", c)
err = c.(Reconnectable).Reconnect() err = c.(Reconnectable).Reconnect()
if !once { if !once {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} }
log.Printf("connected `%v`\n", c) log.Printf("connected consumer `%v`\n", c)
} }
for c.IsAlive() { for c.IsAlive() {
@ -92,7 +92,7 @@ func (p Proxy) AddConsumer(c Consumer) {
} }
} }
log.Printf("closed connection `%v`\n", c) log.Printf("closed consumer `%v`\n", c)
}() }()
} }
@ -104,13 +104,13 @@ func (p Proxy) AddProducer(pr Producer, v MacVerifier) {
if reconnectable { if reconnectable {
var err error var err error
for once := true; err != nil || once; once = false { for once := true; err != nil || once; once = false {
log.Printf("attempting to connect `%v`\n", pr) log.Printf("attempting to connect producer `%v`\n", pr)
err = pr.(Reconnectable).Reconnect() err = pr.(Reconnectable).Reconnect()
if !once { if !once {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} }
log.Printf("connected `%v`\n", pr) log.Printf("connected producer `%v`\n", pr)
} }
for pr.IsAlive() { for pr.IsAlive() {
@ -123,6 +123,6 @@ func (p Proxy) AddProducer(pr Producer, v MacVerifier) {
} }
} }
log.Printf("closed connection `%v`\n", pr) log.Printf("closed producer `%v`\n", pr)
}() }()
} }

View File

@ -88,10 +88,6 @@ func (f *InitiatedFlow) Reconnect() error {
return nil return nil
} }
func (f *Flow) IsAlive() bool {
return f.isAlive
}
func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error { func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error {
f.mu.RLock() f.mu.RLock()
defer f.mu.RUnlock() defer f.mu.RUnlock()
@ -99,6 +95,17 @@ func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error {
return f.Flow.Consume(p, g) return f.Flow.Consume(p, g)
} }
func (f *InitiatedFlow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(v)
}
func (f *Flow) IsAlive() bool {
return f.isAlive
}
func (f *Flow) Consume(p proxy.Packet, g proxy.MacGenerator) (err error) { func (f *Flow) Consume(p proxy.Packet, g proxy.MacGenerator) (err error) {
if !f.isAlive { if !f.isAlive {
return shared.ErrDeadConnection return shared.ErrDeadConnection
@ -127,13 +134,6 @@ func (f *Flow) consumeMarshalled(data []byte) error {
return err return err
} }
func (f *InitiatedFlow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(v)
}
func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) { func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
if !f.isAlive { if !f.isAlive {
return nil, shared.ErrDeadConnection return nil, shared.ErrDeadConnection

View File

@ -31,7 +31,7 @@ func NewListener(p *proxy.Proxy, local string, v proxy.MacVerifier) error {
f := Flow{conn: conn, isAlive: true} f := Flow{conn: conn, isAlive: true}
log.Printf("received new connection: %v\n", f) log.Printf("received new tcp connection: %v\n", f)
p.AddConsumer(&f) p.AddConsumer(&f)
p.AddProducer(&f, v) p.AddProducer(&f, v)

View File

@ -1,5 +1,7 @@
package udp package udp
import "time"
type Congestion interface { type Congestion interface {
Sequence() uint32 Sequence() uint32
ReceivedPacket(seq uint32) ReceivedPacket(seq uint32)
@ -9,4 +11,6 @@ type Congestion interface {
ReceivedNack(uint32) ReceivedNack(uint32)
NextNack() uint32 NextNack() uint32
AwaitEarlyUpdate(keepalive time.Duration)
} }

View File

@ -1,20 +1,33 @@
package congestion package congestion
import ( import (
"math"
"mpbl3p/utils" "mpbl3p/utils"
"sync/atomic"
"time" "time"
) )
const RttExponentialFactor = 0.1
type NewReno struct { type NewReno struct {
sequence chan uint32 sequence chan uint32
packetTimes map[uint32]time.Time keepalive chan bool
nextAck uint32 outboundTimes map[uint32]time.Time
nextNack uint32 inboundTimes map[uint32]time.Time
fastStart bool ack, lastAck uint32
windowSize uint nack, lastNack uint32
rtt time.Duration
slowStart bool
rtt float64
windowSize int32
windowCount int32
inFlight int32
ackNotifier chan struct{}
lastSent time.Time
acksToSend utils.Uint32Heap acksToSend utils.Uint32Heap
} }
@ -22,8 +35,14 @@ type NewReno struct {
func NewNewReno() *NewReno { func NewNewReno() *NewReno {
c := NewReno{ c := NewReno{
sequence: make(chan uint32), sequence: make(chan uint32),
packetTimes: make(map[uint32]time.Time), ackNotifier: make(chan struct{}),
windowSize: 1,
outboundTimes: make(map[uint32]time.Time),
inboundTimes: make(map[uint32]time.Time),
windowSize: 1,
rtt: (1 * time.Millisecond).Seconds(),
slowStart: true,
} }
go func() { go func() {
@ -45,41 +64,113 @@ func NewNewReno() *NewReno {
func (c *NewReno) ReceivedAck(ack uint32) { func (c *NewReno) ReceivedAck(ack uint32) {
// RTT // RTT
// Update using an exponential average // Update using an exponential average
rtt := time.Now().Sub(c.outboundTimes[ack]).Seconds()
delete(c.outboundTimes, ack)
c.rtt = c.rtt*(1-RttExponentialFactor) + rtt*RttExponentialFactor
// Free Window
atomic.AddInt32(&c.inFlight, -1)
select {
case c.ackNotifier <- struct{}{}:
default:
}
// GROW // GROW
// CASE: exponential. increase window size by one per ack // CASE: exponential. increase window size by one per ack
// CASE: standard. increase window size by one per window of acks // CASE: standard. increase window size by one per window of acks
if c.slowStart {
atomic.AddInt32(&c.windowSize, 1)
} else {
c.windowCount++
if c.windowCount == c.windowSize {
c.windowCount = 0
atomic.AddInt32(&c.windowSize, 1)
}
}
} }
// It is assumed that ReceivedNack will only be called by one thread // It is assumed that ReceivedNack will only be called by one thread
func (c *NewReno) ReceivedNack(nack uint32) { func (c *NewReno) ReceivedNack(nack uint32) {
// Back off // End slow start
c.slowStart = false
if s := c.windowSize; s > 1 {
atomic.StoreInt32(&c.windowSize, s/2)
}
} }
func (c *NewReno) ReceivedPacket(seq uint32) { func (c *NewReno) ReceivedPacket(seq uint32) {
c.inboundTimes[seq] = time.Now()
c.acksToSend.Insert(seq) c.acksToSend.Insert(seq)
ack, err := c.acksToSend.Extract() findAck := func(start uint32) uint32 {
if err != nil { ack := start
panic(err) for len(c.acksToSend) > 0 {
if a, _ := c.acksToSend.Peek(); a == ack+1 {
ack, _ = c.acksToSend.Extract()
} else {
break
}
}
return ack
} }
for a, _ := c.acksToSend.Peek(); a == ack+1; { ack := findAck(c.ack)
ack, _ = c.acksToSend.Extract() if ack == c.ack {
// check if there is a nack to send
// decide this based on whether there have been 3RTTs between the offset packet
if len(c.acksToSend) > 0 {
nextAck, _ := c.acksToSend.Peek()
if time.Now().Sub(c.inboundTimes[nextAck]).Seconds() > c.rtt*3 {
atomic.StoreUint32(&c.nack, nextAck-1)
ack, _ = c.acksToSend.Extract()
ack = findAck(ack)
}
}
} }
atomic.StoreUint32(&c.ack, ack)
} }
func (c *NewReno) Sequence() uint32 { func (c *NewReno) Sequence() uint32 {
for c.inFlight >= c.windowSize {
<-c.ackNotifier
}
atomic.AddInt32(&c.inFlight, 1)
s := <-c.sequence s := <-c.sequence
c.packetTimes[s] = time.Now()
n := time.Now()
c.lastSent = n
c.outboundTimes[s] = n
return s return s
} }
func (c *NewReno) NextAck() uint32 { func (c *NewReno) NextAck() uint32 {
return c.nextAck a := c.ack
c.lastAck = a
return a
} }
func (c *NewReno) NextNack() uint32 { func (c *NewReno) NextNack() uint32 {
return c.nextNack n := c.nack
c.lastNack = n
return n
}
func (c *NewReno) AwaitEarlyUpdate(keepalive time.Duration) {
for {
rtt := time.Duration(math.Round(c.rtt * float64(time.Second)))
time.Sleep(rtt)
// CASE 1: > 5 waiting ACKs or any waiting NACKs and no message sent in the last RTT
if (c.lastAck-c.ack) > 5 || (c.lastNack != c.nack) && time.Now().After(c.lastSent.Add(rtt)) {
return
}
// CASE 3: No message sent within the keepalive time
if keepalive != 0 && time.Now().After(c.lastSent.Add(keepalive)) {
return
}
}
} }

View File

@ -1,13 +1,20 @@
package udp package udp
import ( import (
"errors"
"fmt"
"log"
"mpbl3p/config"
"mpbl3p/proxy" "mpbl3p/proxy"
"mpbl3p/shared"
"net" "net"
"sync" "sync"
"time"
) )
type PacketWriter interface { type PacketWriter interface {
WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error)
LocalAddr() net.Addr
} }
type PacketConn interface { type PacketConn interface {
@ -16,13 +23,20 @@ type PacketConn interface {
} }
type InitiatedFlow struct { type InitiatedFlow struct {
Local string Local string
Remote string Remote string
g proxy.MacGenerator
keepalive time.Duration
mu sync.RWMutex mu sync.RWMutex
Flow Flow
} }
func (f *InitiatedFlow) String() string {
return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local, f.Remote)
}
type Flow struct { type Flow struct {
writer PacketWriter writer PacketWriter
raddr net.UDPAddr raddr net.UDPAddr
@ -33,34 +47,107 @@ type Flow struct {
inboundDatagrams chan []byte inboundDatagrams chan []byte
} }
func newOutboundFlow(c Congestion) *Flow { func (f Flow) String() string {
return &Flow{ return fmt.Sprintf("UdpInbound{%v -> %v}", f.raddr, f.writer.LocalAddr())
congestion: c,
inboundDatagrams: make(chan []byte),
}
} }
func newInboundFlow(c Congestion) *Flow { func InitiateFlow(
return &Flow{ local, remote string,
congestion: c, g proxy.MacGenerator,
inboundDatagrams: make(chan []byte), c Congestion,
} keepalive time.Duration,
} ) (*InitiatedFlow, error) {
func InitiateFlow(local, remote string) (*InitiatedFlow, error) {
f := InitiatedFlow{ f := InitiatedFlow{
Local: local, Local: local,
Remote: remote, Remote: remote,
Flow: newFlow(c),
g: g,
keepalive: keepalive,
} }
return &f, nil return &f, nil
} }
func newFlow(c Congestion) Flow {
return Flow{
inboundDatagrams: make(chan []byte),
congestion: c,
}
}
func (f *InitiatedFlow) Reconnect() error {
f.mu.Lock()
defer f.mu.Unlock()
if f.isAlive {
return nil
}
localAddr, err := net.ResolveUDPAddr("udp", f.Local)
if err != nil {
return err
}
remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote)
if err != nil {
return err
}
conn, err := net.DialUDP("udp", localAddr, remoteAddr)
if err != nil {
return err
}
f.writer = conn
f.isAlive = true
go func() {
for {
buf := make([]byte, 6000)
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
panic(err)
}
f.inboundDatagrams <- buf[:n]
}
}()
go func() {
var err error
for !errors.Is(err, shared.ErrDeadConnection) {
f.congestion.AwaitEarlyUpdate(f.keepalive)
err = f.Consume(proxy.NewSimplePacket(nil), f.g)
}
}()
return nil
}
func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Consume(p, g)
}
func (f *InitiatedFlow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(v)
}
func (f *Flow) IsAlive() bool { func (f *Flow) IsAlive() bool {
return f.isAlive return f.isAlive
} }
func (f *Flow) Consume(pp proxy.Packet, g proxy.MacGenerator) error { func (f *Flow) Consume(pp proxy.Packet, g proxy.MacGenerator) error {
if !f.isAlive {
return shared.ErrDeadConnection
}
// Sequence is the congestion controllers opportunity to block
p := Packet{ p := Packet{
seq: f.congestion.Sequence(), seq: f.congestion.Sequence(),
data: pp, data: pp,
@ -77,6 +164,10 @@ func (f *Flow) Consume(pp proxy.Packet, g proxy.MacGenerator) error {
} }
func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) { func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
if !f.isAlive {
return nil, shared.ErrDeadConnection
}
b, err := proxy.StripMac(<-f.inboundDatagrams, v) b, err := proxy.StripMac(<-f.inboundDatagrams, v)
if err != nil { if err != nil {
return nil, err return nil, err
@ -103,5 +194,30 @@ func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
} }
func (f *Flow) handleDatagram(p []byte) { func (f *Flow) handleDatagram(p []byte) {
f.inboundDatagrams <- p // TODO: Fix with security
// 12 bytes for header + the MAC + a timestamp
if len(p) == 12+(config.UselessMac{}).CodeLength()+8 {
b, err := proxy.StripMac(<-f.inboundDatagrams, config.UselessMac{})
if err != nil {
log.Println(err)
return
}
p, err := UnmarshalPacket(b)
if err != nil {
log.Println(err)
return
}
// TODO: Decide whether to use this line. It means an ACK loop will start, but also is a packet loss.
f.congestion.ReceivedPacket(p.seq)
if p.ack != 0 {
f.congestion.ReceivedAck(p.ack)
}
if p.nack != 0 {
f.congestion.ReceivedNack(p.nack)
}
} else {
f.inboundDatagrams <- p
}
} }

View File

@ -1,8 +1,10 @@
package udp package udp
import ( import (
"errors"
"log" "log"
"mpbl3p/proxy" "mpbl3p/proxy"
"mpbl3p/shared"
"net" "net"
) )
@ -25,7 +27,7 @@ func fromUdpAddress(address net.UDPAddr) ComparableUdpAddress {
} }
} }
func NewListener(p *proxy.Proxy, local string, v proxy.MacVerifier) error { func NewListener(p *proxy.Proxy, local string, v proxy.MacVerifier, g proxy.MacGenerator, c Congestion) error {
laddr, err := net.ResolveUDPAddr("udp", local) laddr, err := net.ResolveUDPAddr("udp", local)
if err != nil { if err != nil {
return err return err
@ -58,15 +60,23 @@ func NewListener(p *proxy.Proxy, local string, v proxy.MacVerifier) error {
continue continue
} }
f := Flow{ f := newFlow(c)
writer: pconn,
raddr: *addr, f.writer = pconn
isAlive: true, f.raddr = *addr
} f.isAlive = true
go func() {
var err error
for !errors.Is(err, shared.ErrDeadConnection) {
f.congestion.AwaitEarlyUpdate(0)
err = f.Consume(proxy.NewSimplePacket(nil), g)
}
}()
receivedConnections[raddr] = &f receivedConnections[raddr] = &f
log.Printf("received new connection: %v\n", f) log.Printf("received new udp connection: %v\n", f)
p.AddConsumer(&f) p.AddConsumer(&f)
p.AddProducer(&f, v) p.AddProducer(&f, v)

View File

@ -5,23 +5,21 @@ import "errors"
var ErrorEmptyHeap = errors.New("attempted to extract from empty heap") var ErrorEmptyHeap = errors.New("attempted to extract from empty heap")
// A MinHeap for Uint64 // A MinHeap for Uint64
type Uint32Heap struct { type Uint32Heap []uint32
data []uint32
}
func (h *Uint32Heap) swap(x, y int) { func (h *Uint32Heap) swap(x, y int) {
h.data[x] = h.data[x] ^ h.data[y] (*h)[x] = (*h)[x] ^ (*h)[y]
h.data[y] = h.data[y] ^ h.data[x] (*h)[y] = (*h)[y] ^ (*h)[x]
h.data[x] = h.data[x] ^ h.data[y] (*h)[x] = (*h)[x] ^ (*h)[y]
} }
func (h *Uint32Heap) Insert(new uint32) uint32 { func (h *Uint32Heap) Insert(new uint32) uint32 {
h.data = append(h.data, new) *h = append(*h, new)
child := len(h.data) - 1 child := len(*h) - 1
for child != 0 { for child != 0 {
parent := (child - 1) / 2 parent := (child - 1) / 2
if h.data[parent] > h.data[child] { if (*h)[parent] > (*h)[child] {
h.swap(parent, child) h.swap(parent, child)
} else { } else {
break break
@ -29,24 +27,24 @@ func (h *Uint32Heap) Insert(new uint32) uint32 {
child = parent child = parent
} }
return h.data[0] return (*h)[0]
} }
func (h *Uint32Heap) Extract() (uint32, error) { func (h *Uint32Heap) Extract() (uint32, error) {
if len(h.data) == 0 { if len(*h) == 0 {
return 0, ErrorEmptyHeap return 0, ErrorEmptyHeap
} }
min := h.data[0] min := (*h)[0]
h.data[0] = h.data[len(h.data)-1] (*h)[0] = (*h)[len(*h)-1]
h.data = h.data[:len(h.data)-1] *h = (*h)[:len(*h)-1]
parent := 0 parent := 0
for { for {
left, right := parent*2+1, parent*2+2 left, right := parent*2+1, parent*2+2
if (left < len(h.data) && h.data[parent] > h.data[left]) || (right < len(h.data) && h.data[parent] > h.data[right]) { if (left < len(*h) && (*h)[parent] > (*h)[left]) || (right < len(*h) && (*h)[parent] > (*h)[right]) {
if right < len(h.data) && h.data[left] > h.data[right] { if right < len(*h) && (*h)[left] > (*h)[right] {
h.swap(parent, right) h.swap(parent, right)
parent = right parent = right
} else { } else {
@ -60,8 +58,8 @@ func (h *Uint32Heap) Extract() (uint32, error) {
} }
func (h *Uint32Heap) Peek() (uint32, error) { func (h *Uint32Heap) Peek() (uint32, error) {
if len(h.data) == 0 { if len(*h) == 0 {
return 0, ErrorEmptyHeap return 0, ErrorEmptyHeap
} }
return h.data[0], nil return (*h)[0], nil
} }

View File

@ -8,8 +8,8 @@ import (
"time" "time"
) )
func SlowHeapSort(in []uint64) []uint64 { func SlowHeapSort(in []uint32) []uint32 {
out := make([]uint64, len(in)) out := make([]uint32, len(in))
var heap Uint32Heap var heap Uint32Heap
@ -27,16 +27,16 @@ func SlowHeapSort(in []uint64) []uint64 {
return out return out
} }
func TestUint64Heap(t *testing.T) { func TestUint32Heap(t *testing.T) {
t.Run("EquivalentToMerge", func(t *testing.T) { t.Run("EquivalentToMerge", func(t *testing.T) {
const ArrayLength = 50 const ArrayLength = 50
sortedArray := make([]uint64, ArrayLength) sortedArray := make([]uint32, ArrayLength)
array := make([]uint64, ArrayLength) array := make([]uint32, ArrayLength)
for i := range array { for i := range array {
sortedArray[i] = uint64(i) sortedArray[i] = uint32(i)
array[i] = uint64(i) array[i] = uint32(i)
} }
rand.Seed(time.Now().Unix()) rand.Seed(time.Now().Unix())