From c9c32349f2a1f64a24128aa47bbc5b32a5daaade Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Fri, 9 Apr 2021 22:51:46 +0100 Subject: [PATCH 01/17] added exchange type --- proxy/exchange.go | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 proxy/exchange.go diff --git a/proxy/exchange.go b/proxy/exchange.go new file mode 100644 index 0000000..35c6460 --- /dev/null +++ b/proxy/exchange.go @@ -0,0 +1,7 @@ +package proxy + +type Exchange interface { + Complete() bool + Initial() (out []byte, err error) + Handle(in []byte) (out []byte, data []byte, err error) +} -- 2.47.0 From c9596909f2dcbbc846d7bda7198fa0c6aef37e77 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Wed, 14 Apr 2021 17:07:59 +0100 Subject: [PATCH 02/17] new udp exchange --- config/builder.go | 3 +- proxy/exchange.go | 6 +- shared/errors.go | 1 + udp/congestion/newreno/exchange.go | 119 ++++++++++++++++++ udp/congestion/{ => newreno}/newreno.go | 19 +-- udp/congestion/{ => newreno}/newreno_test.go | 2 +- udp/flow.go | 125 +++++++++++++------ 7 files changed, 215 insertions(+), 60 deletions(-) create mode 100644 udp/congestion/newreno/exchange.go rename udp/congestion/{ => newreno}/newreno.go (97%) rename udp/congestion/{ => newreno}/newreno_test.go (99%) diff --git a/config/builder.go b/config/builder.go index b83995c..ba84eb7 100644 --- a/config/builder.go +++ b/config/builder.go @@ -10,6 +10,7 @@ import ( "mpbl3p/tcp" "mpbl3p/udp" "mpbl3p/udp/congestion" + "mpbl3p/udp/congestion/newreno" "time" ) @@ -104,7 +105,7 @@ func buildUdp(ctx context.Context, p *proxy.Proxy, peer Peer, g func() proxy.Mac default: fallthrough case "NewReno": - c = func() udp.Congestion { return congestion.NewNewReno() } + c = func() udp.Congestion { return newreno.NewNewReno() } } if peer.RemoteHost != "" { diff --git a/proxy/exchange.go b/proxy/exchange.go index 35c6460..dae3bb6 100644 --- a/proxy/exchange.go +++ b/proxy/exchange.go @@ -1,7 +1,9 @@ package proxy +import "context" + type Exchange interface { + Initial(ctx context.Context) (out []byte, err error) + Handle(ctx context.Context, in []byte) (out []byte, data []byte, err error) Complete() bool - Initial() (out []byte, err error) - Handle(in []byte) (out []byte, data []byte, err error) } diff --git a/shared/errors.go b/shared/errors.go index 0db0c92..ec5b537 100644 --- a/shared/errors.go +++ b/shared/errors.go @@ -5,3 +5,4 @@ import "errors" var ErrBadChecksum = errors.New("the packet had a bad checksum") var ErrDeadConnection = errors.New("the connection is dead") var ErrNotEnoughBytes = errors.New("not enough bytes") +var ErrBadExchange = errors.New("bad exchange") diff --git a/udp/congestion/newreno/exchange.go b/udp/congestion/newreno/exchange.go new file mode 100644 index 0000000..409bb0a --- /dev/null +++ b/udp/congestion/newreno/exchange.go @@ -0,0 +1,119 @@ +package newreno + +import ( + "context" + "encoding/binary" + "math/rand" + "mpbl3p/shared" + "time" +) + +func (c *NewReno) Initial(ctx context.Context) (out []byte, err error) { + c.alive = false + c.wasInitial = true + c.startSequenceLoop(ctx) + + var s uint32 + select { + case s = <-c.sequence: + case <-ctx.Done(): + return nil, ctx.Err() + } + + b := make([]byte, 12) + binary.LittleEndian.PutUint32(b[8:12], s) + + c.inFlight = []flightInfo{{time.Now(), s}} + + return b, nil +} + +func (c *NewReno) Handle(ctx context.Context, in []byte) (out []byte, data []byte, err error) { + if c.alive || c.stopSequence == nil { + // reset + c.alive = false + c.startSequenceLoop(ctx) + } + + // receive + if len(in) != 12 { + return nil, nil, shared.ErrBadExchange + } + + rcvAck := binary.LittleEndian.Uint32(in[0:4]) + rcvNack := binary.LittleEndian.Uint32(in[4:8]) + rcvSeq := binary.LittleEndian.Uint32(in[8:12]) + + // verify + var ack, seq uint32 + + if rcvNack != 0 { + return nil, nil, shared.ErrBadExchange + } + + if c.wasInitial { + if rcvAck == c.inFlight[0].sequence { + ack = rcvSeq + c.alive = true + } else { + return nil, nil, shared.ErrBadExchange + } + } else { // if !c.wasInitial + if rcvAck == 0 { + // theirs is a syn packet + ack = rcvSeq + select { + case seq = <-c.sequence: + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + + c.inFlight = []flightInfo{{time.Now(), seq}} + } else if len(c.inFlight) == 1 && rcvAck == c.inFlight[0].sequence { + ack = rcvSeq + c.alive = true + } else { + return nil, nil, shared.ErrBadExchange + } + } + + // respond + b := make([]byte, 12) + binary.LittleEndian.PutUint32(b[0:4], ack) + binary.LittleEndian.PutUint32(b[8:12], seq) + + return b, nil, nil +} + +func (c *NewReno) Complete() bool { + return c.alive +} + +func (c *NewReno) startSequenceLoop(ctx context.Context) { + if c.stopSequence != nil { + c.stopSequence() + } + + var s uint32 + for s == 0 { + s = rand.Uint32() + } + + ctx, c.stopSequence = context.WithCancel(ctx) + go func() { + s := s + for { + if s == 0 { + s++ + continue + } + + select { + case c.sequence <- s: + case <-ctx.Done(): + return + } + s++ + } + }() +} diff --git a/udp/congestion/newreno.go b/udp/congestion/newreno/newreno.go similarity index 97% rename from udp/congestion/newreno.go rename to udp/congestion/newreno/newreno.go index d796e11..9ee1a8a 100644 --- a/udp/congestion/newreno.go +++ b/udp/congestion/newreno/newreno.go @@ -1,4 +1,4 @@ -package congestion +package newreno import ( "context" @@ -14,7 +14,9 @@ const RttExponentialFactor = 0.1 const RttLossDelay = 1.5 type NewReno struct { - sequence chan uint32 + sequence chan uint32 + stopSequence context.CancelFunc + wasInitial, alive bool inFlight []flightInfo lastSent time.Time @@ -64,19 +66,6 @@ func NewNewReno() *NewReno { slowStart: true, } - go func() { - var s uint32 - for { - if s == 0 { - s++ - continue - } - - c.sequence <- s - s++ - } - }() - return &c } diff --git a/udp/congestion/newreno_test.go b/udp/congestion/newreno/newreno_test.go similarity index 99% rename from udp/congestion/newreno_test.go rename to udp/congestion/newreno/newreno_test.go index f97dfb9..813e400 100644 --- a/udp/congestion/newreno_test.go +++ b/udp/congestion/newreno/newreno_test.go @@ -1,4 +1,4 @@ -package congestion +package newreno import ( "context" diff --git a/udp/flow.go b/udp/flow.go index d552d93..2ab941f 100644 --- a/udp/flow.go +++ b/udp/flow.go @@ -2,6 +2,7 @@ package udp import ( "context" + "errors" "fmt" "log" "mpbl3p/proxy" @@ -19,6 +20,7 @@ type PacketWriter interface { type PacketConn interface { PacketWriter + SetReadDeadline(t time.Time) error ReadFromUDP(b []byte) (int, *net.UDPAddr, error) } @@ -42,7 +44,6 @@ type Flow struct { raddr *net.UDPAddr isAlive bool - startup bool congestion Congestion v proxy.MacVerifier @@ -105,52 +106,82 @@ func (f *InitiatedFlow) Reconnect(ctx context.Context) error { } f.writer = conn - f.startup = true // prod the connection once a second until we get an ack, then consider it alive - go func() { - seq, err := f.congestion.Sequence(ctx) - if err != nil { - return - } + var exchanges []proxy.Exchange - for !f.isAlive { - if ctx.Err() != nil { - return - } - - p := Packet{ - ack: 0, - nack: 0, - seq: seq, - data: proxy.SimplePacket(nil), - } - - _ = f.sendPacket(p, f.g) - time.Sleep(1 * time.Second) - } - }() - - go func() { - _, _ = f.produceInternal(ctx, f.v, false) - }() - go f.earlyUpdateLoop(ctx, f.g, f.keepalive) - - if err := f.readQueuePacket(ctx, conn); err != nil { - return err + if e, ok := f.congestion.(proxy.Exchange); ok { + exchanges = append(exchanges, e) } - f.isAlive = true - f.startup = false + var exchangeData [][]byte + + for _, e := range exchanges { + i, err := e.Initial(ctx) + if err != nil { + return err + } + + if err = f.sendPacket(proxy.SimplePacket(i), f.g); err != nil { + return err + } + + for once := true; once || !e.Complete(); once = false { + if err := func() error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var recv []byte + if recv, err = f.readPacket(ctx, conn); err != nil { + return err + } + + var resp, data []byte + if resp, data, err = e.Handle(ctx, recv); err != nil { + return err + } + + if data != nil { + exchangeData = append(exchangeData, data) + } + + if resp != nil { + if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { + return err + } + } + + return nil + }(); err != nil { + return err + } + } + } go func() { + for _, d := range exchangeData { + if err := f.queueDatagram(ctx, d); err != nil { + return + } + } + lockedAccept := func() { f.mu.RLock() defer f.mu.RUnlock() - if err := f.readQueuePacket(ctx, conn); err != nil { + var p []byte + if p, err = f.readPacket(ctx, conn); err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return + } log.Println(err) + return } + + if err := f.queueDatagram(ctx, p); err != nil { + return + } + } for f.isAlive { @@ -160,6 +191,7 @@ func (f *InitiatedFlow) Reconnect(ctx context.Context) error { log.Println("no longer alive") }() + f.isAlive = true return nil } @@ -260,7 +292,7 @@ func (f *Flow) queueDatagram(ctx context.Context, p []byte) error { } } -func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error { +func (f *Flow) sendPacket(p proxy.Packet, g proxy.MacGenerator) error { b := p.Marshal() b = proxy.AppendMac(b, g) @@ -294,13 +326,24 @@ func (f *Flow) earlyUpdateLoop(ctx context.Context, g proxy.MacGenerator, keepal } } -func (f *Flow) readQueuePacket(ctx context.Context, c PacketConn) error { - // TODO: Replace 6000 with MTU+header size +func (f *Flow) readPacket(ctx context.Context, c PacketConn) ([]byte, error) { buf := make([]byte, 6000) - n, _, err := c.ReadFromUDP(buf) - if err != nil { - return err + + if d, ok := ctx.Deadline(); ok { + if err := c.SetReadDeadline(d); err != nil { + return nil, err + } } - return f.queueDatagram(ctx, buf[:n]) + n, _, err := c.ReadFromUDP(buf) + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + if ctx.Err() != nil { + return nil, ctx.Err() + } + } + return nil, err + } + + return buf[:n], nil } -- 2.47.0 From fef01c18c751f9faa2e21898589e3b405c26e5c4 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Wed, 14 Apr 2021 17:27:53 +0100 Subject: [PATCH 03/17] macos changes --- flags/flags.go | 2 -- udp/listener.go | 5 ----- 2 files changed, 7 deletions(-) diff --git a/flags/flags.go b/flags/flags.go index 236d72a..4f6606d 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -1,14 +1,12 @@ package flags import ( - "errors" "fmt" goflags "github.com/jessevdk/go-flags" "os" ) var PrintedHelpErr = goflags.ErrHelp -var NotEnoughArgs = errors.New("not enough arguments") type Options struct { Foreground bool `short:"f" long:"foreground" description:"Run in the foreground"` diff --git a/udp/listener.go b/udp/listener.go index cf3e04f..0a1e661 100644 --- a/udp/listener.go +++ b/udp/listener.go @@ -38,11 +38,6 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro return err } - err = pconn.SetWriteBuffer(0) - if err != nil { - panic(err) - } - receivedConnections := make(map[ComparableUdpAddress]*Flow) go func() { -- 2.47.0 From e67ce534b495ea367ba286c885a4872782a060d1 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Wed, 14 Apr 2021 17:40:05 +0100 Subject: [PATCH 04/17] missed file --- flags/locs_darwin.go | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 flags/locs_darwin.go diff --git a/flags/locs_darwin.go b/flags/locs_darwin.go new file mode 100644 index 0000000..13bbda0 --- /dev/null +++ b/flags/locs_darwin.go @@ -0,0 +1,4 @@ +package flags + +const DefaultConfigFile = "/usr/local/etc/netcombiner/%s" +const DefaultPidFile = "/var/run/netcombiner/%s.pid" -- 2.47.0 From 7f5cbea01d21897c74ae7dce90a3f3a9f7f0cf66 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 15 Apr 2021 15:28:50 +0100 Subject: [PATCH 05/17] split out udp flows --- udp/flow.go | 163 ---------------------------------------- udp/inbound_flow.go | 130 ++++++++++++++++++++++++++++++++ udp/listener.go | 5 +- udp/outbound_flow.go | 173 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 307 insertions(+), 164 deletions(-) create mode 100644 udp/inbound_flow.go create mode 100644 udp/outbound_flow.go diff --git a/udp/flow.go b/udp/flow.go index 2ab941f..ee4a357 100644 --- a/udp/flow.go +++ b/udp/flow.go @@ -2,13 +2,11 @@ package udp import ( "context" - "errors" "fmt" "log" "mpbl3p/proxy" "mpbl3p/shared" "net" - "sync" "time" ) @@ -24,21 +22,6 @@ type PacketConn interface { ReadFromUDP(b []byte) (int, *net.UDPAddr, error) } -type InitiatedFlow struct { - Local func() string - Remote string - - g proxy.MacGenerator - keepalive time.Duration - - mu sync.RWMutex - Flow -} - -func (f *InitiatedFlow) String() string { - return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local(), f.Remote) -} - type Flow struct { writer PacketWriter raddr *net.UDPAddr @@ -55,25 +38,6 @@ func (f Flow) String() string { return fmt.Sprintf("UdpInbound{%v -> %v}", f.raddr, f.writer.LocalAddr()) } -func InitiateFlow( - local func() string, - remote string, - v proxy.MacVerifier, - g proxy.MacGenerator, - c Congestion, - keepalive time.Duration, -) (*InitiatedFlow, error) { - f := InitiatedFlow{ - Local: local, - Remote: remote, - Flow: newFlow(c, v), - g: g, - keepalive: keepalive, - } - - return &f, nil -} - func newFlow(c Congestion, v proxy.MacVerifier) Flow { return Flow{ inboundDatagrams: make(chan []byte), @@ -82,133 +46,6 @@ func newFlow(c Congestion, v proxy.MacVerifier) Flow { } } -func (f *InitiatedFlow) Reconnect(ctx context.Context) error { - f.mu.Lock() - defer f.mu.Unlock() - - if f.isAlive { - return nil - } - - localAddr, err := net.ResolveUDPAddr("udp", f.Local()) - if err != nil { - return err - } - - remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote) - if err != nil { - return err - } - - conn, err := net.DialUDP("udp", localAddr, remoteAddr) - if err != nil { - return err - } - - f.writer = conn - - // prod the connection once a second until we get an ack, then consider it alive - var exchanges []proxy.Exchange - - if e, ok := f.congestion.(proxy.Exchange); ok { - exchanges = append(exchanges, e) - } - - var exchangeData [][]byte - - for _, e := range exchanges { - i, err := e.Initial(ctx) - if err != nil { - return err - } - - if err = f.sendPacket(proxy.SimplePacket(i), f.g); err != nil { - return err - } - - for once := true; once || !e.Complete(); once = false { - if err := func() error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - var recv []byte - if recv, err = f.readPacket(ctx, conn); err != nil { - return err - } - - var resp, data []byte - if resp, data, err = e.Handle(ctx, recv); err != nil { - return err - } - - if data != nil { - exchangeData = append(exchangeData, data) - } - - if resp != nil { - if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { - return err - } - } - - return nil - }(); err != nil { - return err - } - } - } - - go func() { - for _, d := range exchangeData { - if err := f.queueDatagram(ctx, d); err != nil { - return - } - } - - lockedAccept := func() { - f.mu.RLock() - defer f.mu.RUnlock() - - var p []byte - if p, err = f.readPacket(ctx, conn); err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - return - } - log.Println(err) - return - } - - if err := f.queueDatagram(ctx, p); err != nil { - return - } - - } - - for f.isAlive { - log.Println("alive and listening for packets") - lockedAccept() - } - log.Println("no longer alive") - }() - - f.isAlive = true - return nil -} - -func (f *InitiatedFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { - f.mu.RLock() - defer f.mu.RUnlock() - - return f.Flow.Consume(ctx, p, g) -} - -func (f *InitiatedFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { - f.mu.RLock() - defer f.mu.RUnlock() - - return f.Flow.Produce(ctx, v) -} - func (f *Flow) IsAlive() bool { return f.isAlive } diff --git a/udp/inbound_flow.go b/udp/inbound_flow.go new file mode 100644 index 0000000..e07740c --- /dev/null +++ b/udp/inbound_flow.go @@ -0,0 +1,130 @@ +package udp + +import ( + "context" + "log" + "mpbl3p/proxy" + "sync" + "time" +) + +type InboundFlow struct { + g proxy.MacGenerator + inboundDatagrams chan []byte + + mu sync.RWMutex + Flow +} + +func newInboundFlow(ctx context.Context, v proxy.MacVerifier, g proxy.MacGenerator, c Congestion) (*InboundFlow, error) { + f := InboundFlow{ + g: g, + inboundDatagrams: make(chan []byte), + Flow: newFlow(c, v), + } + + go f.processPackets(ctx) + return &f, nil +} + +func (f *InboundFlow) queueDatagram(ctx context.Context, p []byte) error { + select { + case f.inboundDatagrams <- p: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (f *InboundFlow) processPackets(ctx context.Context) { + for { + f.mu.Lock() + + var err error + for once := true; once || err == nil; once = false { + err = f.handleExchanges(ctx) + if err != nil { + log.Println(err) + } + } + + f.mu.Unlock() + + var p []byte + select { + case p = <-f.inboundDatagrams: + case <-ctx.Done(): + return + } + + // TODO: Check if p means redo exchanges + if false { + continue + } + + select { + case f.Flow.inboundDatagrams <- p: + case <-ctx.Done(): + return + } + } +} + +func (f *InboundFlow) handleExchanges(ctx context.Context) error { + var exchanges []proxy.Exchange + + if e, ok := f.congestion.(proxy.Exchange); ok { + exchanges = append(exchanges, e) + } + + var exchangeData [][]byte + + for _, e := range exchanges { + for once := true; once || !e.Complete(); once = false { + if err := func() (err error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var recv []byte + select { + case recv = <-f.inboundDatagrams: + case <-ctx.Done(): + return ctx.Err() + } + + var resp, data []byte + if resp, data, err = e.Handle(ctx, recv); err != nil { + return err + } + + if data != nil { + exchangeData = append(exchangeData, data) + } + + if resp != nil { + if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { + return err + } + } + + return nil + }(); err != nil { + return err + } + } + } +} + +func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Consume(ctx, p, g) +} + +func (f *InboundFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Produce(ctx, v) +} diff --git a/udp/listener.go b/udp/listener.go index 0a1e661..67dd9d0 100644 --- a/udp/listener.go +++ b/udp/listener.go @@ -55,11 +55,14 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro panic(err) } + // TODO: Support congestion exchange here + raddr := fromUdpAddress(*addr) if f, exists := receivedConnections[raddr]; exists { log.Println("existing flow. queuing...") if err := f.queueDatagram(ctx, buf[:n]); err != nil { - + log.Println("error") + continue } log.Println("queued") continue diff --git a/udp/outbound_flow.go b/udp/outbound_flow.go new file mode 100644 index 0000000..f76aaea --- /dev/null +++ b/udp/outbound_flow.go @@ -0,0 +1,173 @@ +package udp + +import ( + "context" + "errors" + "fmt" + "log" + "mpbl3p/proxy" + "net" + "sync" + "time" +) + +type OutboundFlow struct { + Local func() string + Remote string + + g proxy.MacGenerator + keepalive time.Duration + + mu sync.RWMutex + Flow +} + +func InitiateFlow( + local func() string, + remote string, + v proxy.MacVerifier, + g proxy.MacGenerator, + c Congestion, + keepalive time.Duration, +) (*OutboundFlow, error) { + f := OutboundFlow{ + Local: local, + Remote: remote, + Flow: newFlow(c, v), + g: g, + keepalive: keepalive, + } + + return &f, nil +} + +func (f *OutboundFlow) String() string { + return fmt.Sprintf("UdpOutbound{%v -> %v}", f.Local(), f.Remote) +} + +func (f *OutboundFlow) Reconnect(ctx context.Context) error { + f.mu.Lock() + defer f.mu.Unlock() + + if f.isAlive { + return nil + } + + localAddr, err := net.ResolveUDPAddr("udp", f.Local()) + if err != nil { + return err + } + + remoteAddr, err := net.ResolveUDPAddr("udp", f.Remote) + if err != nil { + return err + } + + conn, err := net.DialUDP("udp", localAddr, remoteAddr) + if err != nil { + return err + } + + f.writer = conn + + // prod the connection once a second until we get an ack, then consider it alive + var exchanges []proxy.Exchange + + if e, ok := f.congestion.(proxy.Exchange); ok { + exchanges = append(exchanges, e) + } + + var exchangeData [][]byte + + for _, e := range exchanges { + i, err := e.Initial(ctx) + if err != nil { + return err + } + + if err = f.sendPacket(proxy.SimplePacket(i), f.g); err != nil { + return err + } + + for once := true; once || !e.Complete(); once = false { + if err := func() error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var recv []byte + if recv, err = f.readPacket(ctx, conn); err != nil { + return err + } + + var resp, data []byte + if resp, data, err = e.Handle(ctx, recv); err != nil { + return err + } + + if data != nil { + exchangeData = append(exchangeData, data) + } + + if resp != nil { + if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { + return err + } + } + + return nil + }(); err != nil { + return err + } + } + } + + go func() { + for _, d := range exchangeData { + if err := f.queueDatagram(ctx, d); err != nil { + return + } + } + + lockedAccept := func() { + f.mu.RLock() + defer f.mu.RUnlock() + + var p []byte + if p, err = f.readPacket(ctx, conn); err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return + } + log.Println(err) + return + } + + if err := f.queueDatagram(ctx, p); err != nil { + return + } + + } + + for f.isAlive { + log.Println("alive and listening for packets") + lockedAccept() + } + log.Println("no longer alive") + }() + + f.isAlive = true + return nil +} + +func (f *OutboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Consume(ctx, p, g) +} + +func (f *OutboundFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Produce(ctx, v) +} -- 2.47.0 From 7ad99cd8149f76c8eb60e6c96490c0baceaac0c1 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 15 Apr 2021 15:35:46 +0100 Subject: [PATCH 06/17] implemented inboundflow --- udp/inbound_flow.go | 11 ++++++----- udp/listener.go | 19 ++++++++++++------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/udp/inbound_flow.go b/udp/inbound_flow.go index e07740c..d3e5fb0 100644 --- a/udp/inbound_flow.go +++ b/udp/inbound_flow.go @@ -16,15 +16,14 @@ type InboundFlow struct { Flow } -func newInboundFlow(ctx context.Context, v proxy.MacVerifier, g proxy.MacGenerator, c Congestion) (*InboundFlow, error) { - f := InboundFlow{ +func newInboundFlow(f Flow, g proxy.MacGenerator) (*InboundFlow, error) { + fi := InboundFlow{ g: g, inboundDatagrams: make(chan []byte), - Flow: newFlow(c, v), + Flow: f, } - go f.processPackets(ctx) - return &f, nil + return &fi, nil } func (f *InboundFlow) queueDatagram(ctx context.Context, p []byte) error { @@ -113,6 +112,8 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { } } } + + return nil } func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { diff --git a/udp/listener.go b/udp/listener.go index 67dd9d0..4a20934 100644 --- a/udp/listener.go +++ b/udp/listener.go @@ -38,7 +38,7 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro return err } - receivedConnections := make(map[ComparableUdpAddress]*Flow) + receivedConnections := make(map[ComparableUdpAddress]*InboundFlow) go func() { for ctx.Err() == nil { @@ -55,8 +55,6 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro panic(err) } - // TODO: Support congestion exchange here - raddr := fromUdpAddress(*addr) if f, exists := receivedConnections[raddr]; exists { log.Println("existing flow. queuing...") @@ -77,14 +75,21 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro f.raddr = addr f.isAlive = true + fi, err := newInboundFlow(f, g) + if err != nil { + log.Println(err) + continue + } + log.Printf("received new udp connection: %v\n", f) - go f.earlyUpdateLoop(ctx, g, 0) + go fi.processPackets(ctx) + go fi.earlyUpdateLoop(ctx, g, 0) - receivedConnections[raddr] = &f + receivedConnections[raddr] = fi - p.AddConsumer(ctx, &f, g) - p.AddProducer(ctx, &f, v) + p.AddConsumer(ctx, fi, g) + p.AddProducer(ctx, fi, v) log.Println("handling...") if err := f.queueDatagram(ctx, buf[:n]); err != nil { -- 2.47.0 From 54ac76ab58054efb4c540bda577a90294c039ab6 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 15 Apr 2021 16:15:24 +0100 Subject: [PATCH 07/17] code changes --- proxy/proxy.go | 14 ++++++++------ udp/congestion/newreno/exchange.go | 16 ++++++++-------- udp/inbound_flow.go | 12 ++++++++++-- udp/listener.go | 10 ++++------ udp/outbound_flow.go | 6 +++++- 5 files changed, 35 insertions(+), 23 deletions(-) diff --git a/proxy/proxy.go b/proxy/proxy.go index 593d65b..f2d8021 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -75,12 +75,13 @@ func (p Proxy) AddConsumer(ctx context.Context, c Consumer, g MacGenerator) { if reconnectable { var err error for once := true; err != nil || once; once = false { - log.Printf("attempting to connect consumer `%v`\n", c) - err = c.(Reconnectable).Reconnect(ctx) - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + if err := ctx.Err(); err != nil { log.Printf("closed consumer `%v` (context)\n", c) return } + + log.Printf("attempting to connect consumer `%v`\n", c) + err = c.(Reconnectable).Reconnect(ctx) if !once { time.Sleep(time.Second) } @@ -118,12 +119,13 @@ func (p Proxy) AddProducer(ctx context.Context, pr Producer, v MacVerifier) { if reconnectable { var err error for once := true; err != nil || once; once = false { - log.Printf("attempting to connect producer `%v`\n", pr) - err = pr.(Reconnectable).Reconnect(ctx) - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + if err := ctx.Err(); err != nil { log.Printf("closed producer `%v` (context)\n", pr) return } + + log.Printf("attempting to connect producer `%v`\n", pr) + err = pr.(Reconnectable).Reconnect(ctx) if !once { time.Sleep(time.Second) } diff --git a/udp/congestion/newreno/exchange.go b/udp/congestion/newreno/exchange.go index 409bb0a..29ca16a 100644 --- a/udp/congestion/newreno/exchange.go +++ b/udp/congestion/newreno/exchange.go @@ -45,23 +45,24 @@ func (c *NewReno) Handle(ctx context.Context, in []byte) (out []byte, data []byt rcvSeq := binary.LittleEndian.Uint32(in[8:12]) // verify - var ack, seq uint32 - if rcvNack != 0 { return nil, nil, shared.ErrBadExchange } + var seq uint32 + if c.wasInitial { if rcvAck == c.inFlight[0].sequence { - ack = rcvSeq - c.alive = true + c.ack, c.lastAck = rcvSeq, rcvSeq + c.alive, c.inFlight = true, nil } else { return nil, nil, shared.ErrBadExchange } } else { // if !c.wasInitial if rcvAck == 0 { // theirs is a syn packet - ack = rcvSeq + c.ack, c.lastAck = rcvSeq, rcvSeq + select { case seq = <-c.sequence: case <-ctx.Done(): @@ -70,8 +71,7 @@ func (c *NewReno) Handle(ctx context.Context, in []byte) (out []byte, data []byt c.inFlight = []flightInfo{{time.Now(), seq}} } else if len(c.inFlight) == 1 && rcvAck == c.inFlight[0].sequence { - ack = rcvSeq - c.alive = true + c.alive, c.inFlight = true, nil } else { return nil, nil, shared.ErrBadExchange } @@ -79,7 +79,7 @@ func (c *NewReno) Handle(ctx context.Context, in []byte) (out []byte, data []byt // respond b := make([]byte, 12) - binary.LittleEndian.PutUint32(b[0:4], ack) + binary.LittleEndian.PutUint32(b[0:4], c.ack) binary.LittleEndian.PutUint32(b[8:12], seq) return b, nil, nil diff --git a/udp/inbound_flow.go b/udp/inbound_flow.go index d3e5fb0..b802252 100644 --- a/udp/inbound_flow.go +++ b/udp/inbound_flow.go @@ -40,7 +40,11 @@ func (f *InboundFlow) processPackets(ctx context.Context) { f.mu.Lock() var err error - for once := true; once || err == nil; once = false { + for once := true; err != nil || once; once = false { + if ctx.Err() != nil { + return + } + err = f.handleExchanges(ctx) if err != nil { log.Println(err) @@ -79,7 +83,7 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { var exchangeData [][]byte for _, e := range exchanges { - for once := true; once || !e.Complete(); once = false { + for once := true; !e.Complete() || once; once = false { if err := func() (err error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -91,6 +95,10 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { return ctx.Err() } + if recv, err = proxy.StripMac(recv, f.v); err != nil { + return err + } + var resp, data []byte if resp, data, err = e.Handle(ctx, recv); err != nil { return err diff --git a/udp/listener.go b/udp/listener.go index 4a20934..7becda4 100644 --- a/udp/listener.go +++ b/udp/listener.go @@ -16,9 +16,7 @@ type ComparableUdpAddress struct { func fromUdpAddress(address net.UDPAddr) ComparableUdpAddress { var ip [16]byte - for i, b := range []byte(address.IP) { - ip[i] = b - } + copy(ip[:], address.IP) return ComparableUdpAddress{ IP: ip, @@ -56,9 +54,9 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro } raddr := fromUdpAddress(*addr) - if f, exists := receivedConnections[raddr]; exists { + if fi, exists := receivedConnections[raddr]; exists { log.Println("existing flow. queuing...") - if err := f.queueDatagram(ctx, buf[:n]); err != nil { + if err := fi.queueDatagram(ctx, buf[:n]); err != nil { log.Println("error") continue } @@ -92,7 +90,7 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro p.AddProducer(ctx, fi, v) log.Println("handling...") - if err := f.queueDatagram(ctx, buf[:n]); err != nil { + if err := fi.queueDatagram(ctx, buf[:n]); err != nil { return } log.Println("handled") diff --git a/udp/outbound_flow.go b/udp/outbound_flow.go index f76aaea..ed68a8a 100644 --- a/udp/outbound_flow.go +++ b/udp/outbound_flow.go @@ -89,7 +89,7 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error { return err } - for once := true; once || !e.Complete(); once = false { + for once := true; !e.Complete() || once; once = false { if err := func() error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -99,6 +99,10 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error { return err } + if recv, err = proxy.StripMac(recv, f.v); err != nil { + return err + } + var resp, data []byte if resp, data, err = e.Handle(ctx, recv); err != nil { return err -- 2.47.0 From 33f5df8f3597665e3a4615993ac4c27cd2c3e32b Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Wed, 12 May 2021 00:43:51 +0100 Subject: [PATCH 08/17] newreno exchange testing --- udp/congestion/newreno/exchange_test.go | 57 +++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 udp/congestion/newreno/exchange_test.go diff --git a/udp/congestion/newreno/exchange_test.go b/udp/congestion/newreno/exchange_test.go new file mode 100644 index 0000000..ae56355 --- /dev/null +++ b/udp/congestion/newreno/exchange_test.go @@ -0,0 +1,57 @@ +package newreno + +import ( + "context" + "encoding/binary" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNewReno_InitialHandle(t *testing.T) { + t.Run("InitialAckNackAreZero", func(t *testing.T) { + // ASSIGN + a := NewNewReno() + + ctx := context.Background() + + // ACT + initial, err := a.Initial(ctx) + + ack := binary.LittleEndian.Uint32(initial[0:4]) + nack := binary.LittleEndian.Uint32(initial[4:8]) + + // ASSERT + require.Nil(t, err) + + assert.Zero(t, ack) + assert.Zero(t, nack) + }) + + t.Run("InitialHandledWithAck", func(t *testing.T) { + // ASSIGN + a := NewNewReno() + b := NewNewReno() + + ctx := context.Background() + + // ACT + initial, err := a.Initial(ctx) + require.Nil(t, err) + + initialSeq := binary.LittleEndian.Uint32(initial[8:12]) + + response, data, err := b.Handle(ctx , initial) + + ack := binary.LittleEndian.Uint32(response[0:4]) + nack := binary.LittleEndian.Uint32(response[4:8]) + + // ASSERT + require.Nil(t, err) + + assert.Equal(t, initialSeq, ack) + assert.Zero(t, nack) + + assert.Nil(t, data) + }) +} -- 2.47.0 From 81dce2855d021ddfd0837a860f901d9a9970ecb7 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Wed, 12 May 2021 00:44:16 +0100 Subject: [PATCH 09/17] formatting --- udp/congestion/newreno/exchange_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/udp/congestion/newreno/exchange_test.go b/udp/congestion/newreno/exchange_test.go index ae56355..73b2a9f 100644 --- a/udp/congestion/newreno/exchange_test.go +++ b/udp/congestion/newreno/exchange_test.go @@ -41,7 +41,7 @@ func TestNewReno_InitialHandle(t *testing.T) { initialSeq := binary.LittleEndian.Uint32(initial[8:12]) - response, data, err := b.Handle(ctx , initial) + response, data, err := b.Handle(ctx, initial) ack := binary.LittleEndian.Uint32(response[0:4]) nack := binary.LittleEndian.Uint32(response[4:8]) -- 2.47.0 From 068b544b7d99a8a7537558d52353577a46616df7 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Wed, 12 May 2021 00:52:46 +0100 Subject: [PATCH 10/17] read deadline test method --- mocks/packetconn.go | 9 ++++++++- udp/flow_test.go | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/mocks/packetconn.go b/mocks/packetconn.go index 9360db4..9f79e10 100644 --- a/mocks/packetconn.go +++ b/mocks/packetconn.go @@ -1,6 +1,9 @@ package mocks -import "net" +import ( + "net" + "time" +) type MockPerfectBiPacketConn struct { directionA chan []byte @@ -44,6 +47,10 @@ func (c MockPerfectPacketConn) LocalAddr() net.Addr { } } +func (c MockPerfectPacketConn) SetReadDeadline(time.Time) error { + return nil +} + func (c MockPerfectPacketConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) { p := <-c.inbound return copy(b, p), &net.UDPAddr{ diff --git a/udp/flow_test.go b/udp/flow_test.go index d044477..30a26ea 100644 --- a/udp/flow_test.go +++ b/udp/flow_test.go @@ -64,7 +64,7 @@ func TestFlow_Produce(t *testing.T) { flowA.isAlive = true go func() { - err := flowA.readQueuePacket(context.Background(), testConn.SideB()) + _, err := flowA.readPacket(context.Background(), testConn.SideB()) assert.Nil(t, err) }() p, err := flowA.Produce(context.Background(), testMac) -- 2.47.0 From ac116734b70213c5f9eb42d9d2805f380b25a194 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 13 May 2021 17:43:28 +0100 Subject: [PATCH 11/17] newreno timeout decreased --- udp/congestion/newreno/newreno.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/udp/congestion/newreno/newreno.go b/udp/congestion/newreno/newreno.go index 9ee1a8a..a3c0e89 100644 --- a/udp/congestion/newreno/newreno.go +++ b/udp/congestion/newreno/newreno.go @@ -11,7 +11,7 @@ import ( ) const RttExponentialFactor = 0.1 -const RttLossDelay = 1.5 +const RttLossDelay = 0.5 type NewReno struct { sequence chan uint32 -- 2.47.0 From 898496fe94442861ffcc0b3fc1aa0126ad110002 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 13 May 2021 22:48:47 +0100 Subject: [PATCH 12/17] Missed files --- udp/flow.go | 44 +++++++++++++++++++++++++++----------------- udp/flow_test.go | 2 +- udp/inbound_flow.go | 24 ++++++++++++++---------- udp/listener.go | 2 +- udp/outbound_flow.go | 28 ++++++++++++++++------------ 5 files changed, 59 insertions(+), 41 deletions(-) diff --git a/udp/flow.go b/udp/flow.go index ee4a357..f538b1d 100644 --- a/udp/flow.go +++ b/udp/flow.go @@ -29,7 +29,8 @@ type Flow struct { isAlive bool congestion Congestion - v proxy.MacVerifier + verifiers []proxy.MacVerifier + generators []proxy.MacGenerator inboundDatagrams chan []byte } @@ -38,11 +39,12 @@ func (f Flow) String() string { return fmt.Sprintf("UdpInbound{%v -> %v}", f.raddr, f.writer.LocalAddr()) } -func newFlow(c Congestion, v proxy.MacVerifier) Flow { +func newFlow(c Congestion, vs []proxy.MacVerifier, gs []proxy.MacGenerator) Flow { return Flow{ inboundDatagrams: make(chan []byte), congestion: c, - v: v, + verifiers: vs, + generators: gs, } } @@ -50,7 +52,7 @@ func (f *Flow) IsAlive() bool { return f.isAlive } -func (f *Flow) Consume(ctx context.Context, pp proxy.Packet, g proxy.MacGenerator) error { +func (f *Flow) Consume(ctx context.Context, pp proxy.Packet) error { if !f.isAlive { return shared.ErrDeadConnection } @@ -73,18 +75,18 @@ func (f *Flow) Consume(ctx context.Context, pp proxy.Packet, g proxy.MacGenerato nack: f.congestion.NextNack(), } - return f.sendPacket(p, g) + return f.sendPacket(p) } -func (f *Flow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { +func (f *Flow) Produce(ctx context.Context) (proxy.Packet, error) { if !f.isAlive { return nil, shared.ErrDeadConnection } - return f.produceInternal(ctx, v, true) + return f.produceInternal(ctx, true) } -func (f *Flow) produceInternal(ctx context.Context, v proxy.MacVerifier, mustReturn bool) (proxy.Packet, error) { +func (f *Flow) produceInternal(ctx context.Context, mustReturn bool) (proxy.Packet, error) { for once := true; mustReturn || once; once = false { log.Println(f.congestion) @@ -95,12 +97,17 @@ func (f *Flow) produceInternal(ctx context.Context, v proxy.MacVerifier, mustRet return nil, ctx.Err() } - b, err := proxy.StripMac(received, v) - if err != nil { - return nil, err + for i := range f.verifiers { + v := f.verifiers[len(f.verifiers)-i-1] + + var err error + received, err = proxy.StripMac(received, v) + if err != nil { + return nil, err + } } - p, err := UnmarshalPacket(b) + p, err := UnmarshalPacket(received) if err != nil { return nil, err } @@ -109,7 +116,7 @@ func (f *Flow) produceInternal(ctx context.Context, v proxy.MacVerifier, mustRet f.congestion.ReceivedPacket(p.seq, p.nack, p.ack) // 12 bytes for header + the MAC + a timestamp - if len(b) == 12+f.v.CodeLength()+8 { + if len(p.Contents()) == 0 { log.Println("handled keepalive/ack only packet") continue } @@ -129,9 +136,12 @@ func (f *Flow) queueDatagram(ctx context.Context, p []byte) error { } } -func (f *Flow) sendPacket(p proxy.Packet, g proxy.MacGenerator) error { +func (f *Flow) sendPacket(p proxy.Packet) error { b := p.Marshal() - b = proxy.AppendMac(b, g) + + for _, g := range f.generators { + b = proxy.AppendMac(b, g) + } if f.raddr == nil { _, err := f.writer.Write(b) @@ -142,7 +152,7 @@ func (f *Flow) sendPacket(p proxy.Packet, g proxy.MacGenerator) error { } } -func (f *Flow) earlyUpdateLoop(ctx context.Context, g proxy.MacGenerator, keepalive time.Duration) { +func (f *Flow) earlyUpdateLoop(ctx context.Context, keepalive time.Duration) { for f.isAlive { seq, err := f.congestion.AwaitEarlyUpdate(ctx, keepalive) if err != nil { @@ -156,7 +166,7 @@ func (f *Flow) earlyUpdateLoop(ctx context.Context, g proxy.MacGenerator, keepal nack: f.congestion.NextNack(), } - err = f.sendPacket(p, g) + err = f.sendPacket(p) if err != nil { fmt.Printf("error sending early update packet: `%v`\n", err) } diff --git a/udp/flow_test.go b/udp/flow_test.go index d5bc2b1..0c61220 100644 --- a/udp/flow_test.go +++ b/udp/flow_test.go @@ -107,7 +107,7 @@ func TestFlow_Produce(t *testing.T) { flowA.isAlive = true go func() { - err := flowA.readQueuePacket(context.Background(), testConn.SideB()) + _, err := flowA.readPacket(context.Background(), testConn.SideB()) assert.Nil(t, err) }() p, err := flowA.Produce(context.Background()) diff --git a/udp/inbound_flow.go b/udp/inbound_flow.go index b802252..768dc44 100644 --- a/udp/inbound_flow.go +++ b/udp/inbound_flow.go @@ -9,16 +9,14 @@ import ( ) type InboundFlow struct { - g proxy.MacGenerator inboundDatagrams chan []byte mu sync.RWMutex Flow } -func newInboundFlow(f Flow, g proxy.MacGenerator) (*InboundFlow, error) { +func newInboundFlow(f Flow) (*InboundFlow, error) { fi := InboundFlow{ - g: g, inboundDatagrams: make(chan []byte), Flow: f, } @@ -95,8 +93,14 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { return ctx.Err() } - if recv, err = proxy.StripMac(recv, f.v); err != nil { - return err + + for i := range f.verifiers { + v := f.verifiers[len(f.verifiers)-i-1] + + recv, err = proxy.StripMac(recv, v) + if err != nil { + return err + } } var resp, data []byte @@ -109,7 +113,7 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { } if resp != nil { - if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { + if err = f.sendPacket(proxy.SimplePacket(resp)); err != nil { return err } } @@ -124,16 +128,16 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { return nil } -func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { +func (f *InboundFlow) Consume(ctx context.Context, p proxy.Packet) error { f.mu.RLock() defer f.mu.RUnlock() - return f.Flow.Consume(ctx, p, g) + return f.Flow.Consume(ctx, p) } -func (f *InboundFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { +func (f *InboundFlow) Produce(ctx context.Context) (proxy.Packet, error) { f.mu.RLock() defer f.mu.RUnlock() - return f.Flow.Produce(ctx, v) + return f.Flow.Produce(ctx) } diff --git a/udp/listener.go b/udp/listener.go index 6c344b6..6286e7a 100644 --- a/udp/listener.go +++ b/udp/listener.go @@ -88,7 +88,7 @@ func NewListener( f.raddr = addr f.isAlive = true - fi, err := newInboundFlow(f, g) + fi, err := newInboundFlow(f) if err != nil { log.Println(err) continue diff --git a/udp/outbound_flow.go b/udp/outbound_flow.go index ed68a8a..ae6751c 100644 --- a/udp/outbound_flow.go +++ b/udp/outbound_flow.go @@ -25,16 +25,15 @@ type OutboundFlow struct { func InitiateFlow( local func() string, remote string, - v proxy.MacVerifier, - g proxy.MacGenerator, + vs []proxy.MacVerifier, + gs []proxy.MacGenerator, c Congestion, keepalive time.Duration, ) (*OutboundFlow, error) { f := OutboundFlow{ Local: local, Remote: remote, - Flow: newFlow(c, v), - g: g, + Flow: newFlow(c, vs, gs), keepalive: keepalive, } @@ -85,7 +84,7 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error { return err } - if err = f.sendPacket(proxy.SimplePacket(i), f.g); err != nil { + if err = f.sendPacket(proxy.SimplePacket(i)); err != nil { return err } @@ -99,8 +98,13 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error { return err } - if recv, err = proxy.StripMac(recv, f.v); err != nil { - return err + for i := range f.verifiers { + v := f.verifiers[len(f.verifiers)-i-1] + + recv, err = proxy.StripMac(recv, v) + if err != nil { + return err + } } var resp, data []byte @@ -113,7 +117,7 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error { } if resp != nil { - if err = f.sendPacket(proxy.SimplePacket(resp), f.g); err != nil { + if err = f.sendPacket(proxy.SimplePacket(resp)); err != nil { return err } } @@ -162,16 +166,16 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error { return nil } -func (f *OutboundFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error { +func (f *OutboundFlow) Consume(ctx context.Context, p proxy.Packet) error { f.mu.RLock() defer f.mu.RUnlock() - return f.Flow.Consume(ctx, p, g) + return f.Flow.Consume(ctx, p) } -func (f *OutboundFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) { +func (f *OutboundFlow) Produce(ctx context.Context) (proxy.Packet, error) { f.mu.RLock() defer f.mu.RUnlock() - return f.Flow.Produce(ctx, v) + return f.Flow.Produce(ctx) } -- 2.47.0 From 1e8c7243a16e6bbc6b19c28cb9fe747f91e04eba Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 13 May 2021 22:49:12 +0100 Subject: [PATCH 13/17] formatting --- udp/inbound_flow.go | 1 - 1 file changed, 1 deletion(-) diff --git a/udp/inbound_flow.go b/udp/inbound_flow.go index 768dc44..896fdc3 100644 --- a/udp/inbound_flow.go +++ b/udp/inbound_flow.go @@ -93,7 +93,6 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error { return ctx.Err() } - for i := range f.verifiers { v := f.verifiers[len(f.verifiers)-i-1] -- 2.47.0 From 9258bfec779568686024f13d92db403a544755ef Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 13 May 2021 22:58:05 +0100 Subject: [PATCH 14/17] fixed missing packet queue --- udp/flow_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/udp/flow_test.go b/udp/flow_test.go index 0c61220..ebcc94b 100644 --- a/udp/flow_test.go +++ b/udp/flow_test.go @@ -107,7 +107,9 @@ func TestFlow_Produce(t *testing.T) { flowA.isAlive = true go func() { - _, err := flowA.readPacket(context.Background(), testConn.SideB()) + p, err := flowA.readPacket(context.Background(), testConn.SideB()) + assert.Nil(t, err) + err = flowA.queueDatagram(context.Background(), p) assert.Nil(t, err) }() p, err := flowA.Produce(context.Background()) @@ -143,7 +145,9 @@ func TestFlow_Produce(t *testing.T) { flowA.isAlive = true go func() { - _, err := flowA.readPacket(context.Background(), testConn.SideB()) + p, err := flowA.readPacket(context.Background(), testConn.SideB()) + assert.Nil(t, err) + err = flowA.queueDatagram(context.Background(), p) assert.Nil(t, err) }() p, err := flowA.Produce(context.Background()) -- 2.47.0 From e1241b2cf0e670964a11ceddbae5cb0d7d853435 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 13 May 2021 23:04:00 +0100 Subject: [PATCH 15/17] initialised newreno in tests --- udp/congestion/newreno/newreno_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/udp/congestion/newreno/newreno_test.go b/udp/congestion/newreno/newreno_test.go index 813e400..e771cfd 100644 --- a/udp/congestion/newreno/newreno_test.go +++ b/udp/congestion/newreno/newreno_test.go @@ -21,8 +21,8 @@ type newRenoTest struct { halfRtt time.Duration } -func newNewRenoTest(rtt time.Duration) *newRenoTest { - return &newRenoTest{ +func newNewRenoTest(ctx context.Context, rtt time.Duration) *newRenoTest { + nr := &newRenoTest{ sideA: NewNewReno(), sideB: NewNewReno(), @@ -34,6 +34,12 @@ func newNewRenoTest(rtt time.Duration) *newRenoTest { halfRtt: rtt / 2, } + + p, _ := nr.sideA.Initial(ctx) + p, _, _ = nr.sideB.Handle(ctx, p) + p, _, _ = nr.sideA.Handle(ctx, p) + + return nr } func (n *newRenoTest) Start(ctx context.Context) { @@ -151,7 +157,7 @@ func TestNewReno_Congestion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := newNewRenoTest(rtt) + c := newNewRenoTest(ctx, rtt) c.Start(ctx) c.RunSideA(ctx) c.RunSideB(ctx) @@ -189,7 +195,7 @@ func TestNewReno_Congestion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := newNewRenoTest(rtt) + c := newNewRenoTest(ctx, rtt) c.Start(ctx) c.RunSideA(ctx) c.RunSideB(ctx) @@ -233,7 +239,7 @@ func TestNewReno_Congestion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := newNewRenoTest(rtt) + c := newNewRenoTest(ctx, rtt) c.Start(ctx) c.RunSideA(ctx) c.RunSideB(ctx) @@ -293,7 +299,7 @@ func TestNewReno_Congestion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c := newNewRenoTest(rtt) + c := newNewRenoTest(ctx, rtt) c.Start(ctx) c.RunSideA(ctx) c.RunSideB(ctx) -- 2.47.0 From c81793029417808fe5ec34168407251381423db0 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 13 May 2021 23:25:09 +0100 Subject: [PATCH 16/17] completed new reno test with exchange --- udp/congestion/newreno/newreno_test.go | 53 +++++++++++++++++--------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/udp/congestion/newreno/newreno_test.go b/udp/congestion/newreno/newreno_test.go index e771cfd..dc5fc30 100644 --- a/udp/congestion/newreno/newreno_test.go +++ b/udp/congestion/newreno/newreno_test.go @@ -39,6 +39,9 @@ func newNewRenoTest(ctx context.Context, rtt time.Duration) *newRenoTest { p, _, _ = nr.sideB.Handle(ctx, p) p, _, _ = nr.sideA.Handle(ctx, p) + nr.sideB.ReceivedPacket(0, nr.sideA.NextAck(), nr.sideA.NextNack()) + nr.sideA.ReceivedPacket(0, nr.sideB.NextAck(), nr.sideB.NextNack()) + return nr } @@ -162,6 +165,9 @@ func TestNewReno_Congestion(t *testing.T) { c.RunSideA(ctx) c.RunSideB(ctx) + sideAinitialAck := c.sideA.ack + sideBinitialAck := c.sideB.ack + // ACT for i := 0; i < numPackets; i++ { // sleep to simulate preparing packet @@ -181,10 +187,10 @@ func TestNewReno_Congestion(t *testing.T) { // ASSERT assert.Equal(t, uint32(0), c.sideA.nack) - assert.Equal(t, uint32(0), c.sideA.ack) + assert.Equal(t, sideAinitialAck, c.sideA.ack) assert.Equal(t, uint32(0), c.sideB.nack) - assert.Equal(t, uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) }) t.Run("SequenceLoss", func(t *testing.T) { @@ -200,13 +206,16 @@ func TestNewReno_Congestion(t *testing.T) { c.RunSideA(ctx) c.RunSideB(ctx) + sideAinitialAck := c.sideA.ack + sideBinitialAck := c.sideB.ack + // ACT - for i := 0; i < numPackets; i++ { + for i := 1; i <= numPackets; i++ { // sleep to simulate preparing packet time.Sleep(1 * time.Millisecond) seq, _ := c.sideA.Sequence(ctx) - if seq == 20 { + if i == 20 { // Simulate packet loss of sequence 20 continue } @@ -223,10 +232,10 @@ func TestNewReno_Congestion(t *testing.T) { // ASSERT assert.Equal(t, uint32(0), c.sideA.nack) - assert.Equal(t, uint32(0), c.sideA.ack) + assert.Equal(t, sideAinitialAck, c.sideA.ack) - assert.Equal(t, uint32(20), c.sideB.nack) - assert.Equal(t, uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck + uint32(20), c.sideB.nack) + assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) }) }) @@ -244,11 +253,14 @@ func TestNewReno_Congestion(t *testing.T) { c.RunSideA(ctx) c.RunSideB(ctx) + sideAinitialAck := c.sideA.ack + sideBinitialAck := c.sideB.ack + // ACT done := make(chan struct{}) go func() { - for i := 0; i < numPackets; i++ { + for i := 1; i <= numPackets; i++ { time.Sleep(1 * time.Millisecond) seq, _ := c.sideA.Sequence(ctx) @@ -263,7 +275,7 @@ func TestNewReno_Congestion(t *testing.T) { }() go func() { - for i := 0; i < numPackets; i++ { + for i := 1; i <= numPackets; i++ { time.Sleep(1 * time.Millisecond) seq, _ := c.sideB.Sequence(ctx) @@ -285,10 +297,10 @@ func TestNewReno_Congestion(t *testing.T) { // ASSERT assert.Equal(t, uint32(0), c.sideA.nack) - assert.Equal(t, uint32(numPackets), c.sideA.ack) + assert.Equal(t, sideAinitialAck + uint32(numPackets), c.sideA.ack) assert.Equal(t, uint32(0), c.sideB.nack) - assert.Equal(t, uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) }) t.Run("SequenceLoss", func(t *testing.T) { @@ -304,15 +316,18 @@ func TestNewReno_Congestion(t *testing.T) { c.RunSideA(ctx) c.RunSideB(ctx) + sideAinitialAck := c.sideA.ack + sideBinitialAck := c.sideB.ack + // ACT done := make(chan struct{}) go func() { - for i := 0; i < numPackets; i++ { + for i := 1; i <= numPackets; i++ { time.Sleep(1 * time.Millisecond) seq, _ := c.sideA.Sequence(ctx) - if seq == 9 { + if i == 9 { // Simulate packet loss of sequence 9 continue } @@ -328,11 +343,11 @@ func TestNewReno_Congestion(t *testing.T) { }() go func() { - for i := 0; i < numPackets; i++ { + for i := 1; i <= numPackets; i++ { time.Sleep(1 * time.Millisecond) seq, _ := c.sideB.Sequence(ctx) - if seq == 13 { + if i == 13 { // Simulate packet loss of sequence 13 continue } @@ -354,11 +369,11 @@ func TestNewReno_Congestion(t *testing.T) { // ASSERT - assert.Equal(t, uint32(13), c.sideA.nack) - assert.Equal(t, uint32(numPackets), c.sideA.ack) + assert.Equal(t, sideAinitialAck + uint32(13), c.sideA.nack) + assert.Equal(t, sideAinitialAck + uint32(numPackets), c.sideA.ack) - assert.Equal(t, uint32(9), c.sideB.nack) - assert.Equal(t, uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck + uint32(9), c.sideB.nack) + assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) }) }) } -- 2.47.0 From 259d9ad3a88289b687ea0e5215fbd3616dcb90ca Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 13 May 2021 23:25:28 +0100 Subject: [PATCH 17/17] formatting --- udp/congestion/newreno/newreno_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/udp/congestion/newreno/newreno_test.go b/udp/congestion/newreno/newreno_test.go index dc5fc30..5ebe91a 100644 --- a/udp/congestion/newreno/newreno_test.go +++ b/udp/congestion/newreno/newreno_test.go @@ -190,7 +190,7 @@ func TestNewReno_Congestion(t *testing.T) { assert.Equal(t, sideAinitialAck, c.sideA.ack) assert.Equal(t, uint32(0), c.sideB.nack) - assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck+uint32(numPackets), c.sideB.ack) }) t.Run("SequenceLoss", func(t *testing.T) { @@ -234,8 +234,8 @@ func TestNewReno_Congestion(t *testing.T) { assert.Equal(t, uint32(0), c.sideA.nack) assert.Equal(t, sideAinitialAck, c.sideA.ack) - assert.Equal(t, sideBinitialAck + uint32(20), c.sideB.nack) - assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck+uint32(20), c.sideB.nack) + assert.Equal(t, sideBinitialAck+uint32(numPackets), c.sideB.ack) }) }) @@ -297,10 +297,10 @@ func TestNewReno_Congestion(t *testing.T) { // ASSERT assert.Equal(t, uint32(0), c.sideA.nack) - assert.Equal(t, sideAinitialAck + uint32(numPackets), c.sideA.ack) + assert.Equal(t, sideAinitialAck+uint32(numPackets), c.sideA.ack) assert.Equal(t, uint32(0), c.sideB.nack) - assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck+uint32(numPackets), c.sideB.ack) }) t.Run("SequenceLoss", func(t *testing.T) { @@ -369,11 +369,11 @@ func TestNewReno_Congestion(t *testing.T) { // ASSERT - assert.Equal(t, sideAinitialAck + uint32(13), c.sideA.nack) - assert.Equal(t, sideAinitialAck + uint32(numPackets), c.sideA.ack) + assert.Equal(t, sideAinitialAck+uint32(13), c.sideA.nack) + assert.Equal(t, sideAinitialAck+uint32(numPackets), c.sideA.ack) - assert.Equal(t, sideBinitialAck + uint32(9), c.sideB.nack) - assert.Equal(t, sideBinitialAck + uint32(numPackets), c.sideB.ack) + assert.Equal(t, sideBinitialAck+uint32(9), c.sideB.nack) + assert.Equal(t, sideBinitialAck+uint32(numPackets), c.sideB.ack) }) }) } -- 2.47.0