diff --git a/ingress/origin_dialer.go b/ingress/origin_dialer.go index 36ade327..98600770 100644 --- a/ingress/origin_dialer.go +++ b/ingress/origin_dialer.go @@ -11,6 +11,8 @@ import ( "github.com/rs/zerolog" ) +const writeDeadlineUDP = 200 * time.Millisecond + // OriginTCPDialer provides a TCP dial operation to a requested address. type OriginTCPDialer interface { DialTCP(ctx context.Context, addr netip.AddrPort) (net.Conn, error) @@ -141,6 +143,21 @@ func (d *Dialer) DialUDP(dest netip.AddrPort) (net.Conn, error) { if err != nil { return nil, fmt.Errorf("unable to dial udp to origin %s: %w", dest, err) } - - return conn, nil + return &writeDeadlineConn{ + Conn: conn, + }, nil +} + +// writeDeadlineConn is a wrapper around a net.Conn that sets a write deadline of 200ms. +// This is to prevent the socket from blocking on the write operation if it were to occur. However, +// we typically never expect this to occur except under high load or kernel issues. +type writeDeadlineConn struct { + net.Conn +} + +func (w *writeDeadlineConn) Write(b []byte) (int, error) { + if err := w.SetWriteDeadline(time.Now().Add(writeDeadlineUDP)); err != nil { + return 0, err + } + return w.Conn.Write(b) } diff --git a/quic/v3/session.go b/quic/v3/session.go index 74a34542..f37839fe 100644 --- a/quic/v3/session.go +++ b/quic/v3/session.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "os" "sync" "sync/atomic" "time" @@ -241,6 +242,11 @@ func (s *session) writeLoop() { case payload := <-s.writeChan: n, err := s.origin.Write(payload) if err != nil { + // Check if this is a write deadline exceeded to the connection + if errors.Is(err, os.ErrDeadlineExceeded) { + s.log.Warn().Err(err).Msg("flow (write) deadline exceeded: dropping packet") + continue + } if isConnectionClosed(err) { s.log.Debug().Msgf("flow (write) connection closed: %v", err) }