dissertation-2-code/udp/flow.go
Jake Hillion 898496fe94
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is failing
Missed files
2021-05-13 22:48:47 +01:00

197 lines
3.8 KiB
Go

package udp
import (
"context"
"fmt"
"log"
"mpbl3p/proxy"
"mpbl3p/shared"
"net"
"time"
)
// PacketWriter is the send-side view of a UDP socket used by Flow. Its method
// signatures match the corresponding methods of *net.UDPConn, so a real socket
// or a test double can be supplied.
type PacketWriter interface {
	// LocalAddr reports the local address of the underlying socket.
	LocalAddr() net.Addr
	// WriteToUDP sends payload as one datagram to dst.
	WriteToUDP(payload []byte, dst *net.UDPAddr) (int, error)
	// Write sends payload without naming a destination.
	Write(payload []byte) (int, error)
}
// PacketConn is a PacketWriter that can also receive: it adds the two
// receive-side operations readPacket relies on. The signatures match the
// corresponding methods of *net.UDPConn.
type PacketConn interface {
	PacketWriter
	// ReadFromUDP reads one datagram into buf, returning the byte count and a
	// *net.UDPAddr alongside any error.
	ReadFromUDP(buf []byte) (int, *net.UDPAddr, error)
	// SetReadDeadline sets the deadline applied to reads on the connection.
	SetReadDeadline(deadline time.Time) error
}
// Flow holds the state for one UDP flow: where packets are written, the
// congestion controller that sequences them, the MAC layers applied on send
// and stripped on receive, and the queue of raw inbound datagrams.
type Flow struct {
// writer is used by sendPacket to transmit marshalled packets and by String
// to obtain the local address. newFlow leaves it unset.
writer PacketWriter
// raddr is the destination passed to WriteToUDP; when it is nil sendPacket
// calls Write instead.
raddr *net.UDPAddr
// isAlive gates Consume, Produce and earlyUpdateLoop and is reported by
// IsAlive. newFlow does not set it, so a fresh Flow starts with false.
isAlive bool
// congestion supplies sequence numbers and ACK/NACK values for outgoing
// packets and is told the seq/nack/ack of every received packet.
congestion Congestion
// verifiers are applied last-to-first by produceInternal to strip MACs from
// a received datagram.
verifiers []proxy.MacVerifier
// generators are applied first-to-last by sendPacket to append MACs to an
// outgoing packet.
generators []proxy.MacGenerator
// inboundDatagrams carries raw received datagrams from queueDatagram to
// produceInternal. newFlow creates it unbuffered.
inboundDatagrams chan []byte
}
// String renders the flow as "UdpInbound{<remote> -> <local>}", where remote
// is f.raddr and local is the writer's local address. It calls
// f.writer.LocalAddr, so the writer must be set.
func (f Flow) String() string {
	remote := f.raddr
	local := f.writer.LocalAddr()
	return fmt.Sprintf("UdpInbound{%v -> %v}", remote, local)
}
// newFlow builds a Flow around the given congestion controller, MAC verifiers
// and MAC generators, and gives it a fresh unbuffered inbound-datagram
// channel. The writer, remote address and liveness flag keep their zero
// values.
func newFlow(c Congestion, vs []proxy.MacVerifier, gs []proxy.MacGenerator) Flow {
	var f Flow
	f.congestion = c
	f.verifiers = vs
	f.generators = gs
	f.inboundDatagrams = make(chan []byte)
	return f
}
// IsAlive reports the current value of the flow's liveness flag.
func (f *Flow) IsAlive() bool {
	alive := f.isAlive
	return alive
}
// Consume sends pp over the flow.
//
// It returns shared.ErrDeadConnection when the flow is not alive. Otherwise it
// asks the congestion controller for a sequence number (which may block until
// ctx is done), wraps pp in a Packet carrying that sequence number plus the
// controller's next ACK and NACK, and hands it to sendPacket. Any error from
// the controller or from sending is returned unchanged.
func (f *Flow) Consume(ctx context.Context, pp proxy.Packet) error {
	if alive := f.isAlive; !alive {
		return shared.ErrDeadConnection
	}
	log.Println(f.congestion)

	// Requesting the sequence number is where the congestion controller gets
	// the chance to hold the sender back.
	log.Println("awaiting sequence")
	seq, err := f.congestion.Sequence(ctx)
	if err != nil {
		return err
	}
	log.Println("received sequence")

	// ACK/NACK are read only now, after any blocking above, so they are as
	// fresh as possible.
	var out Packet
	out.seq = seq
	out.data = pp
	out.ack = f.congestion.NextAck()
	out.nack = f.congestion.NextNack()

	return f.sendPacket(out)
}
// Produce returns the next received packet that has contents, waiting until
// one arrives, an error occurs, or ctx is done. It returns
// shared.ErrDeadConnection when the flow is not alive.
func (f *Flow) Produce(ctx context.Context) (proxy.Packet, error) {
	if f.isAlive {
		return f.produceInternal(ctx, true)
	}
	return nil, shared.ErrDeadConnection
}
// produceInternal takes datagrams from the inbound queue, strips their MACs,
// unmarshals them and feeds their congestion header to the congestion
// controller.
//
// A packet with non-empty contents is returned immediately. A packet without
// contents is consumed for its congestion header only; after such a packet the
// function waits for the next datagram when mustReturn is true and returns
// (nil, nil) when mustReturn is false. A done ctx yields ctx.Err(); MAC or
// unmarshalling failures are returned unchanged.
func (f *Flow) produceInternal(ctx context.Context, mustReturn bool) (proxy.Packet, error) {
	for {
		log.Println(f.congestion)

		var datagram []byte
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case datagram = <-f.inboundDatagrams:
		}

		// Verifiers are applied last-to-first.
		for idx := len(f.verifiers) - 1; idx >= 0; idx-- {
			stripped, err := proxy.StripMac(datagram, f.verifiers[idx])
			if err != nil {
				return nil, err
			}
			datagram = stripped
		}

		p, err := UnmarshalPacket(datagram)
		if err != nil {
			return nil, err
		}

		// Update congestion control from this packet's congestion header.
		f.congestion.ReceivedPacket(p.seq, p.nack, p.ack)

		if len(p.Contents()) != 0 {
			return p, nil
		}

		// Nothing to hand to the caller: the packet carried no contents.
		log.Println("handled keepalive/ack only packet")
		if !mustReturn {
			return nil, nil
		}
	}
}
// queueDatagram offers the raw datagram p to the flow's inbound channel. It
// returns nil once the datagram has been handed over, or ctx.Err() if ctx is
// done first.
func (f *Flow) queueDatagram(ctx context.Context, p []byte) error {
	done := ctx.Done()
	select {
	case <-done:
		return ctx.Err()
	case f.inboundDatagrams <- p:
	}
	return nil
}
// sendPacket marshals p, appends one MAC per generator in slice order, and
// writes the result through the flow's writer: to f.raddr via WriteToUDP when
// a remote address is set, otherwise via Write. The write error, if any, is
// returned; the byte count is ignored.
func (f *Flow) sendPacket(p proxy.Packet) error {
	payload := p.Marshal()
	for idx := range f.generators {
		payload = proxy.AppendMac(payload, f.generators[idx])
	}

	var err error
	if dst := f.raddr; dst != nil {
		_, err = f.writer.WriteToUDP(payload, dst)
	} else {
		_, err = f.writer.Write(payload)
	}
	return err
}
// earlyUpdateLoop sends data-less packets carrying the current ACK/NACK for as
// long as the flow is alive.
//
// Each iteration waits on the congestion controller's AwaitEarlyUpdate with
// the given keepalive duration. When that returns an error the loop logs the
// cause and stops. Otherwise a Packet with the returned sequence number, nil
// data and the controller's next ACK/NACK is sent. A send failure is logged
// and the loop carries on.
//
// Messages go through the log package, like the rest of this file, and the
// termination message includes the error that ended the loop.
func (f *Flow) earlyUpdateLoop(ctx context.Context, keepalive time.Duration) {
	for f.isAlive {
		seq, err := f.congestion.AwaitEarlyUpdate(ctx, keepalive)
		if err != nil {
			log.Printf("terminating earlyupdateloop for `%v`: %v", f, err)
			return
		}

		p := Packet{
			seq:  seq,
			data: proxy.SimplePacket(nil),
			ack:  f.congestion.NextAck(),
			nack: f.congestion.NextNack(),
		}

		// Best effort: a failed update is reported but does not end the loop.
		if err := f.sendPacket(p); err != nil {
			log.Printf("error sending early update packet: `%v`", err)
		}
	}
}
// readPacket reads a single datagram from c and returns its bytes.
//
// If ctx is already done, its error is returned without touching c. If ctx
// has a deadline, that deadline is installed as c's read deadline before
// reading. When the read fails with a timeout and ctx is done by then,
// ctx.Err() is returned in place of the network error; every other read error
// is returned unchanged.
//
// NOTE(review): when ctx has no deadline, any read deadline previously set on
// c is left in place. Confirm callers never mix deadline and non-deadline
// contexts on the same connection.
func (f *Flow) readPacket(ctx context.Context, c PacketConn) ([]byte, error) {
	// Never start a potentially blocking read for a context that is already
	// cancelled or expired.
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	if d, ok := ctx.Deadline(); ok {
		if err := c.SetReadDeadline(d); err != nil {
			return nil, err
		}
	}

	// NOTE(review): 6000 bytes is presumably meant to exceed the largest
	// datagram this flow expects; confirm against the configured MTU.
	buf := make([]byte, 6000)
	n, _, err := c.ReadFromUDP(buf)
	if err != nil {
		// A timeout caused by the deadline copied from ctx is reported as the
		// context's own error so callers see a single cancellation signal.
		if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, ctxErr
			}
		}
		return nil, err
	}
	return buf[:n], nil
}