dissertation-2-code/udp/flow.go
Jake Hillion c7e8c64751
Some checks failed
continuous-integration/drone/push Build is failing
correct udp flow tests
2021-05-13 22:11:07 +01:00

315 lines
5.8 KiB
Go

package udp
import (
"context"
"fmt"
"log"
"mpbl3p/proxy"
"mpbl3p/shared"
"net"
"sync"
"time"
)
type PacketWriter interface {
Write(b []byte) (int, error)
WriteToUDP(b []byte, addr *net.UDPAddr) (int, error)
LocalAddr() net.Addr
}
type PacketConn interface {
PacketWriter
ReadFromUDP(b []byte) (int, *net.UDPAddr, error)
}
type InitiatedFlow struct {
Local func() string
Remote string
keepalive time.Duration
mu sync.RWMutex
Flow
}
func (f *InitiatedFlow) String() string {
return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local(), f.Remote)
}
type Flow struct {
writer PacketWriter
raddr *net.UDPAddr
isAlive bool
startup bool
congestion Congestion
verifiers []proxy.MacVerifier
generators []proxy.MacGenerator
inboundDatagrams chan []byte
}
func (f Flow) String() string {
return fmt.Sprintf("UdpInbound{%v -> %v}", f.raddr, f.writer.LocalAddr())
}
func InitiateFlow(
local func() string,
remote string,
vs []proxy.MacVerifier,
gs []proxy.MacGenerator,
c Congestion,
keepalive time.Duration,
) (*InitiatedFlow, error) {
f := InitiatedFlow{
Local: local,
Remote: remote,
Flow: newFlow(c, vs, gs),
keepalive: keepalive,
}
return &f, nil
}
func newFlow(c Congestion, vs []proxy.MacVerifier, gs []proxy.MacGenerator) Flow {
return Flow{
inboundDatagrams: make(chan []byte),
congestion: c,
verifiers: vs,
generators: gs,
}
}
func (f *InitiatedFlow) Reconnect(ctx context.Context) error {
f.mu.Lock()
defer f.mu.Unlock()
if f.isAlive {
return nil
}
localAddr, err := net.ResolveUDPAddr("udp", f.Local())
if err != nil {
return err
}
remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote)
if err != nil {
return err
}
conn, err := net.DialUDP("udp", localAddr, remoteAddr)
if err != nil {
return err
}
f.writer = conn
f.startup = true
// prod the connection once a second until we get an ack, then consider it alive
go func() {
seq, err := f.congestion.Sequence(ctx)
if err != nil {
return
}
for !f.isAlive {
if ctx.Err() != nil {
return
}
p := Packet{
ack: 0,
nack: 0,
seq: seq,
data: proxy.SimplePacket(nil),
}
_ = f.sendPacket(p)
time.Sleep(1 * time.Second)
}
}()
go func() {
_, _ = f.produceInternal(ctx, false)
}()
go f.earlyUpdateLoop(ctx, f.keepalive)
if err := f.readQueuePacket(ctx, conn); err != nil {
return err
}
f.isAlive = true
f.startup = false
go func() {
lockedAccept := func() {
f.mu.RLock()
defer f.mu.RUnlock()
if err := f.readQueuePacket(ctx, conn); err != nil {
log.Println(err)
}
}
for f.isAlive {
log.Println("alive and listening for packets")
lockedAccept()
}
log.Println("no longer alive")
}()
return nil
}
func (f *InitiatedFlow) Consume(ctx context.Context, p proxy.Packet) error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Consume(ctx, p)
}
func (f *InitiatedFlow) Produce(ctx context.Context) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(ctx)
}
func (f *Flow) IsAlive() bool {
return f.isAlive
}
func (f *Flow) Consume(ctx context.Context, pp proxy.Packet) error {
if !f.isAlive {
return shared.ErrDeadConnection
}
log.Println(f.congestion)
// Sequence is the congestion controllers opportunity to block
log.Println("awaiting sequence")
seq, err := f.congestion.Sequence(ctx)
if err != nil {
return err
}
log.Println("received sequence")
// Choose up to date ACK/NACK even after blocking
p := Packet{
seq: seq,
data: pp,
ack: f.congestion.NextAck(),
nack: f.congestion.NextNack(),
}
return f.sendPacket(p)
}
func (f *Flow) Produce(ctx context.Context) (proxy.Packet, error) {
if !f.isAlive {
return nil, shared.ErrDeadConnection
}
return f.produceInternal(ctx, true)
}
func (f *Flow) produceInternal(ctx context.Context, mustReturn bool) (proxy.Packet, error) {
for once := true; mustReturn || once; once = false {
log.Println(f.congestion)
var received []byte
select {
case received = <-f.inboundDatagrams:
case <-ctx.Done():
return nil, ctx.Err()
}
for i := range f.verifiers {
v := f.verifiers[len(f.verifiers)-i-1]
var err error
received, err = proxy.StripMac(received, v)
if err != nil {
return nil, err
}
}
p, err := UnmarshalPacket(received)
if err != nil {
return nil, err
}
// adjust congestion control based on this packet's congestion header
f.congestion.ReceivedPacket(p.seq, p.nack, p.ack)
// 12 bytes for header + the MAC + a timestamp
if len(p.Contents()) == 0 {
log.Println("handled keepalive/ack only packet")
continue
}
return p, nil
}
return nil, nil
}
func (f *Flow) queueDatagram(ctx context.Context, p []byte) error {
select {
case f.inboundDatagrams <- p:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (f *Flow) sendPacket(p Packet) error {
b := p.Marshal()
for _, g := range f.generators {
b = proxy.AppendMac(b, g)
}
if f.raddr == nil {
_, err := f.writer.Write(b)
return err
} else {
_, err := f.writer.WriteToUDP(b, f.raddr)
return err
}
}
func (f *Flow) earlyUpdateLoop(ctx context.Context, keepalive time.Duration) {
for f.isAlive {
seq, err := f.congestion.AwaitEarlyUpdate(ctx, keepalive)
if err != nil {
fmt.Printf("terminating earlyupdateloop for `%v`\n", f)
return
}
p := Packet{
seq: seq,
data: proxy.SimplePacket(nil),
ack: f.congestion.NextAck(),
nack: f.congestion.NextNack(),
}
err = f.sendPacket(p)
if err != nil {
fmt.Printf("error sending early update packet: `%v`\n", err)
}
}
}
func (f *Flow) readQueuePacket(ctx context.Context, c PacketConn) error {
// TODO: Replace 6000 with MTU+header size
buf := make([]byte, 6000)
n, _, err := c.ReadFromUDP(buf)
if err != nil {
return err
}
return f.queueDatagram(ctx, buf[:n])
}