TUN-9882: Improve metrics for datagram v3

Adds new metrics for:
- Dropped UDP datagrams for reads and write paths
- Dropped ICMP packets for write paths
- Failures that preemptively close UDP flows

Closes TUN-9882
This commit is contained in:
Devin Carr 2025-10-08 12:17:23 -07:00
parent 51c5ef726c
commit 1cc15c6ffa
4 changed files with 94 additions and 26 deletions

View File

@ -10,27 +10,58 @@ import (
const ( const (
namespace = "cloudflared" namespace = "cloudflared"
subsystem = "udp" subsystem_udp = "udp"
subsystem_icmp = "icmp"
commandMetricLabel = "command" commandMetricLabel = "command"
reasonMetricLabel = "reason"
) )
type DroppedReason int
const (
DroppedWriteFailed DroppedReason = iota
DroppedWriteDeadlineExceeded
DroppedWriteFull
DroppedWriteFlowUnknown
DroppedReadFailed
// Origin payloads that are too large to proxy.
DroppedReadTooLarge
)
var droppedReason = map[DroppedReason]string{
DroppedWriteFailed: "write_failed",
DroppedWriteDeadlineExceeded: "write_deadline_exceeded",
DroppedWriteFull: "write_full",
DroppedWriteFlowUnknown: "write_flow_unknown",
DroppedReadFailed: "read_failed",
DroppedReadTooLarge: "read_too_large",
}
func (dr DroppedReason) String() string {
return droppedReason[dr]
}
type Metrics interface { type Metrics interface {
IncrementFlows(connIndex uint8) IncrementFlows(connIndex uint8)
DecrementFlows(connIndex uint8) DecrementFlows(connIndex uint8)
PayloadTooLarge(connIndex uint8) FailedFlow(connIndex uint8)
RetryFlowResponse(connIndex uint8) RetryFlowResponse(connIndex uint8)
MigrateFlow(connIndex uint8) MigrateFlow(connIndex uint8)
UnsupportedRemoteCommand(connIndex uint8, command string) UnsupportedRemoteCommand(connIndex uint8, command string)
DroppedUDPDatagram(connIndex uint8, reason DroppedReason)
DroppedICMPPackets(connIndex uint8, reason DroppedReason)
} }
type metrics struct { type metrics struct {
activeUDPFlows *prometheus.GaugeVec activeUDPFlows *prometheus.GaugeVec
totalUDPFlows *prometheus.CounterVec totalUDPFlows *prometheus.CounterVec
payloadTooLarge *prometheus.CounterVec
retryFlowResponses *prometheus.CounterVec retryFlowResponses *prometheus.CounterVec
migratedFlows *prometheus.CounterVec migratedFlows *prometheus.CounterVec
unsupportedRemoteCommands *prometheus.CounterVec unsupportedRemoteCommands *prometheus.CounterVec
droppedUDPDatagrams *prometheus.CounterVec
droppedICMPPackets *prometheus.CounterVec
failedFlows *prometheus.CounterVec
} }
func (m *metrics) IncrementFlows(connIndex uint8) { func (m *metrics) IncrementFlows(connIndex uint8) {
@ -42,8 +73,8 @@ func (m *metrics) DecrementFlows(connIndex uint8) {
m.activeUDPFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Dec() m.activeUDPFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Dec()
} }
func (m *metrics) PayloadTooLarge(connIndex uint8) { func (m *metrics) FailedFlow(connIndex uint8) {
m.payloadTooLarge.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc() m.failedFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
} }
func (m *metrics) RetryFlowResponse(connIndex uint8) { func (m *metrics) RetryFlowResponse(connIndex uint8) {
@ -58,52 +89,74 @@ func (m *metrics) UnsupportedRemoteCommand(connIndex uint8, command string) {
m.unsupportedRemoteCommands.WithLabelValues(fmt.Sprintf("%d", connIndex), command).Inc() m.unsupportedRemoteCommands.WithLabelValues(fmt.Sprintf("%d", connIndex), command).Inc()
} }
func (m *metrics) DroppedUDPDatagram(connIndex uint8, reason DroppedReason) {
m.droppedUDPDatagrams.WithLabelValues(fmt.Sprintf("%d", connIndex), reason.String()).Inc()
}
func (m *metrics) DroppedICMPPackets(connIndex uint8, reason DroppedReason) {
m.droppedICMPPackets.WithLabelValues(fmt.Sprintf("%d", connIndex), reason.String()).Inc()
}
func NewMetrics(registerer prometheus.Registerer) Metrics { func NewMetrics(registerer prometheus.Registerer) Metrics {
m := &metrics{ m := &metrics{
activeUDPFlows: prometheus.NewGaugeVec(prometheus.GaugeOpts{ activeUDPFlows: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem_udp,
Name: "active_flows", Name: "active_flows",
Help: "Concurrent count of UDP flows that are being proxied to any origin", Help: "Concurrent count of UDP flows that are being proxied to any origin",
}, []string{quic.ConnectionIndexMetricLabel}), }, []string{quic.ConnectionIndexMetricLabel}),
totalUDPFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter totalUDPFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem_udp,
Name: "total_flows", Name: "total_flows",
Help: "Total count of UDP flows that have been proxied to any origin", Help: "Total count of UDP flows that have been proxied to any origin",
}, []string{quic.ConnectionIndexMetricLabel}), }, []string{quic.ConnectionIndexMetricLabel}),
payloadTooLarge: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter failedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem_udp,
Name: "payload_too_large", Name: "failed_flows",
Help: "Total count of UDP flows that have had origin payloads that are too large to proxy", Help: "Total count of flows that errored and closed",
}, []string{quic.ConnectionIndexMetricLabel}), }, []string{quic.ConnectionIndexMetricLabel}),
retryFlowResponses: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter retryFlowResponses: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem_udp,
Name: "retry_flow_responses", Name: "retry_flow_responses",
Help: "Total count of UDP flows that have had to send their registration response more than once", Help: "Total count of UDP flows that have had to send their registration response more than once",
}, []string{quic.ConnectionIndexMetricLabel}), }, []string{quic.ConnectionIndexMetricLabel}),
migratedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter migratedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem_udp,
Name: "migrated_flows", Name: "migrated_flows",
Help: "Total count of UDP flows have been migrated across local connections", Help: "Total count of UDP flows have been migrated across local connections",
}, []string{quic.ConnectionIndexMetricLabel}), }, []string{quic.ConnectionIndexMetricLabel}),
unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{ unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem_udp,
Name: "unsupported_remote_command_total", Name: "unsupported_remote_command_total",
Help: "Total count of unsupported remote RPC commands for the ", Help: "Total count of unsupported remote RPC commands called",
}, []string{quic.ConnectionIndexMetricLabel, commandMetricLabel}), }, []string{quic.ConnectionIndexMetricLabel, commandMetricLabel}),
droppedUDPDatagrams: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: subsystem_udp,
Name: "dropped_datagrams",
Help: "Total count of UDP dropped datagrams",
}, []string{quic.ConnectionIndexMetricLabel, reasonMetricLabel}),
droppedICMPPackets: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: subsystem_icmp,
Name: "dropped_packets",
Help: "Total count of ICMP dropped datagrams",
}, []string{quic.ConnectionIndexMetricLabel, reasonMetricLabel}),
} }
registerer.MustRegister( registerer.MustRegister(
m.activeUDPFlows, m.activeUDPFlows,
m.totalUDPFlows, m.totalUDPFlows,
m.payloadTooLarge, m.failedFlows,
m.retryFlowResponses, m.retryFlowResponses,
m.migratedFlows, m.migratedFlows,
m.unsupportedRemoteCommands, m.unsupportedRemoteCommands,
m.droppedUDPDatagrams,
m.droppedICMPPackets,
) )
return m return m
} }

View File

@ -1,10 +1,15 @@
package v3_test package v3_test
import v3 "github.com/cloudflare/cloudflared/quic/v3"
type noopMetrics struct{} type noopMetrics struct{}
func (noopMetrics) IncrementFlows(connIndex uint8) {} func (noopMetrics) IncrementFlows(connIndex uint8) {}
func (noopMetrics) DecrementFlows(connIndex uint8) {} func (noopMetrics) DecrementFlows(connIndex uint8) {}
func (noopMetrics) FailedFlow(connIndex uint8) {}
func (noopMetrics) PayloadTooLarge(connIndex uint8) {} func (noopMetrics) PayloadTooLarge(connIndex uint8) {}
func (noopMetrics) RetryFlowResponse(connIndex uint8) {} func (noopMetrics) RetryFlowResponse(connIndex uint8) {}
func (noopMetrics) MigrateFlow(connIndex uint8) {} func (noopMetrics) MigrateFlow(connIndex uint8) {}
func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {} func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {}
func (noopMetrics) DroppedUDPDatagram(connIndex uint8, reason v3.DroppedReason) {}
func (noopMetrics) DroppedICMPPackets(connIndex uint8, reason v3.DroppedReason) {}

View File

@ -368,6 +368,7 @@ func (c *datagramConn) handleSessionRegistrationRateLimited(datagram *UDPSession
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) { func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) {
s, err := c.sessionManager.GetSession(datagram.RequestID) s, err := c.sessionManager.GetSession(datagram.RequestID)
if err != nil { if err != nil {
c.metrics.DroppedUDPDatagram(c.index, DroppedWriteFlowUnknown)
logger.Err(err).Msgf("unable to find flow") logger.Err(err).Msgf("unable to find flow")
return return
} }
@ -384,6 +385,7 @@ func (c *datagramConn) handleICMPPacket(datagram *ICMPDatagram) {
case c.icmpDatagramChan <- datagram: case c.icmpDatagramChan <- datagram:
default: default:
// If the ICMP datagram channel is full, drop any additional incoming. // If the ICMP datagram channel is full, drop any additional incoming.
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFull)
c.logger.Warn().Msg("failed to write icmp packet to origin: dropped") c.logger.Warn().Msg("failed to write icmp packet to origin: dropped")
} }
} }
@ -413,6 +415,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) {
defer c.icmpDecoderPool.Put(cachedDecoder) defer c.icmpDecoderPool.Put(cachedDecoder)
decoder, ok := cachedDecoder.(*packet.ICMPDecoder) decoder, ok := cachedDecoder.(*packet.ICMPDecoder)
if !ok { if !ok {
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
c.logger.Error().Msg("Could not get ICMPDecoder from the pool. Dropping packet") c.logger.Error().Msg("Could not get ICMPDecoder from the pool. Dropping packet")
return return
} }
@ -420,6 +423,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) {
icmp, err := decoder.Decode(rawPacket) icmp, err := decoder.Decode(rawPacket)
if err != nil { if err != nil {
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
c.logger.Err(err).Msgf("unable to marshal icmp packet") c.logger.Err(err).Msgf("unable to marshal icmp packet")
return return
} }
@ -427,6 +431,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) {
// If the ICMP packet's TTL is expired, we won't send it to the origin and immediately return a TTL Exceeded Message // If the ICMP packet's TTL is expired, we won't send it to the origin and immediately return a TTL Exceeded Message
if icmp.TTL <= 1 { if icmp.TTL <= 1 {
if err := c.SendICMPTTLExceed(icmp, rawPacket); err != nil { if err := c.SendICMPTTLExceed(icmp, rawPacket); err != nil {
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
c.logger.Err(err).Msg("failed to return ICMP TTL exceed error") c.logger.Err(err).Msg("failed to return ICMP TTL exceed error")
} }
return return
@ -438,6 +443,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) {
// connection context which will have no tracing information available. // connection context which will have no tracing information available.
err = c.icmpRouter.Request(c.conn.Context(), icmp, newPacketResponder(c, c.index)) err = c.icmpRouter.Request(c.conn.Context(), icmp, newPacketResponder(c, c.index))
if err != nil { if err != nil {
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
c.logger.Err(err). c.logger.Err(err).
Str(logSrcKey, icmp.Src.String()). Str(logSrcKey, icmp.Src.String()).
Str(logDstKey, icmp.Dst.String()). Str(logDstKey, icmp.Dst.String()).

View File

@ -199,12 +199,12 @@ func (s *session) readLoop() {
return return
} }
if n < 0 { if n < 0 {
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedReadFailed)
s.log.Warn().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was negative and was dropped") s.log.Warn().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was negative and was dropped")
continue continue
} }
if n > maxDatagramPayloadLen { if n > maxDatagramPayloadLen {
connectionIndex := s.ConnectionID() s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedReadTooLarge)
s.metrics.PayloadTooLarge(connectionIndex)
s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped") s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped")
continue continue
} }
@ -227,6 +227,7 @@ func (s *session) Write(payload []byte) {
select { select {
case s.writeChan <- payload: case s.writeChan <- payload:
default: default:
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteFull)
s.log.Error().Msg("failed to write flow payload to origin: dropped") s.log.Error().Msg("failed to write flow payload to origin: dropped")
} }
} }
@ -244,6 +245,7 @@ func (s *session) writeLoop() {
if err != nil { if err != nil {
// Check if this is a write deadline exceeded to the connection // Check if this is a write deadline exceeded to the connection
if errors.Is(err, os.ErrDeadlineExceeded) { if errors.Is(err, os.ErrDeadlineExceeded) {
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteDeadlineExceeded)
s.log.Warn().Err(err).Msg("flow (write) deadline exceeded: dropping packet") s.log.Warn().Err(err).Msg("flow (write) deadline exceeded: dropping packet")
continue continue
} }
@ -257,6 +259,7 @@ func (s *session) writeLoop() {
} }
// Write must return a non-nil error if it returns n < len(p). https://pkg.go.dev/io#Writer // Write must return a non-nil error if it returns n < len(p). https://pkg.go.dev/io#Writer
if n < len(payload) { if n < len(payload) {
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteFailed)
s.log.Err(io.ErrShortWrite).Msg("failed to write the full flow payload to origin") s.log.Err(io.ErrShortWrite).Msg("failed to write the full flow payload to origin")
continue continue
} }
@ -330,6 +333,7 @@ func (s *session) waitForCloseCondition(ctx context.Context, closeAfterIdle time
case reason := <-s.errChan: case reason := <-s.errChan:
// Any error returned here is from the read or write loops indicating that it can no longer process datagrams // Any error returned here is from the read or write loops indicating that it can no longer process datagrams
// and as such the session needs to close. // and as such the session needs to close.
s.metrics.FailedFlow(s.ConnectionID())
return reason return reason
case <-checkIdleTimer.C: case <-checkIdleTimer.C:
// The check idle timer will only return after an idle period since the last active // The check idle timer will only return after an idle period since the last active