dissertation-2-code/udp/flow.go

286 lines
5.2 KiB
Go
Raw Normal View History

2020-11-25 19:35:31 +00:00
package udp
import (
2020-11-26 22:10:37 +00:00
"errors"
"fmt"
"log"
2020-11-25 19:35:31 +00:00
"mpbl3p/proxy"
2020-11-26 22:10:37 +00:00
"mpbl3p/shared"
2020-11-25 19:35:31 +00:00
"net"
2020-11-26 18:55:29 +00:00
"sync"
2020-11-26 22:10:37 +00:00
"time"
2020-11-25 19:35:31 +00:00
)
// PacketWriter is the sending half of a UDP socket as used by a Flow.
// *net.UDPConn provides all three methods.
type PacketWriter interface {
	// Write sends b on the socket without naming a destination address.
	Write(b []byte) (int, error)
	// WriteToUDP sends b to the given UDP address.
	WriteToUDP(b []byte, addr *net.UDPAddr) (int, error)
	// LocalAddr reports the local address of the socket.
	LocalAddr() net.Addr
}
// PacketConn is a PacketWriter that can also receive datagrams.
type PacketConn interface {
	PacketWriter

	// ReadFromUDP reads one datagram into b and reports its length and the
	// address it came from.
	ReadFromUDP(b []byte) (int, *net.UDPAddr, error)
}
2020-11-26 18:55:29 +00:00
type InitiatedFlow struct {
2020-11-26 22:10:37 +00:00
Local string
2020-11-26 18:55:29 +00:00
Remote string
2020-11-25 19:35:31 +00:00
2020-11-26 22:10:37 +00:00
g proxy.MacGenerator
keepalive time.Duration
2020-11-26 18:55:29 +00:00
mu sync.RWMutex
Flow
2020-11-25 19:35:31 +00:00
}
2020-11-26 22:10:37 +00:00
// String describes the flow as "UdpOutbound{<Local> -> <Remote>}".
func (f *InitiatedFlow) String() string {
	return "UdpOutbound{" + f.Local + " -> " + f.Remote + "}"
}
2020-11-25 19:35:31 +00:00
type Flow struct {
writer PacketWriter
2020-11-26 22:46:37 +00:00
raddr *net.UDPAddr
2020-11-25 19:35:31 +00:00
isAlive bool
2020-11-27 17:31:32 +00:00
startup bool
2020-11-25 19:35:31 +00:00
congestion Congestion
2020-11-26 18:55:29 +00:00
2020-11-26 22:15:22 +00:00
v proxy.MacVerifier
2020-11-26 18:55:29 +00:00
inboundDatagrams chan []byte
}
2020-11-26 22:10:37 +00:00
// String describes the flow as "UdpInbound{<remote> -> <local>}", where the
// local side is the writer's LocalAddr.
func (f Flow) String() string {
	remote := f.raddr
	local := f.writer.LocalAddr()
	return fmt.Sprintf("UdpInbound{%v -> %v}", remote, local)
}
func InitiateFlow(
local, remote string,
2020-11-26 22:15:22 +00:00
v proxy.MacVerifier,
2020-11-26 22:10:37 +00:00
g proxy.MacGenerator,
c Congestion,
keepalive time.Duration,
) (*InitiatedFlow, error) {
f := InitiatedFlow{
Local: local,
Remote: remote,
2020-11-26 22:15:22 +00:00
Flow: newFlow(c, v),
2020-11-26 22:10:37 +00:00
g: g,
keepalive: keepalive,
2020-11-26 18:55:29 +00:00
}
2020-11-26 22:10:37 +00:00
return &f, nil
2020-11-26 18:55:29 +00:00
}
2020-11-26 22:15:22 +00:00
func newFlow(c Congestion, v proxy.MacVerifier) Flow {
2020-11-26 22:10:37 +00:00
return Flow{
2020-11-26 18:55:29 +00:00
inboundDatagrams: make(chan []byte),
2020-11-26 22:10:37 +00:00
congestion: c,
2020-11-26 22:15:22 +00:00
v: v,
2020-11-26 18:55:29 +00:00
}
}
2020-11-26 22:10:37 +00:00
func (f *InitiatedFlow) Reconnect() error {
f.mu.Lock()
2020-11-27 20:17:59 +00:00
defer f.mu.Unlock()
2020-11-26 22:10:37 +00:00
if f.isAlive {
return nil
2020-11-26 18:55:29 +00:00
}
2020-11-26 22:10:37 +00:00
localAddr, err := net.ResolveUDPAddr("udp", f.Local)
if err != nil {
return err
}
remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote)
if err != nil {
return err
}
conn, err := net.DialUDP("udp", localAddr, remoteAddr)
if err != nil {
return err
}
f.writer = conn
2020-11-27 17:31:32 +00:00
f.startup = true
2020-11-26 22:10:37 +00:00
2020-11-27 17:31:32 +00:00
// prod the connection once a second until we get an ack, then consider it alive
2020-11-26 22:10:37 +00:00
go func() {
2020-11-27 17:31:32 +00:00
seq := f.congestion.Sequence()
for !f.isAlive {
p := Packet{
ack: 0,
nack: 0,
seq: seq,
2020-11-28 17:15:56 +00:00
data: proxy.SimplePacket(nil),
2020-11-26 22:10:37 +00:00
}
2020-11-27 17:31:32 +00:00
_ = f.sendPacket(p, f.g)
2020-11-26 22:10:37 +00:00
}
}()
go func() {
2020-11-27 20:17:59 +00:00
_, _ = f.produceInternal(f.v, false)
2020-11-26 22:10:37 +00:00
}()
2020-11-27 20:17:59 +00:00
go f.earlyUpdateLoop(f.g, f.keepalive)
if err := f.acceptPacket(conn); err != nil {
return err
}
f.isAlive = true
f.startup = false
2020-11-26 22:10:37 +00:00
2020-11-27 17:31:32 +00:00
go func() {
2020-11-27 20:17:59 +00:00
lockedAccept := func() {
f.mu.RLock()
defer f.mu.RUnlock()
if err := f.acceptPacket(conn); err != nil {
log.Println(err)
}
}
for f.isAlive {
log.Println("alive and listening for packets")
lockedAccept()
}
log.Println("no longer alive")
2020-11-27 17:31:32 +00:00
}()
2020-11-26 22:10:37 +00:00
return nil
}
// Consume forwards p to the embedded Flow's Consume while holding the read
// lock, so it cannot run while Reconnect holds the write lock.
func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	return f.Flow.Consume(p, g)
}
func (f *InitiatedFlow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(v)
2020-11-25 19:35:31 +00:00
}
// IsAlive reports the current value of the flow's isAlive flag.
func (f *Flow) IsAlive() bool {
	alive := f.isAlive
	return alive
}
func (f *Flow) Consume(pp proxy.Packet, g proxy.MacGenerator) error {
2020-11-26 22:10:37 +00:00
if !f.isAlive {
return shared.ErrDeadConnection
}
2020-11-27 17:31:32 +00:00
log.Println(f.congestion)
2020-11-26 22:10:37 +00:00
// Sequence is the congestion controllers opportunity to block
2020-11-27 17:31:32 +00:00
log.Println("awaiting sequence")
2020-11-25 19:35:31 +00:00
p := Packet{
seq: f.congestion.Sequence(),
data: pp,
}
2020-11-27 17:31:32 +00:00
log.Println("received sequence")
2020-11-25 19:35:31 +00:00
2020-11-27 17:31:32 +00:00
// Choose up to date ACK/NACK even after blocking
2020-11-25 19:35:31 +00:00
p.ack = f.congestion.NextAck()
p.nack = f.congestion.NextNack()
2020-11-27 17:31:32 +00:00
return f.sendPacket(p, g)
2020-11-25 19:35:31 +00:00
}
func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
2020-11-26 22:10:37 +00:00
if !f.isAlive {
return nil, shared.ErrDeadConnection
}
2020-11-27 17:31:32 +00:00
return f.produceInternal(v, true)
2020-11-25 19:35:31 +00:00
}
2020-11-27 17:31:32 +00:00
func (f *Flow) produceInternal(v proxy.MacVerifier, mustReturn bool) (proxy.Packet, error) {
for once := true; mustReturn || once; once = false {
log.Println(f.congestion)
b, err := proxy.StripMac(<-f.inboundDatagrams, v)
2020-11-26 22:10:37 +00:00
if err != nil {
2020-11-27 17:31:32 +00:00
return nil, err
2020-11-26 22:10:37 +00:00
}
p, err := UnmarshalPacket(b)
if err != nil {
2020-11-27 17:31:32 +00:00
return nil, err
2020-11-26 22:10:37 +00:00
}
2020-11-27 17:31:32 +00:00
// schedule an ack for this sequence number
if p.seq != 0 {
f.congestion.ReceivedPacket(p.seq)
}
// adjust our sending congestion control based on their acks
2020-11-26 22:10:37 +00:00
if p.ack != 0 {
f.congestion.ReceivedAck(p.ack)
}
2020-11-27 17:31:32 +00:00
// adjust our sending congestion control based on their nacks
2020-11-26 22:10:37 +00:00
if p.nack != 0 {
f.congestion.ReceivedNack(p.nack)
}
2020-11-27 17:31:32 +00:00
// 12 bytes for header + the MAC + a timestamp
if len(b) == 12+f.v.CodeLength()+8 {
log.Println("handled keepalive/ack only packet")
continue
}
return p, nil
}
return nil, nil
}
// handleDatagram queues a raw received datagram on inboundDatagrams. The
// channel is unbuffered, so this blocks until the datagram is received.
func (f *Flow) handleDatagram(datagram []byte) {
	f.inboundDatagrams <- datagram
}
func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error {
b := p.Marshal()
b = proxy.AppendMac(b, g)
if f.raddr == nil {
_, err := f.writer.Write(b)
return err
2020-11-26 22:10:37 +00:00
} else {
2020-11-27 17:31:32 +00:00
_, err := f.writer.WriteToUDP(b, f.raddr)
return err
}
}
func (f *Flow) earlyUpdateLoop(g proxy.MacGenerator, keepalive time.Duration) {
var err error
for !errors.Is(err, shared.ErrDeadConnection) {
seq := f.congestion.AwaitEarlyUpdate(keepalive)
p := Packet{
ack: f.congestion.NextAck(),
nack: f.congestion.NextNack(),
seq: seq,
2020-11-28 17:15:56 +00:00
data: proxy.SimplePacket(nil),
2020-11-27 17:31:32 +00:00
}
_ = f.sendPacket(p, g)
2020-11-26 22:10:37 +00:00
}
2020-11-25 19:35:31 +00:00
}
2020-11-27 20:17:59 +00:00
// acceptPacket reads a single datagram from c into a fresh 6000-byte buffer
// and passes the received bytes to handleDatagram. A read error is returned
// without queuing anything.
func (f *Flow) acceptPacket(c PacketConn) error {
	datagram := make([]byte, 6000)

	size, _, err := c.ReadFromUDP(datagram)
	if err == nil {
		f.handleDatagram(datagram[:size])
	}
	return err
}