Refactored for multiple macverifiers/generators
Some checks failed
continuous-integration/drone/push Build is failing

Tests not fixed yet.
This commit is contained in:
Jake Hillion 2021-05-13 21:56:25 +01:00
parent fa3e4e0d79
commit 75f02802ad
8 changed files with 181 additions and 99 deletions

View File

@ -7,6 +7,7 @@ import (
"mpbl3p/crypto"
"mpbl3p/crypto/sharedkey"
"mpbl3p/proxy"
"mpbl3p/replay"
"mpbl3p/tcp"
"mpbl3p/udp"
"mpbl3p/udp/congestion"
@ -16,13 +17,19 @@ import (
func (c Configuration) Build(ctx context.Context, source proxy.Source, sink proxy.Sink) (*proxy.Proxy, error) {
p := proxy.NewProxy(0)
var g func() proxy.MacGenerator
var v func() proxy.MacVerifier
var gs []func() proxy.MacGenerator
var vs []func() proxy.MacVerifier
if c.Host.ReplayProtection {
rp := replay.NewAntiReplay()
gs = append(gs, func() proxy.MacGenerator { return rp })
vs = append(vs, func() proxy.MacVerifier { return rp })
}
switch c.Host.Crypto {
case "None":
g = func() proxy.MacGenerator { return crypto.None{} }
v = func() proxy.MacVerifier { return crypto.None{} }
gs = append(gs, func() proxy.MacGenerator { return crypto.None{} })
vs = append(vs, func() proxy.MacVerifier { return crypto.None{} })
case "Blake2s":
key, err := base64.StdEncoding.DecodeString(c.Host.SharedKey)
if err != nil {
@ -31,14 +38,14 @@ func (c Configuration) Build(ctx context.Context, source proxy.Source, sink prox
if _, err := sharedkey.NewBlake2s(key); err != nil {
return nil, err
}
g = func() proxy.MacGenerator {
gs = append(gs, func() proxy.MacGenerator {
g, _ := sharedkey.NewBlake2s(key)
return g
}
v = func() proxy.MacVerifier {
})
vs = append(vs, func() proxy.MacVerifier {
v, _ := sharedkey.NewBlake2s(key)
return v
}
})
}
p.Source = source
@ -47,11 +54,11 @@ func (c Configuration) Build(ctx context.Context, source proxy.Source, sink prox
for _, peer := range c.Peers {
switch peer.Method {
case "TCP":
if err := buildTcp(ctx, p, peer, g, v); err != nil {
if err := buildTcp(ctx, p, peer, gs, vs); err != nil {
return nil, err
}
case "UDP":
if err := buildUdp(ctx, p, peer, g, v); err != nil {
if err := buildUdp(ctx, p, peer, gs, vs); err != nil {
return nil, err
}
}
@ -60,7 +67,13 @@ func (c Configuration) Build(ctx context.Context, source proxy.Source, sink prox
return p, nil
}
func buildTcp(ctx context.Context, p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() proxy.MacVerifier) error {
func buildTcp(
ctx context.Context,
p *proxy.Proxy,
peer Peer,
gs []func() proxy.MacGenerator,
vs []func() proxy.MacVerifier,
) error {
var laddr func() string
if peer.LocalPort == 0 {
laddr = func() string { return fmt.Sprintf("%s:", peer.GetLocalHost()) }
@ -69,23 +82,23 @@ func buildTcp(ctx context.Context, p *proxy.Proxy, peer Peer, g func() proxy.Mac
}
if peer.RemoteHost != "" {
f, err := tcp.InitiateFlow(laddr, fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort))
f, err := tcp.InitiateFlow(laddr, fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort), initiateVerifiers(vs), initiateGenerators(gs))
if err != nil {
return err
}
if !peer.DisableConsumer {
p.AddConsumer(ctx, f, g())
p.AddConsumer(ctx, f)
}
if !peer.DisableProducer {
p.AddProducer(ctx, f, v())
p.AddProducer(ctx, f)
}
return nil
}
err := tcp.NewListener(ctx, p, laddr(), v, g, !peer.DisableConsumer, !peer.DisableProducer)
err := tcp.NewListener(ctx, p, laddr(), vs, gs, !peer.DisableConsumer, !peer.DisableProducer)
if err != nil {
return err
}
@ -93,7 +106,13 @@ func buildTcp(ctx context.Context, p *proxy.Proxy, peer Peer, g func() proxy.Mac
return nil
}
func buildUdp(ctx context.Context, p *proxy.Proxy, peer Peer, g func() proxy.MacGenerator, v func() proxy.MacVerifier) error {
func buildUdp(
ctx context.Context,
p *proxy.Proxy,
peer Peer,
gs []func() proxy.MacGenerator,
vs []func() proxy.MacVerifier,
) error {
var laddr func() string
if peer.LocalPort == 0 {
laddr = func() string { return fmt.Sprintf("%s:", peer.GetLocalHost()) }
@ -115,8 +134,8 @@ func buildUdp(ctx context.Context, p *proxy.Proxy, peer Peer, g func() proxy.Mac
f, err := udp.InitiateFlow(
laddr,
fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort),
v(),
g(),
initiateVerifiers(vs),
initiateGenerators(gs),
c(),
time.Duration(peer.KeepAlive)*time.Second,
)
@ -126,19 +145,35 @@ func buildUdp(ctx context.Context, p *proxy.Proxy, peer Peer, g func() proxy.Mac
}
if !peer.DisableConsumer {
p.AddConsumer(ctx, f, g())
p.AddConsumer(ctx, f)
}
if !peer.DisableProducer {
p.AddProducer(ctx, f, v())
p.AddProducer(ctx, f)
}
return nil
}
err := udp.NewListener(ctx, p, laddr(), v, g, c, !peer.DisableConsumer, !peer.DisableProducer)
err := udp.NewListener(ctx, p, laddr(), vs, gs, c, !peer.DisableConsumer, !peer.DisableProducer)
if err != nil {
return err
}
return nil
}
func initiateVerifiers(vs []func()proxy.MacVerifier) (out []proxy.MacVerifier) {
out = make([]proxy.MacVerifier, len(vs))
for i, v := range vs {
out[i] = v()
}
return
}
func initiateGenerators(gs []func()proxy.MacGenerator) (out []proxy.MacGenerator) {
out = make([]proxy.MacGenerator, len(gs))
for i, g := range gs {
out[i] = g()
}
return
}

View File

@ -41,6 +41,7 @@ type Host struct {
Crypto string `validate:"required,oneof=None Blake2s"`
SharedKey string `validate:"required_if=Crypto Blake2s"`
MTU uint `validate:"required,min=576"`
ReplayProtection bool
}
type Peer struct {

View File

@ -1,9 +1,6 @@
package proxy
import (
"encoding/binary"
"time"
)
import "mpbl3p/shared"
type Packet interface {
Marshal() []byte
@ -22,17 +19,15 @@ func (p SimplePacket) Contents() []byte {
}
func AppendMac(b []byte, g MacGenerator) []byte {
footer := make([]byte, 8)
unixTime := uint64(time.Now().Unix())
binary.LittleEndian.PutUint64(footer, unixTime)
b = append(b, footer...)
mac := g.Generate(b)
return append(b, mac...)
}
func StripMac(b []byte, v MacVerifier) ([]byte, error) {
if len(b) < v.CodeLength() {
return nil, shared.ErrNotEnoughBytes
}
data := b[:len(b)-v.CodeLength()]
sum := b[len(b)-v.CodeLength():]
@ -40,7 +35,5 @@ func StripMac(b []byte, v MacVerifier) ([]byte, error) {
return nil, err
}
// TODO: Verify timestamp
return data[:len(data)-8], nil
return data, nil
}

View File

@ -9,12 +9,12 @@ import (
type Producer interface {
IsAlive() bool
Produce(context.Context, MacVerifier) (Packet, error)
Produce(context.Context) (Packet, error)
}
type Consumer interface {
IsAlive() bool
Consume(context.Context, Packet, MacGenerator) error
Consume(context.Context, Packet) error
}
type Reconnectable interface {
@ -67,7 +67,7 @@ func (p Proxy) Start() {
}()
}
func (p Proxy) AddConsumer(ctx context.Context, c Consumer, g MacGenerator) {
func (p Proxy) AddConsumer(ctx context.Context, c Consumer) {
go func() {
_, reconnectable := c.(Reconnectable)
@ -94,7 +94,7 @@ func (p Proxy) AddConsumer(ctx context.Context, c Consumer, g MacGenerator) {
log.Printf("closed consumer `%v` (context)\n", c)
return
case packet := <-p.proxyChan:
if err := c.Consume(ctx, packet, g); err != nil {
if err := c.Consume(ctx, packet); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Printf("closed consumer `%v` (context)\n", c)
return
@ -110,7 +110,7 @@ func (p Proxy) AddConsumer(ctx context.Context, c Consumer, g MacGenerator) {
}()
}
func (p Proxy) AddProducer(ctx context.Context, pr Producer, v MacVerifier) {
func (p Proxy) AddProducer(ctx context.Context, pr Producer) {
go func() {
_, reconnectable := pr.(Reconnectable)
@ -136,7 +136,7 @@ func (p Proxy) AddProducer(ctx context.Context, pr Producer, v MacVerifier) {
}
for pr.IsAlive() {
if packet, err := pr.Produce(ctx, v); err != nil {
if packet, err := pr.Produce(ctx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Printf("closed producer `%v` (context)\n", pr)
return

View File

@ -42,10 +42,16 @@ type Flow struct {
toConsume, produced chan []byte
consumeErrors, produceErrors chan error
generators []proxy.MacGenerator
verifiers []proxy.MacVerifier
}
func NewFlow() Flow {
func NewFlow(vs []proxy.MacVerifier, gs []proxy.MacGenerator) Flow {
return Flow{
verifiers: vs,
generators: gs,
toConsume: make(chan []byte),
produced: make(chan []byte),
consumeErrors: make(chan error),
@ -53,11 +59,14 @@ func NewFlow() Flow {
}
}
func NewFlowConn(ctx context.Context, conn Conn) Flow {
func NewFlowConn(ctx context.Context, conn Conn, vs []proxy.MacVerifier, gs []proxy.MacGenerator) Flow {
f := Flow{
conn: conn,
isAlive: true,
generators: gs,
verifiers: vs,
toConsume: make(chan []byte),
produced: make(chan []byte),
consumeErrors: make(chan error),
@ -78,12 +87,12 @@ func (f *Flow) IsAlive() bool {
return f.isAlive
}
func InitiateFlow(local func() string, remote string) (*InitiatedFlow, error) {
func InitiateFlow(local func() string, remote string, vs []proxy.MacVerifier, gs []proxy.MacGenerator) (*InitiatedFlow, error) {
f := InitiatedFlow{
Local: local,
Remote: remote,
Flow: NewFlow(),
Flow: NewFlow(vs, gs),
}
return &f, nil
@ -125,21 +134,21 @@ func (f *InitiatedFlow) Reconnect(ctx context.Context) error {
return nil
}
func (f *InitiatedFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error {
func (f *InitiatedFlow) Consume(ctx context.Context, p proxy.Packet) error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Consume(ctx, p, g)
return f.Flow.Consume(ctx, p)
}
func (f *InitiatedFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) {
func (f *InitiatedFlow) Produce(ctx context.Context) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(ctx, v)
return f.Flow.Produce(ctx)
}
func (f *Flow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error {
func (f *Flow) Consume(ctx context.Context, p proxy.Packet) error {
if !f.isAlive {
return shared.ErrDeadConnection
}
@ -151,8 +160,10 @@ func (f *Flow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator
default:
}
marshalled := p.Marshal()
data := proxy.AppendMac(marshalled, g)
data := p.Marshal()
for _, g := range f.generators {
data = proxy.AppendMac(data, g)
}
prefixedData := make([]byte, len(data)+4)
binary.LittleEndian.PutUint32(prefixedData, uint32(len(data)))
@ -167,7 +178,7 @@ func (f *Flow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator
return nil
}
func (f *Flow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) {
func (f *Flow) Produce(ctx context.Context) (proxy.Packet, error) {
if !f.isAlive {
return nil, shared.ErrDeadConnection
}
@ -183,12 +194,16 @@ func (f *Flow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet,
return nil, err
}
b, err := proxy.StripMac(data, v)
for _, v := range f.verifiers {
var err error
data, err = proxy.StripMac(data, v)
if err != nil {
return nil, err
}
}
return proxy.SimplePacket(b), nil
return proxy.SimplePacket(data), nil
}
func (f *Flow) consumeMarshalled(ctx context.Context) {

View File

@ -7,7 +7,15 @@ import (
"net"
)
func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() proxy.MacVerifier, g func() proxy.MacGenerator, enableConsumers bool, enableProducers bool) error {
func NewListener(
ctx context.Context,
p *proxy.Proxy,
local string,
vs []func() proxy.MacVerifier,
gs []func() proxy.MacGenerator,
enableConsumers bool,
enableProducers bool,
) error {
laddr, err := net.ResolveTCPAddr("tcp", local)
if err != nil {
return err
@ -29,15 +37,24 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro
panic(err)
}
f := NewFlowConn(ctx, conn)
var verifiers = make([]proxy.MacVerifier, len(vs))
for i, v := range vs {
verifiers[i] = v()
}
var generators = make([]proxy.MacGenerator, len(gs))
for i, g := range gs {
generators[i] = g()
}
f := NewFlowConn(ctx, conn, verifiers, generators)
log.Printf("received new tcp connection: %v\n", f)
if enableConsumers {
p.AddConsumer(ctx, &f, g())
p.AddConsumer(ctx, &f)
}
if enableProducers {
p.AddProducer(ctx, &f, v())
p.AddProducer(ctx, &f)
}
}
}()

View File

@ -26,7 +26,6 @@ type InitiatedFlow struct {
Local func() string
Remote string
g proxy.MacGenerator
keepalive time.Duration
mu sync.RWMutex
@ -45,7 +44,8 @@ type Flow struct {
startup bool
congestion Congestion
v proxy.MacVerifier
verifiers []proxy.MacVerifier
generators []proxy.MacGenerator
inboundDatagrams chan []byte
}
@ -57,27 +57,27 @@ func (f Flow) String() string {
func InitiateFlow(
local func() string,
remote string,
v proxy.MacVerifier,
g proxy.MacGenerator,
vs []proxy.MacVerifier,
gs []proxy.MacGenerator,
c Congestion,
keepalive time.Duration,
) (*InitiatedFlow, error) {
f := InitiatedFlow{
Local: local,
Remote: remote,
Flow: newFlow(c, v),
g: g,
Flow: newFlow(c, vs, gs),
keepalive: keepalive,
}
return &f, nil
}
func newFlow(c Congestion, v proxy.MacVerifier) Flow {
func newFlow(c Congestion, vs []proxy.MacVerifier, gs []proxy.MacGenerator) Flow {
return Flow{
inboundDatagrams: make(chan []byte),
congestion: c,
v: v,
verifiers: vs,
generators: gs,
}
}
@ -126,15 +126,15 @@ func (f *InitiatedFlow) Reconnect(ctx context.Context) error {
data: proxy.SimplePacket(nil),
}
_ = f.sendPacket(p, f.g)
_ = f.sendPacket(p)
time.Sleep(1 * time.Second)
}
}()
go func() {
_, _ = f.produceInternal(ctx, f.v, false)
_, _ = f.produceInternal(ctx, false)
}()
go f.earlyUpdateLoop(ctx, f.g, f.keepalive)
go f.earlyUpdateLoop(ctx, f.keepalive)
if err := f.readQueuePacket(ctx, conn); err != nil {
return err
@ -163,25 +163,25 @@ func (f *InitiatedFlow) Reconnect(ctx context.Context) error {
return nil
}
func (f *InitiatedFlow) Consume(ctx context.Context, p proxy.Packet, g proxy.MacGenerator) error {
func (f *InitiatedFlow) Consume(ctx context.Context, p proxy.Packet) error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Consume(ctx, p, g)
return f.Flow.Consume(ctx, p)
}
func (f *InitiatedFlow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) {
func (f *InitiatedFlow) Produce(ctx context.Context) (proxy.Packet, error) {
f.mu.RLock()
defer f.mu.RUnlock()
return f.Flow.Produce(ctx, v)
return f.Flow.Produce(ctx)
}
func (f *Flow) IsAlive() bool {
return f.isAlive
}
func (f *Flow) Consume(ctx context.Context, pp proxy.Packet, g proxy.MacGenerator) error {
func (f *Flow) Consume(ctx context.Context, pp proxy.Packet) error {
if !f.isAlive {
return shared.ErrDeadConnection
}
@ -204,18 +204,18 @@ func (f *Flow) Consume(ctx context.Context, pp proxy.Packet, g proxy.MacGenerato
nack: f.congestion.NextNack(),
}
return f.sendPacket(p, g)
return f.sendPacket(p)
}
func (f *Flow) Produce(ctx context.Context, v proxy.MacVerifier) (proxy.Packet, error) {
func (f *Flow) Produce(ctx context.Context) (proxy.Packet, error) {
if !f.isAlive {
return nil, shared.ErrDeadConnection
}
return f.produceInternal(ctx, v, true)
return f.produceInternal(ctx, true)
}
func (f *Flow) produceInternal(ctx context.Context, v proxy.MacVerifier, mustReturn bool) (proxy.Packet, error) {
func (f *Flow) produceInternal(ctx context.Context, mustReturn bool) (proxy.Packet, error) {
for once := true; mustReturn || once; once = false {
log.Println(f.congestion)
@ -226,12 +226,15 @@ func (f *Flow) produceInternal(ctx context.Context, v proxy.MacVerifier, mustRet
return nil, ctx.Err()
}
b, err := proxy.StripMac(received, v)
for _, v := range f.verifiers {
var err error
received, err = proxy.StripMac(received, v)
if err != nil {
return nil, err
}
}
p, err := UnmarshalPacket(b)
p, err := UnmarshalPacket(received)
if err != nil {
return nil, err
}
@ -240,7 +243,7 @@ func (f *Flow) produceInternal(ctx context.Context, v proxy.MacVerifier, mustRet
f.congestion.ReceivedPacket(p.seq, p.nack, p.ack)
// 12 bytes for header + the MAC + a timestamp
if len(b) == 12+f.v.CodeLength()+8 {
if len(p.Contents()) == 0 {
log.Println("handled keepalive/ack only packet")
continue
}
@ -260,9 +263,12 @@ func (f *Flow) queueDatagram(ctx context.Context, p []byte) error {
}
}
func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error {
func (f *Flow) sendPacket(p Packet) error {
b := p.Marshal()
for _, g := range f.generators {
b = proxy.AppendMac(b, g)
}
if f.raddr == nil {
_, err := f.writer.Write(b)
@ -273,7 +279,7 @@ func (f *Flow) sendPacket(p Packet, g proxy.MacGenerator) error {
}
}
func (f *Flow) earlyUpdateLoop(ctx context.Context, g proxy.MacGenerator, keepalive time.Duration) {
func (f *Flow) earlyUpdateLoop(ctx context.Context, keepalive time.Duration) {
for f.isAlive {
seq, err := f.congestion.AwaitEarlyUpdate(ctx, keepalive)
if err != nil {
@ -287,7 +293,7 @@ func (f *Flow) earlyUpdateLoop(ctx context.Context, g proxy.MacGenerator, keepal
nack: f.congestion.NextNack(),
}
err = f.sendPacket(p, g)
err = f.sendPacket(p)
if err != nil {
fmt.Printf("error sending early update packet: `%v`\n", err)
}

View File

@ -27,7 +27,16 @@ func fromUdpAddress(address net.UDPAddr) ComparableUdpAddress {
}
}
func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() proxy.MacVerifier, g func() proxy.MacGenerator, c func() Congestion, enableConsumers bool, enableProducers bool) error {
func NewListener(
ctx context.Context,
p *proxy.Proxy,
local string,
vs []func() proxy.MacVerifier,
gs []func() proxy.MacGenerator,
c func() Congestion,
enableConsumers bool,
enableProducers bool,
) error {
laddr, err := net.ResolveUDPAddr("udp", local)
if err != nil {
return err
@ -70,10 +79,16 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro
continue
}
v := v()
g := g()
var verifiers = make([]proxy.MacVerifier, len(vs))
for i, v := range vs {
verifiers[i] = v()
}
var generators = make([]proxy.MacGenerator, len(gs))
for i, g := range gs {
generators[i] = g()
}
f := newFlow(c(), v)
f := newFlow(c(), verifiers, generators)
f.writer = pconn
f.raddr = addr
@ -81,15 +96,15 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro
log.Printf("received new udp connection: %v\n", f)
go f.earlyUpdateLoop(ctx, g, 0)
go f.earlyUpdateLoop(ctx, 0)
receivedConnections[raddr] = &f
if enableConsumers {
p.AddConsumer(ctx, &f, g)
p.AddConsumer(ctx, &f)
}
if enableProducers {
p.AddProducer(ctx, &f, v)
p.AddProducer(ctx, &f)
}
log.Println("handling...")