dissertation-2-code/tcp/receiver.go

98 lines
1.4 KiB
Go

package tcp
import (
"log"
"mpbl3p/proxy"
"mpbl3p/utils"
"net"
)
// Receiver tracks a set of flows and the channels used to exchange
// marshalled packets with them.
type Receiver struct {
	// flows holds the registered flows, keyed by the id each one was
	// assigned when it was added.
	flows map[int]Flow

	// inPackets carries marshalled data that flows have produced;
	// outPackets carries marshalled data waiting for a flow to consume it.
	inPackets, outPackets chan []byte
}
// NewReceiver binds a TCP listener to local and returns a Receiver that gains
// one flow for every connection accepted on it.
//
// local is resolved with net.ResolveTCPAddr. An error is returned if the
// address cannot be resolved or the listener cannot be created. Connections
// are accepted on a background goroutine that runs for the rest of the
// process's life.
func NewReceiver(local string) (*Receiver, error) {
	laddr, err := net.ResolveTCPAddr("tcp", local)
	if err != nil {
		return nil, err
	}

	listener, err := net.ListenTCP("tcp", laddr)
	if err != nil {
		return nil, err
	}

	// Both packet channels must be allocated here: a send to or receive from
	// a nil channel blocks forever, which would stall Consume, Produce and
	// every per-flow goroutine.
	r := &Receiver{
		flows:      make(map[int]Flow),
		inPackets:  make(chan []byte),
		outPackets: make(chan []byte),
	}

	go func() {
		for {
			conn, err := listener.AcceptTCP()
			if err != nil {
				// There is no path for reporting an accept failure back to
				// the caller, so it is treated as fatal.
				panic(err)
			}
			r.addFlow(newFlowConn(conn))
		}
	}()

	return r, nil
}
// IsAlive reports whether the receiver is alive. It unconditionally returns
// true; the receiver keeps no liveness state of its own.
func (r *Receiver) IsAlive() bool {
	return true
}
// Consume marshals p with g and hands the resulting bytes to whichever flow
// next becomes ready to write. It blocks until a flow accepts the data and
// always returns nil.
func (r *Receiver) Consume(p proxy.Packet, g proxy.MacGenerator) error {
	// Outbound data goes on outPackets, the channel the per-flow writer
	// goroutines drain. inPackets carries data read from flows, so sending
	// here would never reach a connection.
	r.outPackets <- p.Marshal(g)
	return nil
}
// Produce blocks until a flow has delivered marshalled data, then unmarshals
// it with v and returns the result of proxy.UnmarshalPacket unchanged.
func (r *Receiver) Produce(v proxy.MacVerifier) (proxy.Packet, error) {
	// Data read from flows is published on inPackets. outPackets is the queue
	// the per-flow writer goroutines drain, so receiving from it here would
	// compete with them for outbound data instead of yielding inbound data.
	return proxy.UnmarshalPacket(<-r.inPackets, v)
}
// addFlow registers flow under a fresh id taken from utils.NextId and starts
// three goroutines for it:
//
//   - a reader that forwards data produced by the flow to r.inPackets,
//   - a writer that feeds data from r.outPackets to the flow,
//   - a cleanup routine that removes the flow from r.flows once either worker
//     observes that the flow is no longer alive.
func (r *Receiver) addFlow(flow Flow) {
	id := <-utils.NextId

	// NOTE(review): r.flows is written here on the caller's goroutine and
	// deleted from in the cleanup goroutine below with no synchronisation;
	// guarding it needs a mutex on Receiver — confirm and add.
	r.flows[id] = flow

	// dead is buffered by one so that the first worker to see the flow die can
	// always record it, even when the cleanup goroutine has not started
	// waiting yet. With an unbuffered channel the non-blocking send could be
	// dropped and the flow would never be removed.
	dead := make(chan struct{}, 1)
	signalDead := func() {
		select {
		case dead <- struct{}{}:
		default: // already signalled by the other worker
		}
	}

	// done is closed by the cleanup goroutine so the writer can stop waiting
	// on r.outPackets instead of taking a packet for a flow that is gone.
	done := make(chan struct{})

	// Reader: flow -> r.inPackets.
	go func() {
		defer signalDead()
		for flow.IsAlive() {
			d, err := flow.produceMarshalled()
			if err != nil {
				log.Println(err)
				continue
			}
			r.inPackets <- d
		}
	}()

	// Writer: r.outPackets -> flow.
	go func() {
		defer signalDead()
		for flow.IsAlive() {
			select {
			case d := <-r.outPackets:
				if err := flow.consumeMarshalled(d); err != nil {
					// The error is logged only; the loop condition decides
					// whether the flow is still usable.
					log.Println(err)
				}
			case <-done:
				return
			}
		}
	}()

	// Cleanup: runs once, after the first death signal.
	go func() {
		<-dead
		close(done)
		delete(r.flows, id)
	}()
}