diff --git a/udp/inbound_flow.go b/udp/inbound_flow.go index e07740c..d3e5fb0 100644 --- a/udp/inbound_flow.go +++ b/udp/inbound_flow.go @@ -16,15 +16,14 @@ type InboundFlow struct { Flow } -func newInboundFlow(ctx context.Context, v proxy.MacVerifier, g proxy.MacGenerator, c Congestion) (*InboundFlow, error) { - f := InboundFlow{ +func newInboundFlow(f Flow, g proxy.MacGenerator) (*InboundFlow, error) { + fi := InboundFlow{ g: g, inboundDatagrams: make(chan []byte), - Flow: newFlow(c, v), + Flow: f, } - go f.processPackets(ctx) - return &f, nil + return &fi, nil } func (f *InboundFlow) queueDatagram(ctx context.Context, p []byte) error { @@ -113,6 +112,8 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { } } } + + return nil } func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { diff --git a/udp/listener.go b/udp/listener.go index 67dd9d0..4a20934 100644 --- a/udp/listener.go +++ b/udp/listener.go @@ -38,7 +38,7 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro return err } - receivedConnections := make(map[ComparableUdpAddress]*Flow) + receivedConnections := make(map[ComparableUdpAddress]*InboundFlow) go func() { for ctx.Err() == nil { @@ -55,8 +55,6 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro panic(err) } - // TODO: Support congestion exchange here - raddr := fromUdpAddress(*addr) if f, exists := receivedConnections[raddr]; exists { log.Println("existing flow. queuing...") @@ -77,14 +75,21 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro f.raddr = addr f.isAlive = true + fi, err := newInboundFlow(f, g) + if err != nil { + log.Println(err) + continue + } + log.Printf("received new udp connection: %v\n", f) - go f.earlyUpdateLoop(ctx, g, 0) + go fi.processPackets(ctx) + go fi.earlyUpdateLoop(ctx, g, 0) - receivedConnections[raddr] = &f + receivedConnections[raddr] = fi - p.AddConsumer(ctx, &f, g) - p.AddProducer(ctx, &f, v) + p.AddConsumer(ctx, fi, g) + p.AddProducer(ctx, fi, v) log.Println("handling...") if err := f.queueDatagram(ctx, buf[:n]); err != nil {