diff --git a/config/builder.go b/config/builder.go index 608c7eb..01d0fba 100644 --- a/config/builder.go +++ b/config/builder.go @@ -54,8 +54,8 @@ func buildTcp(p *proxy.Proxy, peer Peer) error { fmt.Sprintf("%s:%d", peer.RemoteHost, peer.RemotePort), ) - p.AddConsumer(&f) - p.AddProducer(&f, UselessMac{}) + p.AddConsumer(f) + p.AddProducer(f, UselessMac{}) if err != nil { return err diff --git a/proxy/proxy.go b/proxy/proxy.go index 5777bfb..bf8895c 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1,7 +1,9 @@ package proxy import ( + "fmt" "log" + "time" ) type Producer interface { @@ -71,8 +73,14 @@ func (p Proxy) AddConsumer(c Consumer) { _, reconnectable := c.(Reconnectable) for once := true; reconnectable || once; once = false { - if !once { - for err := c.(Reconnectable).Reconnect(); err != nil; {} + if reconnectable { + var err error + for once := true; err != nil || once; once = false { + fmt.Printf("attempting to connect `%v`\n", c) + err = c.(Reconnectable).Reconnect() + time.Sleep(time.Second) + } + fmt.Printf("connected `%v`\n", c) } for c.IsAlive() { @@ -90,8 +98,14 @@ func (p Proxy) AddProducer(pr Producer, v MacVerifier) { _, reconnectable := pr.(Reconnectable) for once := true; reconnectable || once; once = false { - if !once { - for err := pr.(Reconnectable).Reconnect(); err != nil; {} + if reconnectable { + var err error + for once := true; err != nil || once; once = false { + fmt.Printf("attempting to connect `%v`\n", pr) + err = pr.(Reconnectable).Reconnect() + time.Sleep(time.Second) + } + fmt.Printf("connected `%v`\n", pr) } for pr.IsAlive() { diff --git a/tcp/flow.go b/tcp/flow.go index 02c5c82..e6ec94c 100644 --- a/tcp/flow.go +++ b/tcp/flow.go @@ -6,6 +6,7 @@ import ( "io" "mpbl3p/proxy" "net" + "sync" ) var ErrNotEnoughBytes = errors.New("not enough bytes") @@ -19,27 +20,34 @@ type InitiatedFlow struct { Local string Remote string + mu sync.RWMutex + Flow } type Flow struct { - conn Conn + conn Conn + isDead bool } -func InitiateFlow(local, remote string) (InitiatedFlow, error) { +func InitiateFlow(local, remote string) (*InitiatedFlow, error) { f := InitiatedFlow{ Local: local, Remote: remote, + Flow: Flow{isDead: true}, } - if err := f.Reconnect(); err != nil { - return InitiatedFlow{}, err - } - - return f, nil + return &f, nil } func (f *InitiatedFlow) Reconnect() error { + f.mu.Lock() + defer f.mu.Unlock() + + if !f.isDead { + return nil + } + localAddr, err := net.ResolveTCPAddr("tcp", f.Local) if err != nil { return err @@ -56,17 +64,28 @@ func (f *InitiatedFlow) Reconnect() error { return err } + f.isDead = false return nil } func (f *Flow) IsAlive() bool { - // TODO: Implement this - return true + return !f.isDead } -func (f *Flow) Consume(p proxy.Packet, g proxy.MacGenerator) error { +func (f *InitiatedFlow) Consume(p proxy.Packet, g proxy.MacGenerator) error { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Consume(p, g) +} + +func (f *Flow) Consume(p proxy.Packet, g proxy.MacGenerator) (err error) { data := p.Marshal(g) - return f.consumeMarshalled(data) + err = f.consumeMarshalled(data) + if err != nil { + f.isDead = true + } + return } func (f *Flow) consumeMarshalled(data []byte) error { @@ -78,9 +97,17 @@ func (f *Flow) consumeMarshalled(data []byte) error { return err } +func (f *InitiatedFlow) Produce(v proxy.MacVerifier) (proxy.Packet, error) { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.Flow.Produce(v) +} + func (f *Flow) Produce(v proxy.MacVerifier) (proxy.Packet, error) { data, err := f.produceMarshalled() if err != nil { + f.isDead = true return proxy.Packet{}, err } diff --git a/tcp/listener.go b/tcp/listener.go index 82d03b2..22402b0 100644 --- a/tcp/listener.go +++ b/tcp/listener.go @@ -23,7 +23,7 @@ func NewListener(p *proxy.Proxy, local string, v proxy.MacVerifier) error { panic(err) } - f := Flow{conn} + f := Flow{conn: conn} p.AddConsumer(&f) p.AddProducer(&f, v) }