143 lines
2.4 KiB
Go
143 lines
2.4 KiB
Go
package udp
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"mpbl3p/proxy"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type InboundFlow struct {
|
|
inboundDatagrams chan []byte
|
|
|
|
mu sync.RWMutex
|
|
Flow
|
|
}
|
|
|
|
func newInboundFlow(f Flow) (*InboundFlow, error) {
|
|
fi := InboundFlow{
|
|
inboundDatagrams: make(chan []byte),
|
|
Flow: f,
|
|
}
|
|
|
|
return &fi, nil
|
|
}
|
|
|
|
func (f *InboundFlow) queueDatagram(ctx context.Context, p []byte) error {
|
|
select {
|
|
case f.inboundDatagrams <- p:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (f *InboundFlow) processPackets(ctx context.Context) {
|
|
for {
|
|
f.mu.Lock()
|
|
|
|
var err error
|
|
for once := true; err != nil || once; once = false {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
err = f.handleExchanges(ctx)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
|
|
f.mu.Unlock()
|
|
|
|
var p []byte
|
|
select {
|
|
case p = <-f.inboundDatagrams:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
// TODO: Check if p means redo exchanges
|
|
if false {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case f.Flow.inboundDatagrams <- p:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *InboundFlow) handleExchanges(ctx context.Context) error {
|
|
var exchanges []proxy.Exchange
|
|
|
|
if e, ok := f.congestion.(proxy.Exchange); ok {
|
|
exchanges = append(exchanges, e)
|
|
}
|
|
|
|
var exchangeData [][]byte
|
|
|
|
for _, e := range exchanges {
|
|
for once := true; !e.Complete() || once; once = false {
|
|
if err := func() (err error) {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
var recv []byte
|
|
select {
|
|
case recv = <-f.inboundDatagrams:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
for i := range f.verifiers {
|
|
v := f.verifiers[len(f.verifiers)-i-1]
|
|
|
|
recv, err = proxy.StripMac(recv, v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
var resp, data []byte
|
|
if resp, data, err = e.Handle(ctx, recv); err != nil {
|
|
return err
|
|
}
|
|
|
|
if data != nil {
|
|
exchangeData = append(exchangeData, data)
|
|
}
|
|
|
|
if resp != nil {
|
|
if err = f.sendPacket(proxy.SimplePacket(resp)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet) error {
|
|
f.mu.RLock()
|
|
defer f.mu.RUnlock()
|
|
|
|
return f.Flow.Consume(ctx, p)
|
|
}
|
|
|
|
func (f *InboundFlow) Produce(ctx context.Context) (proxy.Packet, error) {
|
|
f.mu.RLock()
|
|
defer f.mu.RUnlock()
|
|
|
|
return f.Flow.Produce(ctx)
|
|
}
|