package udp import ( "errors" "fmt" "log" "mpbl3p/proxy" "mpbl3p/shared" "net" "sync" "time" ) type PacketWriter interface { Write(b []byte) (int, error) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) LocalAddr() net.Addr } type PacketConn interface { PacketWriter ReadFromUDP(b []byte) (int, *net.UDPAddr, error) } type InitiatedFlow struct { Local func() string Remote string g proxy.MacGenerator keepalive time.Duration mu sync.RWMutex Flow } func (f *InitiatedFlow) String() string { return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local(), f.Remote) } type Flow struct { writer PacketWriter raddr *net.UDPAddr isAlive bool startup bool congestion Congestion v proxy.MacVerifier inboundDatagrams chan []byte } func (f Flow) String() string { return fmt.Sprintf("UdpInbound{%v -> %v}", f.raddr, f.writer.LocalAddr()) } func InitiateFlow( local func() string, remote string, v proxy.MacVerifier, g proxy.MacGenerator, c Congestion, keepalive time.Duration, ) (*InitiatedFlow, error) { f := InitiatedFlow{ Local: local, Remote: remote, Flow: newFlow(c, v), g: g, keepalive: keepalive, } return &f, nil } func newFlow(c Congestion, v proxy.MacVerifier) Flow { return Flow{ inboundDatagrams: make(chan []byte), congestion: c, v: v, } } func (f *InitiatedFlow) Reconnect() error { f.mu.Lock() defer f.mu.Unlock() if f.isAlive { return nil } localAddr, err := net.ResolveUDPAddr("udp", f.Local()) if err != nil { return err } remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote) if err != nil { return err } conn, err := net.DialUDP("udp", localAddr, remoteAddr) if err != nil { return err } f.writer = conn f.startup = true // prod the connection once a second until we get an ack, then consider it alive go func() { seq := f.congestion.Sequence() for !f.isAlive { p := Packet{ ack: 0, nack: 0, seq: seq, data: proxy.SimplePacket(nil), } _ = f.sendPacket(p, f.g) } }() go func() { _, _ = f.produceInternal(f.v, false) }() go f.earlyUpdateLoop(f.g, f.keepalive) if err := f.acceptPacket(conn); err != nil { return err } f.isAlive = true f.startup = false go func() { lockedAccept := func() { f.mu.RLock() defer f.mu.RUnlock() if err := f.acceptPacket(conn); err != nil { log.Println(err) } } for f.isAlive { log.Println("alive and listening for packets") lockedAccept() } log.Println("no longer alive") }() return nil } func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error { f.mu.RLock() defer f.mu.RUnlock() return f.Flow.Consume(p, g) } func (f *InitiatedFlow) Produce(v proxy.MacVerifier) (proxy.Packet, error) { f.mu.RLock() defer f.mu.RUnlock() return f.Flow.Produce(v) } func (f *Flow) IsAlive() bool { return f.isAlive } func (f *Flow) Consume(pp proxy.Packet, g proxy.MacGenerator) error { if !f.isAlive { return shared.ErrDeadConnection } log.Println(f.congestion) // Sequence is the congestion controllers opportunity to block log.Println("awaiting sequence") p := Packet{ seq: f.congestion.Sequence(), data: pp, } log.Println("received sequence") // Choose up to date ACK/NACK even after blocking p.ack = f.congestion.NextAck() p.nack = f.congestion.NextNack() return f.sendPacket(p, g) } func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) { if !f.isAlive { return nil, shared.ErrDeadConnection } return f.produceInternal(v, true) } func (f *Flow) produceInternal(v proxy.MacVerifier, mustReturn bool) (proxy.Packet, error) { for once := true; mustReturn || once; once = false { log.Println(f.congestion) b, err := proxy.StripMac(<-f.inboundDatagrams, v) if err != nil { return nil, err } p, err := UnmarshalPacket(b) if err != nil { return nil, err } // schedule an ack for this sequence number if p.seq != 0 { f.congestion.ReceivedPacket(p.seq) } // adjust our sending congestion control based on their acks if p.ack != 0 { f.congestion.ReceivedAck(p.ack) } // adjust our sending congestion control based on their nacks if p.nack != 0 { f.congestion.ReceivedNack(p.nack) } // 12 bytes for header + the MAC + a timestamp if len(b) == 12+f.v.CodeLength()+8 { log.Println("handled keepalive/ack only packet") continue } return p, nil } return nil, nil } func (f *Flow) handleDatagram(p []byte) { f.inboundDatagrams <- p } func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error { b := p.Marshal() b = proxy.AppendMac(b, g) if f.raddr == nil { _, err := f.writer.Write(b) return err } else { _, err := f.writer.WriteToUDP(b, f.raddr) return err } } func (f *Flow) earlyUpdateLoop(g proxy.MacGenerator, keepalive time.Duration) { var err error for !errors.Is(err, shared.ErrDeadConnection) { seq := f.congestion.AwaitEarlyUpdate(keepalive) p := Packet{ ack: f.congestion.NextAck(), nack: f.congestion.NextNack(), seq: seq, data: proxy.SimplePacket(nil), } _ = f.sendPacket(p, g) } } func (f *Flow) acceptPacket(c PacketConn) error { buf := make([]byte, 6000) n, _, err := c.ReadFromUDP(buf) if err != nil { return err } f.handleDatagram(buf[:n]) return nil }