package udp import ( "context" "log" "mpbl3p/proxy" "sync" "time" ) type InboundFlow struct { g proxy.MacGenerator inboundDatagrams chan []byte mu sync.RWMutex Flow } func newInboundFlow(ctx context.Context, v proxy.MacVerifier, g proxy.MacGenerator, c Congestion) (*InboundFlow, error) { f := InboundFlow{ g: g, inboundDatagrams: make(chan []byte), Flow: newFlow(c, v), } go f.processPackets(ctx) return &f, nil } func (f *InboundFlow) queueDatagram(ctx context.Context, p []byte) error { select { case f.inboundDatagrams <- p: return nil case <-ctx.Done(): return ctx.Err() } } func (f *InboundFlow) processPackets(ctx context.Context) { for { f.mu.Lock() var err error for once := true; once || err == nil; once = false { err = f.handleExchanges(ctx) if err != nil { log.Println(err) } } f.mu.Unlock() var p []byte select { case p = <-f.inboundDatagrams: case <-ctx.Done(): return } // TODO: Check if p means redo exchanges if false { continue } select { case f.Flow.inboundDatagrams <- p: case <-ctx.Done(): return } } } func (f *InboundFlow) handleExchanges(ctx context.Context) error { var exchanges []proxy.Exchange if e, ok := f.congestion.(proxy.Exchange); ok { exchanges = append(exchanges, e) } var exchangeData [][]byte for _, e := range exchanges { for once := true; once || !e.Complete(); once = false { if err := func() (err error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() var recv []byte select { case recv = <-f.inboundDatagrams: case <-ctx.Done(): return ctx.Err() } var resp, data []byte if resp, data, err = e.Handle(ctx, recv); err != nil { return err } if data != nil { exchangeData = append(exchangeData, data) } if resp != nil { if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { return err } } return nil }(); err != nil { return err } } } } func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { f.mu.RLock() defer f.mu.RUnlock() return f.Flow.Consume(ctx, p, g) } func (f *InboundFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { f.mu.RLock() defer f.mu.RUnlock() return f.Flow.Produce(ctx, v) }