dissertation-2-code/proxy/proxy.go
Jake Hillion 5363cd76ba
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is failing
Merge branch 'develop' into exchanges
Errors remaining in udp/flow_test.go
2021-05-13 22:46:22 +01:00

162 lines
3.1 KiB
Go

package proxy
import (
"context"
"errors"
"log"
"time"
)
type Producer interface {
IsAlive() bool
Produce(context.Context) (Packet, error)
}
type Consumer interface {
IsAlive() bool
Consume(context.Context, Packet) error
}
type Reconnectable interface {
Reconnect(context.Context) error
}
type Source interface {
Source() (Packet, error)
}
type Sink interface {
Sink(packet Packet) error
}
type Proxy struct {
Source Source
Sink Sink
proxyChan chan Packet
sinkChan chan Packet
}
func NewProxy(bufferSize int) *Proxy {
return &Proxy{
proxyChan: make(chan Packet, bufferSize),
sinkChan: make(chan Packet, bufferSize),
}
}
func (p Proxy) Start() {
go func() {
for {
if packet, err := p.Source.Source(); err != nil {
panic(err)
return
} else {
p.proxyChan <- packet
}
}
}()
go func() {
for {
packet := <-p.sinkChan
if err := p.Sink.Sink(packet); err != nil {
panic(err)
return
}
}
}()
}
func (p Proxy) AddConsumer(ctx context.Context, c Consumer) {
go func() {
_, reconnectable := c.(Reconnectable)
for once := true; reconnectable || once; once = false {
if reconnectable {
var err error
for once := true; err != nil || once; once = false {
if err := ctx.Err(); err != nil {
log.Printf("closed consumer `%v` (context)\n", c)
return
}
log.Printf("attempting to connect consumer `%v`\n", c)
err = c.(Reconnectable).Reconnect(ctx)
if !once {
time.Sleep(time.Second)
}
}
log.Printf("connected consumer `%v`\n", c)
}
for c.IsAlive() {
select {
case <-ctx.Done():
log.Printf("closed consumer `%v` (context)\n", c)
return
case packet := <-p.proxyChan:
if err := c.Consume(ctx, packet); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Printf("closed consumer `%v` (context)\n", c)
return
}
log.Println(err)
break
}
}
}
}
log.Printf("closed consumer `%v`\n", c)
}()
}
func (p Proxy) AddProducer(ctx context.Context, pr Producer) {
go func() {
_, reconnectable := pr.(Reconnectable)
for once := true; reconnectable || once; once = false {
if reconnectable {
var err error
for once := true; err != nil || once; once = false {
if err := ctx.Err(); err != nil {
log.Printf("closed producer `%v` (context)\n", pr)
return
}
log.Printf("attempting to connect producer `%v`\n", pr)
err = pr.(Reconnectable).Reconnect(ctx)
if !once {
time.Sleep(time.Second)
}
if ctx.Err() != nil {
return
}
}
log.Printf("connected producer `%v`\n", pr)
}
for pr.IsAlive() {
if packet, err := pr.Produce(ctx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Printf("closed producer `%v` (context)\n", pr)
return
}
log.Println(err)
break
} else {
select {
case <-ctx.Done():
log.Printf("closed producer `%v` (context)\n", pr)
return
case p.sinkChan <- packet:
}
}
}
}
log.Printf("closed producer `%v`\n", pr)
}()
}