udp #5

Merged
JakeHillion merged 16 commits from udp into develop 2020-11-28 16:53:00 +00:00
15 changed files with 268 additions and 122 deletions
Showing only changes of commit 3ac4d1f618 - Show all commits

View File

@ -1,7 +1,7 @@
manual:
docker run --rm -v /tmp:/tmp -v ${PWD}:/app -w /app golang:1.15-buster go build -o /tmp/mpbl3p
rsync -p /tmp/mpbl3p 10.21.10.3:
rsync -p /tmp/mpbl3p 10.21.10.4:
rsync -p /tmp/mpbl3p 10.21.12.101:
rsync -p /tmp/mpbl3p 10.21.12.102:
manual-bsd:
GOOS=freebsd go build -o /tmp/mpbl3p

View File

@ -5,6 +5,7 @@ import (
"mpbl3p/proxy"
"mpbl3p/tcp"
"mpbl3p/tun"
"mpbl3p/udp"
)
// TODO: Delete this code as soon as an alternative is available
@ -45,6 +46,11 @@ func (c Configuration) Build() (*proxy.Proxy, error) {
if err != nil {
return nil, err
}
case "UDP":
err := buildUdp(p, peer)
if err != nil {
return nil, err
}
}
}
@ -58,10 +64,14 @@ func buildTcp(p *proxy.Proxy, peer Peer) error {
fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort),
)
if err != nil {
return err
}
p.AddConsumer(f)
p.AddProducer(f, UselessMac{})
return err
return nil
}
err := tcp.NewListener(p, fmt.Sprintf("%s:%d", peer.LocalHost, peer.LocalPort), UselessMac{})
@ -71,3 +81,28 @@ func buildTcp(p *proxy.Proxy, peer Peer) error {
return nil
}
func buildUdp(p *proxy.Proxy, peer Peer) error {
if peer.RemoteHost != "" {
f, err := udp.InitiateFlow(
fmt.Sprintf("%s:", peer.LocalHost),
fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort),
)
if err != nil {
return err
}
p.AddConsumer(f)
p.AddProducer(f, UselessMac{})
return nil
}
err := udp.NewListener(p, fmt.Sprintf("%s:%d", peer.LocalHost, peer.LocalPort), UselessMac{})
if err != nil {
return err
}
return nil
}

View File

@ -16,7 +16,7 @@ type Host struct {
type Peer struct {
PublicKey string `validate:"required"`
Method string `validate:"oneof=TCP"`
Method string `validate:"oneof=TCP,UDP"`
LocalHost string `validate:"omitempty,ip"`
LocalPort uint `validate:"max=65535"`

View File

@ -6,8 +6,7 @@ import (
)
type Packet interface {
Marshal(generator MacGenerator) []byte
Raw() []byte
Marshal() []byte
}
type SimplePacket struct {
@ -24,15 +23,7 @@ func NewSimplePacket(data []byte) Packet {
}
// rebuild a packet from the wrapped format
func UnmarshalPacket(raw []byte, verifier MacVerifier) (Packet, error) {
// the MAC is the last N bytes
data := raw[:len(raw)-verifier.CodeLength()]
sum := raw[len(raw)-verifier.CodeLength():]
if err := verifier.Verify(data, sum); err != nil {
return SimplePacket{}, err
}
func UnmarshalSimplePacket(data []byte) (SimplePacket, error) {
p := SimplePacket{
Data: data[:len(data)-8],
}
@ -44,22 +35,28 @@ func UnmarshalPacket(raw []byte, verifier MacVerifier) (Packet, error) {
}
// get the raw data of the IP packet
func (p SimplePacket) Raw() []byte {
return p.Data
}
// produce the wrapped format of a packet
func (p SimplePacket) Marshal(generator MacGenerator) []byte {
// length of data + length of timestamp (8 byte) + length of checksum
slice := make([]byte, len(p.Data)+8+generator.CodeLength())
copy(slice, p.Data)
func (p SimplePacket) Marshal() []byte {
footer := make([]byte, 8)
unixTime := uint64(p.timestamp.Unix())
binary.LittleEndian.PutUint64(slice[len(p.Data):], unixTime)
binary.LittleEndian.PutUint64(footer, unixTime)
mac := generator.Generate(slice)
copy(slice[len(p.Data)+8:], mac)
return slice
return append(p.Data, footer...)
}
func AppendMac(b []byte, g MacGenerator) []byte {
mac := g.Generate(b)
b = append(b, mac...)
return b
}
func StripMac(b []byte, v MacVerifier) ([]byte, error) {
data := b[:len(b)-v.CodeLength()]
sum := b[len(b)-v.CodeLength():]
if err := v.Verify(data, sum); err != nil {
return nil, err
}
return data, nil
}

View File

@ -26,9 +26,9 @@ func TestUnmarshalPacket(t *testing.T) {
testMarshalled := testPacket.Marshal(testMac)
t.Run("Length", func(t *testing.T) {
p, err := UnmarshalPacket(testMarshalled, testMac)
p, err := UnmarshalSimplePacket(testMarshalled, testMac)
require.Nil(t, err)
assert.Len(t, p.Raw(), len(testContent))
assert.Len(t, p.Marshal(), len(testContent))
})
}

View File

@ -91,7 +91,9 @@ func (f *Flow) Consume(p proxy.Packet, g proxy.MacGenerator) (err error) {
return shared.ErrDeadConnection
}
data := p.Marshal(g)
marshalled := p.Marshal()
data := proxy.AppendMac(marshalled, g)
err = f.consumeMarshalled(data)
if err != nil {
f.isAlive = false
@ -130,7 +132,12 @@ func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
return nil, err
}
return proxy.UnmarshalPacket(data, v)
b, err := proxy.StripMac(data, v)
if err != nil {
return nil, err
}
return proxy.UnmarshalSimplePacket(b)
}
func (f *Flow) produceMarshalled() ([]byte, error) {

View File

@ -48,7 +48,7 @@ func TestFlow_Produce(t *testing.T) {
p, err := flowA.Produce(testMac)
require.Nil(t, err)
assert.Equal(t, len(testContent), len(p.Raw()))
assert.Equal(t, len(testContent), len(p.Marshal()))
})
t.Run("Value", func(t *testing.T) {
@ -61,6 +61,6 @@ func TestFlow_Produce(t *testing.T) {
p, err := flowA.Produce(testMac)
require.Nil(t, err)
assert.Equal(t, testContent, string(p.Raw()))
assert.Equal(t, testContent, string(p.Marshal()))
})
}

View File

@ -79,7 +79,7 @@ func (t *SourceSink) Sink(packet proxy.Packet) error {
t.upMu.Unlock()
}
_, err := t.tun.Write(packet.Raw())
_, err := t.tun.Write(packet.Marshal())
if err != nil {
switch err.(type) {
case *os.PathError:

View File

@ -1,11 +1,12 @@
package udp
type Congestion interface {
Sequence() uint64
Sequence() uint32
ReceivedPacket(seq uint32)
ReceivedAck(uint64)
NextAck() uint64
ReceivedAck(uint32)
NextAck() uint32
ReceivedNack(uint64)
NextNack() uint64
ReceivedNack(uint32)
NextNack() uint32
}

85
udp/congestion/newreno.go Normal file
View File

@ -0,0 +1,85 @@
package congestion
import (
"mpbl3p/utils"
"time"
)
type NewReno struct {
sequence chan uint32
packetTimes map[uint32]time.Time
nextAck uint32
nextNack uint32
fastStart bool
windowSize uint
rtt time.Duration
acksToSend utils.Uint32Heap
}
func NewNewReno() *NewReno {
c := NewReno{
sequence: make(chan uint32),
packetTimes: make(map[uint32]time.Time),
windowSize: 1,
}
go func() {
var s uint32
for {
if s == 0 {
continue
}
c.sequence <- s
s++
}
}()
return &c
}
// It is assumed that ReceivedAck will only be called by one thread
func (c *NewReno) ReceivedAck(ack uint32) {
// RTT
// Update using an exponential average
// GROW
// CASE: exponential. increase window size by one per ack
// CASE: standard. increase window size by one per window of acks
}
// It is assumed that ReceivedNack will only be called by one thread
func (c *NewReno) ReceivedNack(nack uint32) {
// Back off
}
func (c *NewReno) ReceivedPacket(seq uint32) {
c.acksToSend.Insert(seq)
ack, err := c.acksToSend.Extract()
if err != nil {
panic(err)
}
for a, _ := c.acksToSend.Peek(); a == ack+1; {
ack, _ = c.acksToSend.Extract()
}
}
func (c *NewReno) Sequence() uint32 {
s := <-c.sequence
c.packetTimes[s] = time.Now()
return s
}
func (c *NewReno) NextAck() uint32 {
return c.nextAck
}
func (c *NewReno) NextNack() uint32 {
return c.nextNack
}

View File

@ -1,58 +0,0 @@
package congestion
import "time"
type NewReno struct {
sequence chan uint64
packetTimes map[uint64]time.Time
nextAck uint64
nextNack uint64
rtt float64
}
func NewNewReno() *NewReno {
c := NewReno{
sequence: make(chan uint64),
packetTimes: make(map[uint64]time.Time),
}
go func() {
var s uint64
for {
if s == 0 {
continue
}
c.sequence <- s
s++
}
}()
return &c
}
// It is assumed that ReceivedAck will only be called by one thread
func (c *NewReno) ReceivedAck(uint64) {
}
// It is assumed that ReceivedNack will only be called by one thread
func (c *NewReno) ReceivedNack(uint64) {
}
func (c *NewReno) Sequence() uint64 {
s := <-c.sequence
c.packetTimes[s] = time.Now()
return s
}
func (c *NewReno) NextAck() uint64 {
return c.nextAck
}
func (c *NewReno) NextNack() uint64 {
return c.nextNack
}

View File

@ -1,10 +1,9 @@
package udp
import (
"errors"
"mpbl3p/proxy"
"net"
"time"
"sync"
)
type PacketWriter interface {
@ -16,11 +15,12 @@ type PacketConn interface {
ReadFromUDP(b []byte) (int, *net.UDPAddr, error)
}
type Ack struct {
time time.Time
packet uint64
type InitiatedFlow struct {
Local string
Remote string
sent bool
mu sync.RWMutex
Flow
}
type Flow struct {
@ -29,6 +29,31 @@ type Flow struct {
isAlive bool
congestion Congestion
inboundDatagrams chan []byte
}
func newOutboundFlow(c Congestion) *Flow {
return &Flow{
congestion: c,
inboundDatagrams: make(chan []byte),
}
}
func newInboundFlow(c Congestion) *Flow {
return &Flow{
congestion: c,
inboundDatagrams: make(chan []byte),
}
}
func InitiateFlow(local, remote string) (*InitiatedFlow, error) {
f := InitiatedFlow{
Local: local,
Remote: remote,
}
return &f, nil
}
func (f *Flow) IsAlive() bool {
@ -44,14 +69,39 @@ func (f *Flow) Consume(pp proxy.Packet, g proxy.MacGenerator) error {
p.ack = f.congestion.NextAck()
p.nack = f.congestion.NextNack()
return errors.New("not implemented")
b := p.Marshal()
b = proxy.AppendMac(b, g)
_, err := f.writer.WriteToUDP(b, &f.raddr)
return err
}
func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
return proxy.Packet{}, errors.New("not implemented")
b, err := proxy.StripMac(<-f.inboundDatagrams, v)
if err != nil {
return nil, err
}
p, err := UnmarshalPacket(b)
if err != nil {
return nil, err
}
// schedule an ack for this sequence number
f.congestion.ReceivedPacket(p.seq)
// adjust our sending congestion control based on their acks
if p.ack != 0 {
f.congestion.ReceivedAck(p.ack)
}
// adjust our sending congestion control based on their nacks
if p.nack != 0 {
f.congestion.ReceivedNack(p.nack)
}
return p, nil
}
func (f *Flow) handleDatagram(p []byte) {
// TODO: Congestion control - read off the ACKs for the send half
// TODO: Congestion control - schedule ACKs for this side
f.inboundDatagrams <- p
}

View File

@ -1,11 +1,37 @@
package udp
import "mpbl3p/proxy"
import (
"encoding/binary"
"mpbl3p/proxy"
)
type Packet struct {
ack uint64
nack uint64
seq uint64
ack uint32
nack uint32
seq uint32
data proxy.Packet
}
func UnmarshalPacket(b []byte) (p Packet, err error) {
p.ack = binary.LittleEndian.Uint32(b[0:4])
p.nack = binary.LittleEndian.Uint32(b[4:8])
p.seq = binary.LittleEndian.Uint32(b[8:12])
p.data, err = proxy.UnmarshalSimplePacket(b[12:])
if err != nil {
return Packet{}, err
}
return p, nil
}
func (p Packet) Marshal() []byte {
data := p.data.Marshal()
header := make([]byte, 12)
binary.LittleEndian.PutUint32(header[0:4], p.ack)
binary.LittleEndian.PutUint32(header[4:8], p.nack)
binary.LittleEndian.PutUint32(header[8:12], p.seq)
return append(header, data...)
}

View File

@ -5,17 +5,17 @@ import "errors"
var ErrorEmptyHeap = errors.New("attempted to extract from empty heap")
// A MinHeap for Uint64
type Uint64Heap struct {
data []uint64
type Uint32Heap struct {
data []uint32
}
func (h *Uint64Heap) swap(x, y int) {
func (h *Uint32Heap) swap(x, y int) {
h.data[x] = h.data[x] ^ h.data[y]
h.data[y] = h.data[y] ^ h.data[x]
h.data[x] = h.data[x] ^ h.data[y]
}
func (h *Uint64Heap) Insert(new uint64) uint64 {
func (h *Uint32Heap) Insert(new uint32) uint32 {
h.data = append(h.data, new)
child := len(h.data) - 1
@ -32,7 +32,7 @@ func (h *Uint64Heap) Insert(new uint64) uint64 {
return h.data[0]
}
func (h *Uint64Heap) Extract() (uint64, error) {
func (h *Uint32Heap) Extract() (uint32, error) {
if len(h.data) == 0 {
return 0, ErrorEmptyHeap
}
@ -59,6 +59,9 @@ func (h *Uint64Heap) Extract() (uint64, error) {
}
}
func (h *Uint64Heap) Peek() uint64 {
return h.data[0]
func (h *Uint32Heap) Peek() (uint32, error) {
if len(h.data) == 0 {
return 0, ErrorEmptyHeap
}
return h.data[0], nil
}

View File

@ -11,7 +11,7 @@ import (
func SlowHeapSort(in []uint64) []uint64 {
out := make([]uint64, len(in))
var heap Uint64Heap
var heap Uint32Heap
for _, x := range in {
heap.Insert(x)