dissertation-2-code/udp/outbound_flow.go
Jake Hillion 898496fe94
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is failing
Missed files
2021-05-13 22:48:47 +01:00

182 lines
3.1 KiB
Go

package udp
import (
"context"
"errors"
"fmt"
"log"
"mpbl3p/proxy"
"net"
"sync"
"time"
)
// OutboundFlow is a UDP flow that is dialled from this side towards a remote
// address. It embeds Flow and adds the addressing information needed to
// (re)establish the underlying socket.
type OutboundFlow struct {
// Local is called each time the flow is described or reconnected and
// returns the local address to bind; Reconnect resolves the result with
// net.ResolveUDPAddr.
Local func() string
// Remote is the remote address dialled by Reconnect, resolved with
// net.ResolveUDPAddr.
Remote string
// NOTE(review): g is not read or written anywhere in this file — confirm
// whether another file in the package still uses it.
g proxy.MacGenerator
// keepalive is stored by InitiateFlow; it is not otherwise used in this
// file.
keepalive time.Duration
// mu is write-locked for the duration of Reconnect and read-locked by
// Consume, Produce and the receive loop started by Reconnect.
mu sync.RWMutex
Flow
}
// InitiateFlow builds an OutboundFlow for the given local address provider and
// remote address. The embedded Flow is created by newFlow from the congestion
// controller, MAC verifiers and MAC generators.
//
// No socket is opened here; the returned flow is connected by a later call to
// Reconnect. The returned error is always nil.
func InitiateFlow(
	local func() string,
	remote string,
	vs []proxy.MacVerifier,
	gs []proxy.MacGenerator,
	c Congestion,
	keepalive time.Duration,
) (*OutboundFlow, error) {
	return &OutboundFlow{
		Local:     local,
		Remote:    remote,
		keepalive: keepalive,
		Flow:      newFlow(c, vs, gs),
	}, nil
}
// String describes the flow as "UdpOutbound{<local> -> <remote>}", calling
// Local to obtain the current local address.
func (f *OutboundFlow) String() string {
	from := f.Local()
	to := f.Remote
	return fmt.Sprintf("UdpOutbound{%v -> %v}", from, to)
}
// Reconnect establishes the UDP socket for this flow if it is not already
// alive.
//
// It resolves the local and remote addresses, dials a new UDP socket, runs the
// congestion controller's exchange (when the controller implements
// proxy.Exchange), marks the flow alive, and starts a background goroutine
// that reads packets from the socket and queues them on the flow.
//
// Reconnect holds the flow's write lock for its whole duration. It returns nil
// immediately when the flow is already alive; otherwise it returns the first
// error hit while resolving, dialling or exchanging. When setup fails after
// the socket has been dialled, the socket is closed before returning.
//
// The background goroutine keeps using ctx after Reconnect returns.
func (f *OutboundFlow) Reconnect(ctx context.Context) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.isAlive {
		return nil
	}

	localAddr, err := net.ResolveUDPAddr("udp", f.Local())
	if err != nil {
		return err
	}

	remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote)
	if err != nil {
		return err
	}

	conn, err := net.DialUDP("udp", localAddr, remoteAddr)
	if err != nil {
		return err
	}

	// Release the socket on any failure below; without this every failed
	// exchange would leak a file descriptor.
	established := false
	defer func() {
		if !established {
			_ = conn.Close()
		}
	}()

	// The writer is installed before the exchange because the exchange sends
	// through sendPacket (presumably via f.writer — confirm against Flow).
	f.writer = conn

	// Run the initial exchange with the remote before treating the flow as
	// alive. Each expected reply is awaited for at most five seconds.
	var exchanges []proxy.Exchange
	if e, ok := f.congestion.(proxy.Exchange); ok {
		exchanges = append(exchanges, e)
	}

	var exchangeData [][]byte

	for _, e := range exchanges {
		initial, err := e.Initial(ctx)
		if err != nil {
			return err
		}

		if err := f.sendPacket(proxy.SimplePacket(initial)); err != nil {
			return err
		}

		// Do-while: always process at least one reply, then continue until
		// the exchange reports completion.
		for once := true; !e.Complete() || once; once = false {
			// A closure so the per-reply timeout is cancelled at the end of
			// each iteration rather than when Reconnect returns.
			step := func() error {
				stepCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
				defer cancel()

				var recv []byte
				var err error
				if recv, err = f.readPacket(stepCtx, conn); err != nil {
					return err
				}

				// Strip MACs starting from the last verifier.
				for i := len(f.verifiers) - 1; i >= 0; i-- {
					if recv, err = proxy.StripMac(recv, f.verifiers[i]); err != nil {
						return err
					}
				}

				var resp, data []byte
				if resp, data, err = e.Handle(stepCtx, recv); err != nil {
					return err
				}

				// Data surfaced by the exchange is queued once the receive
				// goroutine starts.
				if data != nil {
					exchangeData = append(exchangeData, data)
				}

				if resp != nil {
					if err = f.sendPacket(proxy.SimplePacket(resp)); err != nil {
						return err
					}
				}

				return nil
			}

			if err := step(); err != nil {
				return err
			}
		}
	}

	// Mark the flow alive BEFORE starting the receiver. Previously the flag
	// was set after the go statement, so the receiver could observe
	// isAlive == false on its first check and exit immediately.
	f.isAlive = true
	established = true

	go func() {
		for _, d := range exchangeData {
			if err := f.queueDatagram(ctx, d); err != nil {
				return
			}
		}

		// lockedAccept reads and queues a single packet while holding the
		// read lock. It reports false once the flow is no longer alive. The
		// liveness check is made under the lock so it is ordered after the
		// write performed by Reconnect.
		//
		// NOTE(review): if ctx is cancelled while isAlive stays true, every
		// read appears to fail immediately and this loop would spin — confirm
		// that something clears isAlive in that case.
		lockedAccept := func() bool {
			f.mu.RLock()
			defer f.mu.RUnlock()

			if !f.isAlive {
				return false
			}

			log.Println("alive and listening for packets")

			// Goroutine-local variables: the receiver must not write to
			// Reconnect's own err.
			var p []byte
			var err error
			if p, err = f.readPacket(ctx, conn); err != nil {
				if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
					return true
				}
				log.Println(err)
				return true
			}

			// A queueing failure is deliberately ignored; the loop retries on
			// the next packet.
			_ = f.queueDatagram(ctx, p)
			return true
		}

		for lockedAccept() {
		}

		log.Println("no longer alive")
	}()

	return nil
}
// Consume hands p to the embedded Flow's Consume while holding the read lock,
// so it cannot run concurrently with Reconnect (which takes the write lock).
// The embedded Flow's error is returned unchanged.
func (f *OutboundFlow) Consume(ctx context.Context, p proxy.Packet) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	err := f.Flow.Consume(ctx, p)
	return err
}
// Produce returns the next packet from the embedded Flow's Produce while
// holding the read lock, so it cannot run concurrently with Reconnect (which
// takes the write lock). The embedded Flow's results are returned unchanged.
func (f *OutboundFlow) Produce(ctx context.Context) (proxy.Packet, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	pkt, err := f.Flow.Produce(ctx)
	return pkt, err
}