dissertation-2-code/udp/flow.go
Jake Hillion 179025ad2b
All checks were successful
continuous-integration/drone/push Build is passing
refactored timestamping
2020-11-28 17:15:56 +00:00

286 lines
5.2 KiB
Go

package udp
import (
"errors"
"fmt"
"log"
"mpbl3p/proxy"
"mpbl3p/shared"
"net"
"sync"
"time"
)
type PacketWriter interface {
Write(b []byte) (int, error)
WriteToUDP(b []byte, addr *net.UDPAddr) (int, error)
LocalAddr() net.Addr
}
type PacketConn interface {
PacketWriter
ReadFromUDP(b []byte) (int, *net.UDPAddr, error)
}
type InitiatedFlow struct {
Local string
Remote string
g proxy.MacGenerator
keepalive time.Duration
mu sync.RWMutex
Flow
}
func (f *InitiatedFlow) String() string {
return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local, f.Remote)
}
type Flow struct {
writer PacketWriter
raddr *net.UDPAddr
isAlive bool
startup bool
congestion Congestion
v proxy.MacVerifier
inboundDatagrams chan []byte
}
func (f Flow) String() string {
return fmt.Sprintf("UdpInbound{%v -> %v}", f.raddr, f.writer.LocalAddr())
}
func InitiateFlow(
local, remote string,
v proxy.MacVerifier,
g proxy.MacGenerator,
c Congestion,
keepalive time.Duration,
) (*InitiatedFlow, error) {
f := InitiatedFlow{
Local: local,
Remote: remote,
Flow: newFlow(c, v),
g: g,
keepalive: keepalive,
}
return &f, nil
}
func newFlow(c Congestion, v proxy.MacVerifier) Flow {
return Flow{
inboundDatagrams: make(chan []byte),
congestion: c,
v: v,
}
}
func (f *InitiatedFlow) Reconnect() error {
f.mu.Lock()
defer f.mu.Unlock()
if f.isAlive {
return nil
}
localAddr, err := net.ResolveUDPAddr("udp", f.Local)
if err != nil {
return err
}
remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote)
if err != nil {
return err
}
conn, err := net.DialUDP("udp", localAddr, remoteAddr)
if err != nil {
return err
}
f.writer = conn
f.startup = true
// prod the connection once a second until we get an ack, then consider it alive
go func() {
seq := f.congestion.Sequence()
for !f.isAlive {
p := Packet{
ack: 0,
nack: 0,
seq: seq,
data: proxy.SimplePacket(nil),
}
_ = f.sendPacket(p, f.g)
}
}()
go func() {
_, _ = f.produceInternal(f.v, false)
}()
go f.earlyUpdateLoop(f.g, f.keepalive)
if err := f.acceptPacket(conn); err != nil {
return err
}
f.isAlive = true
f.startup = false
go func() {
lockedAccept := func() {
f.mu.RLock()
defer f.mu.RUnlock()
if err := f.acceptPacket(conn); err != nil {
log.Println(err)
}
}
for f.isAlive {
log.Println("alive and listening for packets")
lockedAccept()
}
log.Println("no longer alive")
}()
return nil
}
func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Consume(p, g)
}
func (f *InitiatedFlow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(v)
}
func (f *Flow) IsAlive() bool {
return f.isAlive
}
func (f *Flow) Consume(pp proxy.Packet, g proxy.MacGenerator) error {
if !f.isAlive {
return shared.ErrDeadConnection
}
log.Println(f.congestion)
// Sequence is the congestion controllers opportunity to block
log.Println("awaiting sequence")
p := Packet{
seq: f.congestion.Sequence(),
data: pp,
}
log.Println("received sequence")
// Choose up to date ACK/NACK even after blocking
p.ack = f.congestion.NextAck()
p.nack = f.congestion.NextNack()
return f.sendPacket(p, g)
}
func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
if !f.isAlive {
return nil, shared.ErrDeadConnection
}
return f.produceInternal(v, true)
}
func (f *Flow) produceInternal(v proxy.MacVerifier, mustReturn bool) (proxy.Packet, error) {
for once := true; mustReturn || once; once = false {
log.Println(f.congestion)
b, err := proxy.StripMac(<-f.inboundDatagrams, v)
if err != nil {
return nil, err
}
p, err := UnmarshalPacket(b)
if err != nil {
return nil, err
}
// schedule an ack for this sequence number
if p.seq != 0 {
f.congestion.ReceivedPacket(p.seq)
}
// adjust our sending congestion control based on their acks
if p.ack != 0 {
f.congestion.ReceivedAck(p.ack)
}
// adjust our sending congestion control based on their nacks
if p.nack != 0 {
f.congestion.ReceivedNack(p.nack)
}
// 12 bytes for header + the MAC + a timestamp
if len(b) == 12+f.v.CodeLength()+8 {
log.Println("handled keepalive/ack only packet")
continue
}
return p, nil
}
return nil, nil
}
func (f *Flow) handleDatagram(p []byte) {
f.inboundDatagrams <- p
}
func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error {
b := p.Marshal()
b = proxy.AppendMac(b, g)
if f.raddr == nil {
_, err := f.writer.Write(b)
return err
} else {
_, err := f.writer.WriteToUDP(b, f.raddr)
return err
}
}
func (f *Flow) earlyUpdateLoop(g proxy.MacGenerator, keepalive time.Duration) {
var err error
for !errors.Is(err, shared.ErrDeadConnection) {
seq := f.congestion.AwaitEarlyUpdate(keepalive)
p := Packet{
ack: f.congestion.NextAck(),
nack: f.congestion.NextNack(),
seq: seq,
data: proxy.SimplePacket(nil),
}
_ = f.sendPacket(p, g)
}
}
func (f *Flow) acceptPacket(c PacketConn) error {
buf := make([]byte, 6000)
n, _, err := c.ReadFromUDP(buf)
if err != nil {
return err
}
f.handleDatagram(buf[:n])
return nil
}