code changes
This commit is contained in:
parent
7ad99cd814
commit
54ac76ab58
@ -75,12 +75,13 @@ func (p Proxy) AddConsumer(ctx context.Context, c Consumer, g MacGenerator) {
|
||||
if reconnectable {
|
||||
var err error
|
||||
for once := true; err != nil || once; once = false {
|
||||
log.Printf("attempting to connect consumer `%v`\n", c)
|
||||
err = c.(Reconnectable).Reconnect(ctx)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
if err := ctx.Err(); err != nil {
|
||||
log.Printf("closed consumer `%v` (context)\n", c)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("attempting to connect consumer `%v`\n", c)
|
||||
err = c.(Reconnectable).Reconnect(ctx)
|
||||
if !once {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
@ -118,12 +119,13 @@ func (p Proxy) AddProducer(ctx context.Context, pr Producer, v MacVerifier) {
|
||||
if reconnectable {
|
||||
var err error
|
||||
for once := true; err != nil || once; once = false {
|
||||
log.Printf("attempting to connect producer `%v`\n", pr)
|
||||
err = pr.(Reconnectable).Reconnect(ctx)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
if err := ctx.Err(); err != nil {
|
||||
log.Printf("closed producer `%v` (context)\n", pr)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("attempting to connect producer `%v`\n", pr)
|
||||
err = pr.(Reconnectable).Reconnect(ctx)
|
||||
if !once {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
@ -45,23 +45,24 @@ func (c *NewReno) Handle(ctx context.Context, in []byte) (out []byte, data []byt
|
||||
rcvSeq := binary.LittleEndian.Uint32(in[8:12])
|
||||
|
||||
// verify
|
||||
var ack, seq uint32
|
||||
|
||||
if rcvNack != 0 {
|
||||
return nil, nil, shared.ErrBadExchange
|
||||
}
|
||||
|
||||
var seq uint32
|
||||
|
||||
if c.wasInitial {
|
||||
if rcvAck == c.inFlight[0].sequence {
|
||||
ack = rcvSeq
|
||||
c.alive = true
|
||||
c.ack, c.lastAck = rcvSeq, rcvSeq
|
||||
c.alive, c.inFlight = true, nil
|
||||
} else {
|
||||
return nil, nil, shared.ErrBadExchange
|
||||
}
|
||||
} else { // if !c.wasInitial
|
||||
if rcvAck == 0 {
|
||||
// theirs is a syn packet
|
||||
ack = rcvSeq
|
||||
c.ack, c.lastAck = rcvSeq, rcvSeq
|
||||
|
||||
select {
|
||||
case seq = <-c.sequence:
|
||||
case <-ctx.Done():
|
||||
@ -70,8 +71,7 @@ func (c *NewReno) Handle(ctx context.Context, in []byte) (out []byte, data []byt
|
||||
|
||||
c.inFlight = []flightInfo{{time.Now(), seq}}
|
||||
} else if len(c.inFlight) == 1 && rcvAck == c.inFlight[0].sequence {
|
||||
ack = rcvSeq
|
||||
c.alive = true
|
||||
c.alive, c.inFlight = true, nil
|
||||
} else {
|
||||
return nil, nil, shared.ErrBadExchange
|
||||
}
|
||||
@ -79,7 +79,7 @@ func (c *NewReno) Handle(ctx context.Context, in []byte) (out []byte, data []byt
|
||||
|
||||
// respond
|
||||
b := make([]byte, 12)
|
||||
binary.LittleEndian.PutUint32(b[0:4], ack)
|
||||
binary.LittleEndian.PutUint32(b[0:4], c.ack)
|
||||
binary.LittleEndian.PutUint32(b[8:12], seq)
|
||||
|
||||
return b, nil, nil
|
||||
|
@ -40,7 +40,11 @@ func (f *InboundFlow) processPackets(ctx context.Context) {
|
||||
f.mu.Lock()
|
||||
|
||||
var err error
|
||||
for once := true; once || err == nil; once = false {
|
||||
for once := true; err != nil || once; once = false {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = f.handleExchanges(ctx)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
@ -79,7 +83,7 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error {
|
||||
var exchangeData [][]byte
|
||||
|
||||
for _, e := range exchanges {
|
||||
for once := true; once || !e.Complete(); once = false {
|
||||
for once := true; !e.Complete() || once; once = false {
|
||||
if err := func() (err error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
@ -91,6 +95,10 @@ func (f *InboundFlow) handleExchanges(ctx context.Context) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if recv, err = proxy.StripMac(recv, f.v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var resp, data []byte
|
||||
if resp, data, err = e.Handle(ctx, recv); err != nil {
|
||||
return err
|
||||
|
@ -16,9 +16,7 @@ type ComparableUdpAddress struct {
|
||||
|
||||
func fromUdpAddress(address net.UDPAddr) ComparableUdpAddress {
|
||||
var ip [16]byte
|
||||
for i, b := range []byte(address.IP) {
|
||||
ip[i] = b
|
||||
}
|
||||
copy(ip[:], address.IP)
|
||||
|
||||
return ComparableUdpAddress{
|
||||
IP: ip,
|
||||
@ -56,9 +54,9 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro
|
||||
}
|
||||
|
||||
raddr := fromUdpAddress(*addr)
|
||||
if f, exists := receivedConnections[raddr]; exists {
|
||||
if fi, exists := receivedConnections[raddr]; exists {
|
||||
log.Println("existing flow. queuing...")
|
||||
if err := f.queueDatagram(ctx, buf[:n]); err != nil {
|
||||
if err := fi.queueDatagram(ctx, buf[:n]); err != nil {
|
||||
log.Println("error")
|
||||
continue
|
||||
}
|
||||
@ -92,7 +90,7 @@ func NewListener(ctx context.Context, p *proxy.Proxy, local string, v func() pro
|
||||
p.AddProducer(ctx, fi, v)
|
||||
|
||||
log.Println("handling...")
|
||||
if err := f.queueDatagram(ctx, buf[:n]); err != nil {
|
||||
if err := fi.queueDatagram(ctx, buf[:n]); err != nil {
|
||||
return
|
||||
}
|
||||
log.Println("handled")
|
||||
|
@ -89,7 +89,7 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
for once := true; once || !e.Complete(); once = false {
|
||||
for once := true; !e.Complete() || once; once = false {
|
||||
if err := func() error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
@ -99,6 +99,10 @@ func (f *OutboundFlow) Reconnect(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if recv, err = proxy.StripMac(recv, f.v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var resp, data []byte
|
||||
if resp, data, err = e.Handle(ctx, recv); err != nil {
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user