package proxy import ( "context" "errors" "log" "time" ) type Producer interface { IsAlive() bool Produce(context.Context, MacVerifier) (Packet, error) } type Consumer interface { IsAlive() bool Consume(context.Context, Packet, MacGenerator) error } type Reconnectable interface { Reconnect(context.Context) error } type Source interface { Source() (Packet, error) } type Sink interface { Sink(packet Packet) error } type Proxy struct { Source Source Sink Sink proxyChan chan Packet sinkChan chan Packet } func NewProxy(bufferSize int) *Proxy { return &Proxy{ proxyChan: make(chan Packet, bufferSize), sinkChan: make(chan Packet, bufferSize), } } func (p Proxy) Start() { go func() { for { if packet, err := p.Source.Source(); err != nil { panic(err) return } else { p.proxyChan <- packet } } }() go func() { for { packet := <-p.sinkChan if err := p.Sink.Sink(packet); err != nil { panic(err) return } } }() } func (p Proxy) AddConsumer(ctx context.Context, c Consumer, g MacGenerator) { go func() { _, reconnectable := c.(Reconnectable) for once := true; reconnectable || once; once = false { if reconnectable { var err error for once := true; err != nil || once; once = false { log.Printf("attempting to connect consumer `%v`\n", c) err = c.(Reconnectable).Reconnect(ctx) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { log.Printf("closed consumer `%v` (context)\n", c) return } if !once { time.Sleep(time.Second) } } log.Printf("connected consumer `%v`\n", c) } for c.IsAlive() { select { case <-ctx.Done(): log.Printf("closed consumer `%v` (context)\n", c) return case packet := <-p.proxyChan: if err := c.Consume(ctx, packet, g); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { log.Printf("closed consumer `%v` (context)\n", c) return } log.Println(err) break } } } } log.Printf("closed consumer `%v`\n", c) }() } func (p Proxy) AddProducer(ctx context.Context, pr Producer, v MacVerifier) { go func() { _, reconnectable := pr.(Reconnectable) for once := true; reconnectable || once; once = false { if reconnectable { var err error for once := true; err != nil || once; once = false { log.Printf("attempting to connect producer `%v`\n", pr) err = pr.(Reconnectable).Reconnect(ctx) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { log.Printf("closed producer `%v` (context)\n", pr) return } if !once { time.Sleep(time.Second) } if ctx.Err() != nil { return } } log.Printf("connected producer `%v`\n", pr) } for pr.IsAlive() { if packet, err := pr.Produce(ctx, v); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { log.Printf("closed producer `%v` (context)\n", pr) return } log.Println(err) break } else { select { case <-ctx.Done(): log.Printf("closed producer `%v` (context)\n", pr) return case p.sinkChan <- packet: } } } } log.Printf("closed producer `%v`\n", pr) }() }