dissertation-2-code/udp/listener.go
Jake Hillion a0654b0016
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
enable/disable for listeners
2021-04-09 19:00:35 +01:00

105 lines
1.9 KiB
Go

package udp
import (
"context"
"log"
"mpbl3p/proxy"
"net"
"time"
)
// ComparableUdpAddress mirrors the fields of net.UDPAddr in a form that
// supports == and can therefore be used as a map key: the IP is stored in a
// fixed-size array instead of a slice.
type ComparableUdpAddress struct {
// IP holds the address bytes as filled in by fromUdpAddress.
IP [16]byte
// Port is the UDP port number, copied from net.UDPAddr.Port.
Port int
// Zone is copied from net.UDPAddr.Zone (the IPv6 scoped addressing zone).
Zone string
}
// fromUdpAddress converts address into its comparable, map-key-friendly form.
//
// The IP is normalised to the 16-byte representation before it is copied, so
// an IPv4 address produces the same key whether it arrives as a 4-byte or a
// 16-byte net.IP, and it can no longer collide with an IPv6 address that
// happens to start with the same four bytes. An IP whose length is neither 4
// nor 16 bytes (including a nil IP) maps to the all-zero array instead of
// being copied verbatim or indexing out of range.
//
// Port and Zone are carried over unchanged.
func fromUdpAddress(address net.UDPAddr) ComparableUdpAddress {
	var ip [16]byte
	// To16 returns nil for an invalid length, in which case copy is a no-op
	// and ip keeps its zero value.
	copy(ip[:], address.IP.To16())

	return ComparableUdpAddress{
		IP:   ip,
		Port: address.Port,
		Zone: address.Zone,
	}
}
// NewListener binds a UDP socket on local and starts a background goroutine
// that sorts incoming datagrams into flows keyed by remote address.
//
// The first datagram from an unseen remote address creates a new Flow from
// the congestion controller returned by c and the verifier returned by v. The
// flow's early update loop is started with the generator returned by g, and
// the flow is registered with p as a consumer and/or producer according to
// enableConsumers and enableProducers. Every datagram, including that first
// one, is then queued onto its flow.
//
// An error is returned if local cannot be resolved, the socket cannot be
// bound, or its write buffer cannot be configured. On success the goroutine
// keeps running until ctx is cancelled and closes the socket when it exits.
func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() proxy.MacVerifier, g func() proxy.MacGenerator, c func() Congestion, enableConsumers bool, enableProducers bool) error {
	laddr, err := net.ResolveUDPAddr("udp", local)
	if err != nil {
		return err
	}

	pconn, err := net.ListenUDP("udp", laddr)
	if err != nil {
		return err
	}

	if err := pconn.SetWriteBuffer(0); err != nil {
		// Report the failure to the caller rather than panicking, and do not
		// leak the socket that was just bound. The close error is ignored on
		// purpose: the configuration failure is the one worth returning.
		_ = pconn.Close()
		return err
	}

	// Only the goroutine below reads or writes this map.
	// NOTE(review): entries are never removed, so the map grows with every
	// distinct remote address seen — confirm whether dead flows need pruning.
	receivedConnections := make(map[ComparableUdpAddress]*Flow)

	go func() {
		// Release the socket whenever the listener stops, whatever the reason.
		defer func() {
			if err := pconn.Close(); err != nil {
				log.Printf("closing udp listener: %v\n", err)
			}
		}()

		for ctx.Err() == nil {
			// A fresh buffer per iteration: the slice is handed to the flow
			// below (presumably retained there), so it must not be reused.
			buf := make([]byte, 6000)

			// A short deadline lets the loop re-check ctx about once a second
			// even when no traffic arrives.
			if err := pconn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
				// NOTE(review): panicking here takes the whole process down;
				// kept because the signature offers no way to report errors
				// from the goroutine.
				panic(err)
			}

			n, addr, err := pconn.ReadFromUDP(buf)
			if err != nil {
				if e, ok := err.(net.Error); ok && e.Timeout() {
					continue
				}
				panic(err)
			}

			raddr := fromUdpAddress(*addr)
			if f, exists := receivedConnections[raddr]; exists {
				log.Println("existing flow. queuing...")
				if err := f.queueDatagram(ctx, buf[:n]); err != nil {
					// Previously swallowed silently; surface it and skip the
					// misleading "queued" message.
					log.Printf("queuing datagram on existing flow: %v\n", err)
					continue
				}
				log.Println("queued")
				continue
			}

			// Distinct names avoid shadowing the factory parameters v and g.
			verifier := v()
			generator := g()

			f := newFlow(c(), verifier)
			f.writer = pconn
			f.raddr = addr
			f.isAlive = true

			log.Printf("received new udp connection: %v\n", f)

			go f.earlyUpdateLoop(ctx, generator, 0)

			receivedConnections[raddr] = &f

			if enableConsumers {
				p.AddConsumer(ctx, &f, generator)
			}
			if enableProducers {
				p.AddProducer(ctx, &f, verifier)
			}

			log.Println("handling...")
			if err := f.queueDatagram(ctx, buf[:n]); err != nil {
				// The listener stops here as before, but no longer silently.
				log.Printf("queuing first datagram on new flow: %v\n", err)
				return
			}
			log.Println("handled")
		}
	}()

	return nil
}