dissertation-2-code/udp/inbound_flow.go
Jake Hillion 1e8c7243a1
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing
formatting
2021-05-13 22:49:12 +01:00

143 lines
2.4 KiB
Go

package udp
import (
"context"
"log"
"mpbl3p/proxy"
"sync"
"time"
)
// InboundFlow wraps a Flow for the inbound direction, placing its own
// datagram queue in front of the embedded Flow's queue so that exchange
// handling can read datagrams before they reach the Flow.
type InboundFlow struct {
// inboundDatagrams receives raw datagrams via queueDatagram. It shadows the
// embedded Flow's field of the same name, which is reached explicitly as
// f.Flow.inboundDatagrams.
inboundDatagrams chan []byte
// mu is write-locked by processPackets while exchanges are handled and
// read-locked by Consume and Produce.
mu sync.RWMutex
Flow
}
// newInboundFlow wraps f in an InboundFlow with a fresh, unbuffered datagram
// queue. The returned error is always nil.
func newInboundFlow(f Flow) (*InboundFlow, error) {
	return &InboundFlow{
		Flow:             f,
		inboundDatagrams: make(chan []byte),
	}, nil
}
// queueDatagram hands p to the flow's inbound queue, blocking until it is
// received or ctx is done. It returns ctx.Err() if ctx finishes first and nil
// once the datagram has been handed over.
func (f *InboundFlow) queueDatagram(ctx context.Context, p []byte) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case f.inboundDatagrams <- p:
	}
	return nil
}
// processPackets is the receive loop for an inbound flow. Each iteration it
// first runs the exchanges to completion under the write lock, then takes one
// datagram from the flow's own queue and forwards it to the embedded Flow's
// queue. It returns when ctx is done.
func (f *InboundFlow) processPackets(ctx context.Context) {
	for {
		if !f.runExchanges(ctx) {
			return
		}

		var p []byte
		select {
		case p = <-f.inboundDatagrams:
		case <-ctx.Done():
			return
		}

		// TODO: Check if p means redo exchanges
		if false {
			continue
		}

		select {
		case f.Flow.inboundDatagrams <- p:
		case <-ctx.Done():
			return
		}
	}
}

// runExchanges retries handleExchanges under the write lock until it succeeds,
// logging each failure. It reports false if ctx was done before an attempt.
//
// The lock is released with defer so that the cancellation path cannot return
// with mu still held, which would block every later Consume and Produce call.
func (f *InboundFlow) runExchanges(ctx context.Context) bool {
	f.mu.Lock()
	defer f.mu.Unlock()

	for {
		if ctx.Err() != nil {
			return false
		}
		err := f.handleExchanges(ctx)
		if err == nil {
			return true
		}
		log.Println(err)
	}
}
// handleExchanges drives every exchange attached to the flow. Currently the
// only candidate is the congestion controller, included when it implements
// proxy.Exchange.
//
// Each round waits up to 5 seconds for a datagram on the flow's own queue,
// strips the MACs using the verifiers in reverse order, passes the result to
// the exchange and sends any response it produces. The first error from any
// round aborts the whole call and is returned.
//
// NOTE(review): every exchange runs at least one round per call, even if
// Complete() already reports true, and each round consumes a datagram.
// Confirm this is intended.
func (f *InboundFlow) handleExchanges(ctx context.Context) error {
	var pending []proxy.Exchange
	if ex, isExchange := f.congestion.(proxy.Exchange); isExchange {
		pending = append(pending, ex)
	}

	// Data returned by the exchanges is gathered here; nothing reads it yet.
	var collected [][]byte

	// step performs a single receive/verify/handle/respond round for ex.
	step := func(ex proxy.Exchange) error {
		stepCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		var datagram []byte
		select {
		case <-stepCtx.Done():
			return stepCtx.Err()
		case datagram = <-f.inboundDatagrams:
		}

		var err error

		// Verifiers are applied last-to-first.
		for i := len(f.verifiers) - 1; i >= 0; i-- {
			datagram, err = proxy.StripMac(datagram, f.verifiers[i])
			if err != nil {
				return err
			}
		}

		var reply, payload []byte
		reply, payload, err = ex.Handle(stepCtx, datagram)
		if err != nil {
			return err
		}
		if payload != nil {
			collected = append(collected, payload)
		}
		if reply == nil {
			return nil
		}

		err = f.sendPacket(proxy.SimplePacket(reply))
		return err
	}

	for _, ex := range pending {
		// Do-while: Complete() is checked before every round, but the first
		// round runs regardless of its result.
		for first := true; !ex.Complete() || first; first = false {
			if err := step(ex); err != nil {
				return err
			}
		}
	}
	return nil
}
// Consume forwards p to the embedded Flow's Consume while holding the read
// lock, so it waits whenever processPackets holds the write lock for exchange
// handling. It returns whatever the embedded Flow returns.
func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet) error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Consume(ctx, p)
}
// Produce calls the embedded Flow's Produce while holding the read lock, so it
// waits whenever processPackets holds the write lock for exchange handling.
// It returns whatever the embedded Flow returns.
func (f *InboundFlow) Produce(ctx context.Context) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(ctx)
}