diff --git a/quic/v3/metrics.go b/quic/v3/metrics.go index e51e3683..42135268 100644 --- a/quic/v3/metrics.go +++ b/quic/v3/metrics.go @@ -9,28 +9,59 @@ import ( ) const ( - namespace = "cloudflared" - subsystem = "udp" + namespace = "cloudflared" + subsystem_udp = "udp" + subsystem_icmp = "icmp" commandMetricLabel = "command" + reasonMetricLabel = "reason" ) +type DroppedReason int + +const ( + DroppedWriteFailed DroppedReason = iota + DroppedWriteDeadlineExceeded + DroppedWriteFull + DroppedWriteFlowUnknown + DroppedReadFailed + // Origin payloads that are too large to proxy. + DroppedReadTooLarge +) + +var droppedReason = map[DroppedReason]string{ + DroppedWriteFailed: "write_failed", + DroppedWriteDeadlineExceeded: "write_deadline_exceeded", + DroppedWriteFull: "write_full", + DroppedWriteFlowUnknown: "write_flow_unknown", + DroppedReadFailed: "read_failed", + DroppedReadTooLarge: "read_too_large", +} + +func (dr DroppedReason) String() string { + return droppedReason[dr] +} + type Metrics interface { IncrementFlows(connIndex uint8) DecrementFlows(connIndex uint8) - PayloadTooLarge(connIndex uint8) + FailedFlow(connIndex uint8) RetryFlowResponse(connIndex uint8) MigrateFlow(connIndex uint8) UnsupportedRemoteCommand(connIndex uint8, command string) + DroppedUDPDatagram(connIndex uint8, reason DroppedReason) + DroppedICMPPackets(connIndex uint8, reason DroppedReason) } type metrics struct { activeUDPFlows *prometheus.GaugeVec totalUDPFlows *prometheus.CounterVec - payloadTooLarge *prometheus.CounterVec retryFlowResponses *prometheus.CounterVec migratedFlows *prometheus.CounterVec unsupportedRemoteCommands *prometheus.CounterVec + droppedUDPDatagrams *prometheus.CounterVec + droppedICMPPackets *prometheus.CounterVec + failedFlows *prometheus.CounterVec } func (m *metrics) IncrementFlows(connIndex uint8) { @@ -42,8 +73,8 @@ func (m *metrics) DecrementFlows(connIndex uint8) { m.activeUDPFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Dec() } -func (m *metrics) PayloadTooLarge(connIndex uint8) { - m.payloadTooLarge.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc() +func (m *metrics) FailedFlow(connIndex uint8) { + m.failedFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc() } func (m *metrics) RetryFlowResponse(connIndex uint8) { @@ -58,52 +89,74 @@ func (m *metrics) UnsupportedRemoteCommand(connIndex uint8, command string) { m.unsupportedRemoteCommands.WithLabelValues(fmt.Sprintf("%d", connIndex), command).Inc() } +func (m *metrics) DroppedUDPDatagram(connIndex uint8, reason DroppedReason) { + m.droppedUDPDatagrams.WithLabelValues(fmt.Sprintf("%d", connIndex), reason.String()).Inc() +} + +func (m *metrics) DroppedICMPPackets(connIndex uint8, reason DroppedReason) { + m.droppedICMPPackets.WithLabelValues(fmt.Sprintf("%d", connIndex), reason.String()).Inc() +} + func NewMetrics(registerer prometheus.Registerer) Metrics { m := &metrics{ activeUDPFlows: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, - Subsystem: subsystem, + Subsystem: subsystem_udp, Name: "active_flows", Help: "Concurrent count of UDP flows that are being proxied to any origin", }, []string{quic.ConnectionIndexMetricLabel}), totalUDPFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, - Subsystem: subsystem, + Subsystem: subsystem_udp, Name: "total_flows", Help: "Total count of UDP flows that have been proxied to any origin", }, []string{quic.ConnectionIndexMetricLabel}), - payloadTooLarge: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter + failedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, - Subsystem: subsystem, - Name: "payload_too_large", - Help: "Total count of UDP flows that have had origin payloads that are too large to proxy", + Subsystem: subsystem_udp, + Name: "failed_flows", + Help: "Total count of flows that errored and closed", }, []string{quic.ConnectionIndexMetricLabel}), retryFlowResponses: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, - Subsystem: subsystem, + Subsystem: subsystem_udp, Name: "retry_flow_responses", Help: "Total count of UDP flows that have had to send their registration response more than once", }, []string{quic.ConnectionIndexMetricLabel}), migratedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, - Subsystem: subsystem, + Subsystem: subsystem_udp, Name: "migrated_flows", Help: "Total count of UDP flows have been migrated across local connections", }, []string{quic.ConnectionIndexMetricLabel}), - unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{ + unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, - Subsystem: subsystem, + Subsystem: subsystem_udp, Name: "unsupported_remote_command_total", - Help: "Total count of unsupported remote RPC commands for the ", + Help: "Total count of unsupported remote RPC commands called", }, []string{quic.ConnectionIndexMetricLabel, commandMetricLabel}), + droppedUDPDatagrams: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter + Namespace: namespace, + Subsystem: subsystem_udp, + Name: "dropped_datagrams", + Help: "Total count of UDP dropped datagrams", + }, []string{quic.ConnectionIndexMetricLabel, reasonMetricLabel}), + droppedICMPPackets: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter + Namespace: namespace, + Subsystem: subsystem_icmp, + Name: "dropped_packets", + Help: "Total count of ICMP dropped datagrams", + }, []string{quic.ConnectionIndexMetricLabel, reasonMetricLabel}), } registerer.MustRegister( m.activeUDPFlows, m.totalUDPFlows, - m.payloadTooLarge, + m.failedFlows, m.retryFlowResponses, m.migratedFlows, m.unsupportedRemoteCommands, + m.droppedUDPDatagrams, + m.droppedICMPPackets, ) return m } diff --git a/quic/v3/metrics_test.go b/quic/v3/metrics_test.go index 80e815e5..6296ae07 100644 --- a/quic/v3/metrics_test.go +++ b/quic/v3/metrics_test.go @@ -1,10 +1,15 @@ package v3_test +import v3 "github.com/cloudflare/cloudflared/quic/v3" + type noopMetrics struct{} -func (noopMetrics) IncrementFlows(connIndex uint8) {} -func (noopMetrics) DecrementFlows(connIndex uint8) {} -func (noopMetrics) PayloadTooLarge(connIndex uint8) {} -func (noopMetrics) RetryFlowResponse(connIndex uint8) {} -func (noopMetrics) MigrateFlow(connIndex uint8) {} -func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {} +func (noopMetrics) IncrementFlows(connIndex uint8) {} +func (noopMetrics) DecrementFlows(connIndex uint8) {} +func (noopMetrics) FailedFlow(connIndex uint8) {} +func (noopMetrics) PayloadTooLarge(connIndex uint8) {} +func (noopMetrics) RetryFlowResponse(connIndex uint8) {} +func (noopMetrics) MigrateFlow(connIndex uint8) {} +func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {} +func (noopMetrics) DroppedUDPDatagram(connIndex uint8, reason v3.DroppedReason) {} +func (noopMetrics) DroppedICMPPackets(connIndex uint8, reason v3.DroppedReason) {} diff --git a/quic/v3/muxer.go b/quic/v3/muxer.go index 0d0411d1..44ba15d6 100644 --- a/quic/v3/muxer.go +++ b/quic/v3/muxer.go @@ -368,6 +368,7 @@ func (c *datagramConn) handleSessionRegistrationRateLimited(datagram *UDPSession func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) { s, err := c.sessionManager.GetSession(datagram.RequestID) if err != nil { + c.metrics.DroppedUDPDatagram(c.index, DroppedWriteFlowUnknown) logger.Err(err).Msgf("unable to find flow") return } @@ -384,6 +385,7 @@ func (c *datagramConn) handleICMPPacket(datagram *ICMPDatagram) { case c.icmpDatagramChan <- datagram: default: // If the ICMP datagram channel is full, drop any additional incoming. + c.metrics.DroppedICMPPackets(c.index, DroppedWriteFull) c.logger.Warn().Msg("failed to write icmp packet to origin: dropped") } } @@ -413,6 +415,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) { defer c.icmpDecoderPool.Put(cachedDecoder) decoder, ok := cachedDecoder.(*packet.ICMPDecoder) if !ok { + c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed) c.logger.Error().Msg("Could not get ICMPDecoder from the pool. Dropping packet") return } @@ -420,6 +423,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) { icmp, err := decoder.Decode(rawPacket) if err != nil { + c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed) c.logger.Err(err).Msgf("unable to marshal icmp packet") return } @@ -427,6 +431,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) { // If the ICMP packet's TTL is expired, we won't send it to the origin and immediately return a TTL Exceeded Message if icmp.TTL <= 1 { if err := c.SendICMPTTLExceed(icmp, rawPacket); err != nil { + c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed) c.logger.Err(err).Msg("failed to return ICMP TTL exceed error") } return @@ -438,6 +443,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) { // connection context which will have no tracing information available. err = c.icmpRouter.Request(c.conn.Context(), icmp, newPacketResponder(c, c.index)) if err != nil { + c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed) c.logger.Err(err). Str(logSrcKey, icmp.Src.String()). Str(logDstKey, icmp.Dst.String()). diff --git a/quic/v3/session.go b/quic/v3/session.go index f37839fe..6a9b093f 100644 --- a/quic/v3/session.go +++ b/quic/v3/session.go @@ -199,12 +199,12 @@ func (s *session) readLoop() { return } if n < 0 { + s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedReadFailed) s.log.Warn().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was negative and was dropped") continue } if n > maxDatagramPayloadLen { - connectionIndex := s.ConnectionID() - s.metrics.PayloadTooLarge(connectionIndex) + s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedReadTooLarge) s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped") continue } @@ -227,6 +227,7 @@ func (s *session) Write(payload []byte) { select { case s.writeChan <- payload: default: + s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteFull) s.log.Error().Msg("failed to write flow payload to origin: dropped") } } @@ -244,6 +245,7 @@ func (s *session) writeLoop() { if err != nil { // Check if this is a write deadline exceeded to the connection if errors.Is(err, os.ErrDeadlineExceeded) { + s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteDeadlineExceeded) s.log.Warn().Err(err).Msg("flow (write) deadline exceeded: dropping packet") continue } @@ -257,6 +259,7 @@ func (s *session) writeLoop() { } // Write must return a non-nil error if it returns n < len(p). https://pkg.go.dev/io#Writer if n < len(payload) { + s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteFailed) s.log.Err(io.ErrShortWrite).Msg("failed to write the full flow payload to origin") continue } @@ -330,6 +333,7 @@ func (s *session) waitForCloseCondition(ctx context.Context, closeAfterIdle time case reason := <-s.errChan: // Any error returned here is from the read or write loops indicating that it can no longer process datagrams // and as such the session needs to close. + s.metrics.FailedFlow(s.ConnectionID()) return reason case <-checkIdleTimer.C: // The check idle timer will only return after an idle period since the last active