diff --git a/udp/flow.go b/udp/flow.go index 2ab941f..ee4a357 100644 --- a/udp/flow.go +++ b/udp/flow.go @@ -2,13 +2,11 @@ package udp import ( "context" - "errors" "fmt" "log" "mpbl3p/proxy" "mpbl3p/shared" "net" - "sync" "time" ) @@ -24,21 +22,6 @@ type PacketConn interface { ReadFromUDP(b []byte) (int, *net.UDPAddr, error) } -type InitiatedFlow struct { - Local func() string - Remote string - - g proxy.MacGenerator - keepalive time.Duration - - mu sync.RWMutex - Flow -} - -func (f *InitiatedFlow) String() string { - return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local(), f.Remote) -} - type Flow struct { writer PacketWriter raddr *net.UDPAddr @@ -55,25 +38,6 @@ func (f Flow) String() string { return fmt.Sprintf("UdpInbound{%v -> %v}", f.raddr, f.writer.LocalAddr()) } -func InitiateFlow( - local func() string, - remote string, - v proxy.MacVerifier, - g proxy.MacGenerator, - c Congestion, - keepalive time.Duration, -) (*InitiatedFlow, error) { - f := InitiatedFlow{ - Local: local, - Remote: remote, - Flow: newFlow(c, v), - g: g, - keepalive: keepalive, - } - - return &f, nil -} - func newFlow(c Congestion, v proxy.MacVerifier) Flow { return Flow{ inboundDatagrams: make(chan []byte), @@ -82,133 +46,6 @@ func newFlow(c Congestion, v proxy.MacVerifier) Flow { } } -func (f *InitiatedFlow) Reconnect(ctx context.Context) error { - f.mu.Lock() - defer f.mu.Unlock() - - if f.isAlive { - return nil - } - - localAddr, err := net.ResolveUDPAddr("udp", f.Local()) - if err != nil { - return err - } - - remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote) - if err != nil { - return err - } - - conn, err := net.DialUDP("udp", localAddr, remoteAddr) - if err != nil { - return err - } - - f.writer = conn - - // prod the connection once a second until we get an ack, then consider it alive - var exchanges []proxy.Exchange - - if e, ok := f.congestion.(proxy.Exchange); ok { - exchanges = append(exchanges, e) - } - - var exchangeData [][]byte - - for _, e := range exchanges { - i, err := e.Initial(ctx) - if err != nil { - return err - } - - if err = f.sendPacket(proxy.SimplePacket(i), f.g); err != nil { - return err - } - - for once := true; once || !e.Complete(); once = false { - if err := func() error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - var recv []byte - if recv, err = f.readPacket(ctx, conn); err != nil { - return err - } - - var resp, data []byte - if resp, data, err = e.Handle(ctx, recv); err != nil { - return err - } - - if data != nil { - exchangeData = append(exchangeData, data) - } - - if resp != nil { - if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { - return err - } - } - - return nil - }(); err != nil { - return err - } - } - } - - go func() { - for _, d := range exchangeData { - if err := f.queueDatagram(ctx, d); err != nil { - return - } - } - - lockedAccept := func() { - f.mu.RLock() - defer f.mu.RUnlock() - - var p []byte - if p, err = f.readPacket(ctx, conn); err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - return - } - log.Println(err) - return - } - - if err := f.queueDatagram(ctx, p); err != nil { - return - } - - } - - for f.isAlive { - log.Println("alive and listening for packets") - lockedAccept() - } - log.Println("no longer alive") - }() - - f.isAlive = true - return nil -} - -func (f *InitiatedFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { - f.mu.RLock() - defer f.mu.RUnlock() - - return f.Flow.Consume(ctx, p, g) -} - -func (f *InitiatedFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { - f.mu.RLock() - defer f.mu.RUnlock() - - return f.Flow.Produce(ctx, v) -} - func (f *Flow) IsAlive() bool { return f.isAlive } diff --git a/udp/inbound_flow.go b/udp/inbound_flow.go new file mode 100644 index 0000000..e07740c --- /dev/null +++ b/udp/inbound_flow.go @@ -0,0 +1,130 @@ +package udp + +import ( + "context" + "log" + "mpbl3p/proxy" + "sync" + "time" +) + +type InboundFlow struct { + g proxy.MacGenerator + inboundDatagrams chan []byte + + mu sync.RWMutex + Flow +} + +func newInboundFlow(ctx context.Context, v proxy.MacVerifier, g proxy.MacGenerator, c Congestion) (*InboundFlow, error) { + f := InboundFlow{ + g: g, + inboundDatagrams: make(chan []byte), + Flow: newFlow(c, v), + } + + go f.processPackets(ctx) + return &f, nil +} + +func (f *InboundFlow) queueDatagram(ctx context.Context, p []byte) error { + select { + case f.inboundDatagrams <- p: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (f *InboundFlow) processPackets(ctx context.Context) { + for { + f.mu.Lock() + + var err error + for once := true; once || err == nil; once = false { + err = f.handleExchanges(ctx) + if err != nil { + log.Println(err) + } + } + + f.mu.Unlock() + + var p []byte + select { + case p = <-f.inboundDatagrams: + case <-ctx.Done(): + return + } + + // TODO: Check if p means redo exchanges + if false { + continue + } + + select { + case f.Flow.inboundDatagrams <- p: + case <-ctx.Done(): + return + } + } +} + +func (f *InboundFlow) handleExchanges(ctx context.Context) error { + var exchanges []proxy.Exchange + + if e, ok := f.congestion.(proxy.Exchange); ok { + exchanges = append(exchanges, e) + } + + var exchangeData [][]byte + + for _, e := range exchanges { + for once := true; once || !e.Complete(); once = false { + if err := func() (err error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var recv []byte + select { + case recv = <-f.inboundDatagrams: + case <-ctx.Done(): + return ctx.Err() + } + + var resp, data []byte + if resp, data, err = e.Handle(ctx, recv); err != nil { + return err + } + + if data != nil { + exchangeData = append(exchangeData, data) + } + + if resp != nil { + if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { + return err + } + } + + return nil + }(); err != nil { + return err + } + } + } +} + +func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Consume(ctx, p, g) +} + +func (f *InboundFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Produce(ctx, v) +} diff --git a/udp/listener.go b/udp/listener.go index 0a1e661..67dd9d0 100644 --- a/udp/listener.go +++ b/udp/listener.go @@ -55,11 +55,14 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro panic(err) } + // TODO: Support congestion exchange here + raddr := fromUdpAddress(*addr) if f, exists := receivedConnections[raddr]; exists { log.Println("existing flow. queuing...") if err := f.queueDatagram(ctx, buf[:n]); err != nil { - + log.Println("error") + continue } log.Println("queued") continue diff --git a/udp/outbound_flow.go b/udp/outbound_flow.go new file mode 100644 index 0000000..f76aaea --- /dev/null +++ b/udp/outbound_flow.go @@ -0,0 +1,173 @@ +package udp + +import ( + "context" + "errors" + "fmt" + "log" + "mpbl3p/proxy" + "net" + "sync" + "time" +) + +type OutboundFlow struct { + Local func() string + Remote string + + g proxy.MacGenerator + keepalive time.Duration + + mu sync.RWMutex + Flow +} + +func InitiateFlow( + local func() string, + remote string, + v proxy.MacVerifier, + g proxy.MacGenerator, + c Congestion, + keepalive time.Duration, +) (*OutboundFlow, error) { + f := OutboundFlow{ + Local: local, + Remote: remote, + Flow: newFlow(c, v), + g: g, + keepalive: keepalive, + } + + return &f, nil +} + +func (f *OutboundFlow) String() string { + return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local(), f.Remote) +} + +func (f *OutboundFlow) Reconnect(ctx context.Context) error { + f.mu.Lock() + defer f.mu.Unlock() + + if f.isAlive { + return nil + } + + localAddr, err := net.ResolveUDPAddr("udp", f.Local()) + if err != nil { + return err + } + + remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote) + if err != nil { + return err + } + + conn, err := net.DialUDP("udp", localAddr, remoteAddr) + if err != nil { + return err + } + + f.writer = conn + + // prod the connection once a second until we get an ack, then consider it alive + var exchanges []proxy.Exchange + + if e, ok := f.congestion.(proxy.Exchange); ok { + exchanges = append(exchanges, e) + } + + var exchangeData [][]byte + + for _, e := range exchanges { + i, err := e.Initial(ctx) + if err != nil { + return err + } + + if err = f.sendPacket(proxy.SimplePacket(i), f.g); err != nil { + return err + } + + for once := true; once || !e.Complete(); once = false { + if err := func() error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var recv []byte + if recv, err = f.readPacket(ctx, conn); err != nil { + return err + } + + var resp, data []byte + if resp, data, err = e.Handle(ctx, recv); err != nil { + return err + } + + if data != nil { + exchangeData = append(exchangeData, data) + } + + if resp != nil { + if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { + return err + } + } + + return nil + }(); err != nil { + return err + } + } + } + + go func() { + for _, d := range exchangeData { + if err := f.queueDatagram(ctx, d); err != nil { + return + } + } + + lockedAccept := func() { + f.mu.RLock() + defer f.mu.RUnlock() + + var p []byte + if p, err = f.readPacket(ctx, conn); err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return + } + log.Println(err) + return + } + + if err := f.queueDatagram(ctx, p); err != nil { + return + } + + } + + for f.isAlive { + log.Println("alive and listening for packets") + lockedAccept() + } + log.Println("no longer alive") + }() + + f.isAlive = true + return nil +} + +func (f *OutboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Consume(ctx, p, g) +} + +func (f *OutboundFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Produce(ctx, v) +}