TUN-9322: Add metric for unsupported RPC commands for datagram v3

Additionally adds the connection index as a label on the datagram v3-specific tunnel metrics.

Closes TUN-9322
Devin Carr 2025-05-13 16:11:09 +00:00
parent ce27840573
commit 02705c44b2
10 changed files with 133 additions and 96 deletions
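
For orientation, here is a minimal sketch of how the new counter is meant to be driven and observed. It assumes the quic/v3 import path and the NewMetrics/UnsupportedRemoteCommand signatures introduced below; the standalone registry wiring is purely illustrative and not part of this commit.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	v3 "github.com/cloudflare/cloudflared/quic/v3"
)

func main() {
	// Illustrative registry; cloudflared registers against its own registerer.
	registry := prometheus.NewRegistry()
	metrics := v3.NewMetrics(registry)

	// A datagram v3 connection that receives a legacy UDP session RPC records
	// it with the connection index and the rejected command name as labels.
	metrics.UnsupportedRemoteCommand(0, "register_udp_session")

	// The series surfaces as cloudflared_udp_unsupported_remote_command_total.
	families, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName())
	}
}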

View File

@ -2,11 +2,11 @@ package connection
import (
"context"
"fmt"
"net"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/quic-go/quic-go"
"github.com/rs/zerolog"
@ -16,10 +16,17 @@ import (
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
var (
ErrUnsupportedRPCUDPRegistration = errors.New("datagram v3 does not support RegisterUdpSession RPC")
ErrUnsupportedRPCUDPUnregistration = errors.New("datagram v3 does not support UnregisterUdpSession RPC")
)
type datagramV3Connection struct {
conn quic.Connection
conn quic.Connection
index uint8
// datagramMuxer mux/demux datagrams from quic connection
datagramMuxer cfdquic.DatagramConn
metrics cfdquic.Metrics
logger *zerolog.Logger
}
@ -40,7 +47,9 @@ func NewDatagramV3Connection(ctx context.Context,
return &datagramV3Connection{
conn,
index,
datagramMuxer,
metrics,
logger,
}
}
@ -50,9 +59,11 @@ func (d *datagramV3Connection) Serve(ctx context.Context) error {
}
func (d *datagramV3Connection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
return nil, fmt.Errorf("datagram v3 does not support RegisterUdpSession RPC")
d.metrics.UnsupportedRemoteCommand(d.index, "register_udp_session")
return nil, ErrUnsupportedRPCUDPRegistration
}
func (d *datagramV3Connection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
return fmt.Errorf("datagram v3 does not support UnregisterUdpSession RPC")
d.metrics.UnsupportedRemoteCommand(d.index, "unregister_udp_session")
return ErrUnsupportedRPCUDPUnregistration
}
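
The switch from fmt.Errorf to exported sentinel errors lets callers of these RPC handlers branch with errors.Is instead of matching message strings. Below is a hedged sketch of such a caller-side helper; the helper itself is hypothetical and not part of this commit.

package main

import (
	"errors"
	"fmt"

	"github.com/cloudflare/cloudflared/connection"
)

// isUnsupportedDatagramV3RPC is a hypothetical helper: it treats either of the
// new sentinel errors as "datagram v3 rejected a legacy UDP session RPC",
// which a caller may want to log and ignore rather than fail on.
func isUnsupportedDatagramV3RPC(err error) bool {
	return errors.Is(err, connection.ErrUnsupportedRPCUDPRegistration) ||
		errors.Is(err, connection.ErrUnsupportedRPCUDPUnregistration)
}

func main() {
	fmt.Println(isUnsupportedDatagramV3RPC(connection.ErrUnsupportedRPCUDPRegistration)) // true
}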

View File

@ -84,7 +84,7 @@ func (s *Session) waitForCloseCondition(ctx context.Context, closeAfterIdle time
// Closing dstConn cancels read so dstToTransport routine in Serve() can return
defer s.dstConn.Close()
if closeAfterIdle == 0 {
// provide deafult is caller doesn't specify one
// provide default if caller doesn't specify one
closeAfterIdle = defaultCloseIdleAfter
}

View File

@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
@ -54,22 +55,22 @@ func testSessionReturns(t *testing.T, closeBy closeMethod, closeAfterIdle time.D
closedByRemote, err := session.Serve(ctx, closeAfterIdle)
switch closeBy {
case closeByContext:
require.Equal(t, context.Canceled, err)
require.False(t, closedByRemote)
assert.Equal(t, context.Canceled, err)
assert.False(t, closedByRemote)
case closeByCallingClose:
require.Equal(t, localCloseReason, err)
require.Equal(t, localCloseReason.byRemote, closedByRemote)
assert.Equal(t, localCloseReason, err)
assert.Equal(t, localCloseReason.byRemote, closedByRemote)
case closeByTimeout:
require.Equal(t, SessionIdleErr(closeAfterIdle), err)
require.False(t, closedByRemote)
assert.Equal(t, SessionIdleErr(closeAfterIdle), err)
assert.False(t, closedByRemote)
}
close(sessionDone)
}()
go func() {
n, err := session.transportToDst(payload)
require.NoError(t, err)
require.Equal(t, len(payload), n)
assert.NoError(t, err)
assert.Equal(t, len(payload), n)
}()
readBuffer := make([]byte, len(payload)+1)
@ -84,6 +85,8 @@ func testSessionReturns(t *testing.T, closeBy closeMethod, closeAfterIdle time.D
cancel()
case closeByCallingClose:
session.close(localCloseReason)
default:
// ignore
}
<-sessionDone
@ -128,7 +131,7 @@ func testActiveSessionNotClosed(t *testing.T, readFromDst bool, writeToDst bool)
ctx, cancel := context.WithCancel(context.Background())
errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
session.Serve(ctx, closeAfterIdle)
_, _ = session.Serve(ctx, closeAfterIdle)
if time.Now().Before(startTime.Add(activeTime)) {
return fmt.Errorf("session closed while it's still active")
}

View File

@ -11,12 +11,15 @@ import (
)
const (
namespace = "quic"
namespace = "quic"
ConnectionIndexMetricLabel = "conn_index"
frameTypeMetricLabel = "frame_type"
packetTypeMetricLabel = "packet_type"
reasonMetricLabel = "reason"
)
var (
clientConnLabels = []string{"conn_index"}
clientMetrics = struct {
clientMetrics = struct {
totalConnections prometheus.Counter
closedConnections prometheus.Counter
maxUDPPayloadSize *prometheus.GaugeVec
@ -35,7 +38,7 @@ var (
congestionState *prometheus.GaugeVec
}{
totalConnections: prometheus.NewCounter(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "total_connections",
@ -43,7 +46,7 @@ var (
},
),
closedConnections: prometheus.NewCounter(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "closed_connections",
@ -57,70 +60,70 @@ var (
Name: "max_udp_payload",
Help: "Maximum UDP payload size in bytes for a QUIC packet",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
sentFrames: prometheus.NewCounterVec(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "sent_frames",
Help: "Number of frames that have been sent through a connection",
},
append(clientConnLabels, "frame_type"),
[]string{ConnectionIndexMetricLabel, frameTypeMetricLabel},
),
sentBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "sent_bytes",
Help: "Number of bytes that have been sent through a connection",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
receivedFrames: prometheus.NewCounterVec(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "received_frames",
Help: "Number of frames that have been received through a connection",
},
append(clientConnLabels, "frame_type"),
[]string{ConnectionIndexMetricLabel, frameTypeMetricLabel},
),
receivedBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "receive_bytes",
Help: "Number of bytes that have been received through a connection",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
bufferedPackets: prometheus.NewCounterVec(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "buffered_packets",
Help: "Number of bytes that have been buffered on a connection",
},
append(clientConnLabels, "packet_type"),
[]string{ConnectionIndexMetricLabel, packetTypeMetricLabel},
),
droppedPackets: prometheus.NewCounterVec(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "dropped_packets",
Help: "Number of bytes that have been dropped on a connection",
},
append(clientConnLabels, "packet_type", "reason"),
[]string{ConnectionIndexMetricLabel, packetTypeMetricLabel, reasonMetricLabel},
),
lostPackets: prometheus.NewCounterVec(
prometheus.CounterOpts{
prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "lost_packets",
Help: "Number of packets that have been lost from a connection",
},
append(clientConnLabels, "reason"),
[]string{ConnectionIndexMetricLabel, reasonMetricLabel},
),
minRTT: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@ -129,7 +132,7 @@ var (
Name: "min_rtt",
Help: "Lowest RTT measured on a connection in millisec",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
latestRTT: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@ -138,7 +141,7 @@ var (
Name: "latest_rtt",
Help: "Latest RTT measured on a connection",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
smoothedRTT: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@ -147,7 +150,7 @@ var (
Name: "smoothed_rtt",
Help: "Calculated smoothed RTT measured on a connection in millisec",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
mtu: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@ -156,7 +159,7 @@ var (
Name: "mtu",
Help: "Current maximum transmission unit (MTU) of a connection",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
congestionWindow: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@ -165,7 +168,7 @@ var (
Name: "congestion_window",
Help: "Current congestion window size",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
congestionState: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@ -174,13 +177,13 @@ var (
Name: "congestion_state",
Help: "Current congestion control state. See https://pkg.go.dev/github.com/quic-go/quic-go@v0.45.0/logging#CongestionState for what each value maps to",
},
clientConnLabels,
[]string{ConnectionIndexMetricLabel},
),
}
registerClient = sync.Once{}
packetTooBigDropped = prometheus.NewCounter(prometheus.CounterOpts{
packetTooBigDropped = prometheus.NewCounter(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: "client",
Name: "packet_too_big_dropped",

View File

@ -2,82 +2,98 @@ package v3
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/cloudflare/cloudflared/quic"
)
const (
namespace = "cloudflared"
subsystem = "udp"
commandMetricLabel = "command"
)
type Metrics interface {
IncrementFlows()
DecrementFlows()
PayloadTooLarge()
RetryFlowResponse()
MigrateFlow()
IncrementFlows(connIndex uint8)
DecrementFlows(connIndex uint8)
PayloadTooLarge(connIndex uint8)
RetryFlowResponse(connIndex uint8)
MigrateFlow(connIndex uint8)
UnsupportedRemoteCommand(connIndex uint8, command string)
}
type metrics struct {
activeUDPFlows prometheus.Gauge
totalUDPFlows prometheus.Counter
payloadTooLarge prometheus.Counter
retryFlowResponses prometheus.Counter
migratedFlows prometheus.Counter
activeUDPFlows *prometheus.GaugeVec
totalUDPFlows *prometheus.CounterVec
payloadTooLarge *prometheus.CounterVec
retryFlowResponses *prometheus.CounterVec
migratedFlows *prometheus.CounterVec
unsupportedRemoteCommands *prometheus.CounterVec
}
func (m *metrics) IncrementFlows() {
m.totalUDPFlows.Inc()
m.activeUDPFlows.Inc()
func (m *metrics) IncrementFlows(connIndex uint8) {
m.totalUDPFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
m.activeUDPFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
}
func (m *metrics) DecrementFlows() {
m.activeUDPFlows.Dec()
func (m *metrics) DecrementFlows(connIndex uint8) {
m.activeUDPFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Dec()
}
func (m *metrics) PayloadTooLarge() {
m.payloadTooLarge.Inc()
func (m *metrics) PayloadTooLarge(connIndex uint8) {
m.payloadTooLarge.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
}
func (m *metrics) RetryFlowResponse() {
m.retryFlowResponses.Inc()
func (m *metrics) RetryFlowResponse(connIndex uint8) {
m.retryFlowResponses.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
}
func (m *metrics) MigrateFlow() {
m.migratedFlows.Inc()
func (m *metrics) MigrateFlow(connIndex uint8) {
m.migratedFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
}
func (m *metrics) UnsupportedRemoteCommand(connIndex uint8, command string) {
m.unsupportedRemoteCommands.WithLabelValues(fmt.Sprintf("%d", connIndex), command).Inc()
}
func NewMetrics(registerer prometheus.Registerer) Metrics {
m := &metrics{
activeUDPFlows: prometheus.NewGauge(prometheus.GaugeOpts{
activeUDPFlows: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "active_flows",
Help: "Concurrent count of UDP flows that are being proxied to any origin",
}),
totalUDPFlows: prometheus.NewCounter(prometheus.CounterOpts{
}, []string{quic.ConnectionIndexMetricLabel}),
totalUDPFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: subsystem,
Name: "total_flows",
Help: "Total count of UDP flows that have been proxied to any origin",
}),
payloadTooLarge: prometheus.NewCounter(prometheus.CounterOpts{
}, []string{quic.ConnectionIndexMetricLabel}),
payloadTooLarge: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: subsystem,
Name: "payload_too_large",
Help: "Total count of UDP flows that have had origin payloads that are too large to proxy",
}),
retryFlowResponses: prometheus.NewCounter(prometheus.CounterOpts{
}, []string{quic.ConnectionIndexMetricLabel}),
retryFlowResponses: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: subsystem,
Name: "retry_flow_responses",
Help: "Total count of UDP flows that have had to send their registration response more than once",
}),
migratedFlows: prometheus.NewCounter(prometheus.CounterOpts{
}, []string{quic.ConnectionIndexMetricLabel}),
migratedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
Namespace: namespace,
Subsystem: subsystem,
Name: "migrated_flows",
Help: "Total count of UDP flows have been migrated across local connections",
}),
}, []string{quic.ConnectionIndexMetricLabel}),
unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "unsupported_remote_command_total",
Help: "Total count of unsupported remote RPC commands for the ",
}, []string{quic.ConnectionIndexMetricLabel, commandMetricLabel}),
}
registerer.MustRegister(
m.activeUDPFlows,
@ -85,6 +101,7 @@ func NewMetrics(registerer prometheus.Registerer) Metrics {
m.payloadTooLarge,
m.retryFlowResponses,
m.migratedFlows,
m.unsupportedRemoteCommands,
)
return m
}

View File

@ -2,8 +2,9 @@ package v3_test
type noopMetrics struct{}
func (noopMetrics) IncrementFlows() {}
func (noopMetrics) DecrementFlows() {}
func (noopMetrics) PayloadTooLarge() {}
func (noopMetrics) RetryFlowResponse() {}
func (noopMetrics) MigrateFlow() {}
func (noopMetrics) IncrementFlows(connIndex uint8) {}
func (noopMetrics) DecrementFlows(connIndex uint8) {}
func (noopMetrics) PayloadTooLarge(connIndex uint8) {}
func (noopMetrics) RetryFlowResponse(connIndex uint8) {}
func (noopMetrics) MigrateFlow(connIndex uint8) {}
func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {}

View File

@ -264,10 +264,10 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
return
}
log = log.With().Str(logSrcKey, session.LocalAddr().String()).Logger()
c.metrics.IncrementFlows()
c.metrics.IncrementFlows(c.index)
// Make sure to eventually remove the session from the session manager when the session is closed
defer c.sessionManager.UnregisterSession(session.ID())
defer c.metrics.DecrementFlows()
defer c.metrics.DecrementFlows(c.index)
// Respond that we are able to process the new session
err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk)
@ -315,7 +315,7 @@ func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logge
// The session is already running in another routine so we want to restart the idle timeout since no proxied
// packets have come down yet.
session.ResetIdleTimer()
c.metrics.RetryFlowResponse()
c.metrics.RetryFlowResponse(c.index)
logger.Debug().Msgf("flow registration response retry")
}

View File

@ -781,12 +781,12 @@ func newICMPDatagram(pk *packet.ICMP) []byte {
// Cancel the provided context and make sure it closes with the expected cancellation error
func assertContextClosed(t *testing.T, ctx context.Context, done <-chan error, cancel context.CancelCauseFunc) {
cancel(expectedContextCanceled)
cancel(errExpectedContextCanceled)
err := <-done
if !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
if !errors.Is(context.Cause(ctx), expectedContextCanceled) {
if !errors.Is(context.Cause(ctx), errExpectedContextCanceled) {
t.Fatal(err)
}
}

View File

@ -27,11 +27,11 @@ const (
)
// SessionCloseErr indicates that the session's Close method was called.
var SessionCloseErr error = errors.New("flow was closed directly")
var SessionCloseErr error = errors.New("flow was closed directly") //nolint:errname
// SessionIdleErr is returned when the session was closed because there was no communication
// in either direction over the session for the timeout period.
type SessionIdleErr struct {
type SessionIdleErr struct { //nolint:errname
timeout time.Duration
}
@ -149,7 +149,8 @@ func (s *session) Migrate(eyeball DatagramConn, ctx context.Context, logger *zer
}
// The session is already running so we want to restart the idle timeout since no proxied packets have come down yet.
s.markActive()
s.metrics.MigrateFlow()
connectionIndex := eyeball.ID()
s.metrics.MigrateFlow(connectionIndex)
}
func (s *session) Serve(ctx context.Context) error {
@ -160,7 +161,7 @@ func (s *session) Serve(ctx context.Context) error {
// To perform a zero copy write when passing the datagram to the connection, we prepare the buffer with
// the required datagram header information. We can reuse this buffer for this session since the header is the
// same for the each read.
MarshalPayloadHeaderTo(s.id, readBuffer[:DatagramPayloadHeaderLen])
_ = MarshalPayloadHeaderTo(s.id, readBuffer[:DatagramPayloadHeaderLen])
for {
// Read from the origin UDP socket
n, err := s.origin.Read(readBuffer[DatagramPayloadHeaderLen:])
@ -177,7 +178,8 @@ func (s *session) Serve(ctx context.Context) error {
continue
}
if n > maxDatagramPayloadLen {
s.metrics.PayloadTooLarge()
connectionIndex := s.ConnectionID()
s.metrics.PayloadTooLarge(connectionIndex)
s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped")
continue
}
@ -241,7 +243,7 @@ func (s *session) waitForCloseCondition(ctx context.Context, closeAfterIdle time
// Closing the session at the end cancels read so Serve() can return
defer s.Close()
if closeAfterIdle == 0 {
// provide deafult is caller doesn't specify one
// Provide a default if the caller doesn't specify one
closeAfterIdle = defaultCloseIdleAfter
}

View File

@ -17,7 +17,7 @@ import (
)
var (
expectedContextCanceled = errors.New("expected context canceled")
errExpectedContextCanceled = errors.New("expected context canceled")
testOriginAddr = net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:0"))
testLocalAddr = net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:0"))
@ -40,7 +40,7 @@ func testSessionWrite(t *testing.T, payload []byte) {
serverRead := make(chan []byte, 1)
go func() {
read := make([]byte, 1500)
server.Read(read[:])
_, _ = server.Read(read[:])
serverRead <- read
}()
// Create session and write to origin
@ -110,12 +110,12 @@ func testSessionServe_Origin(t *testing.T, payload []byte) {
case data := <-eyeball.recvData:
// check received data matches provided from origin
expectedData := makePayload(1500)
v3.MarshalPayloadHeaderTo(testRequestID, expectedData[:])
_ = v3.MarshalPayloadHeaderTo(testRequestID, expectedData[:])
copy(expectedData[17:], payload)
if !slices.Equal(expectedData[:v3.DatagramPayloadHeaderLen+len(payload)], data) {
t.Fatal("expected datagram did not equal expected")
}
cancel(expectedContextCanceled)
cancel(errExpectedContextCanceled)
case err := <-ctx.Done():
// we expect the payload to return before the context to cancel on the session
t.Fatal(err)
@ -125,7 +125,7 @@ func testSessionServe_Origin(t *testing.T, payload []byte) {
if !errors.Is(err, context.Canceled) {
t.Fatal(err)
}
if !errors.Is(context.Cause(ctx), expectedContextCanceled) {
if !errors.Is(context.Cause(ctx), errExpectedContextCanceled) {
t.Fatal(err)
}
}
@ -198,7 +198,7 @@ func TestSessionServe_Migrate(t *testing.T) {
// Origin sends data
payload2 := []byte{0xde}
pipe1.Write(payload2)
_, _ = pipe1.Write(payload2)
// Expect write to eyeball2
data := <-eyeball2.recvData
@ -249,13 +249,13 @@ func TestSessionServe_Migrate_CloseContext2(t *testing.T) {
t.Fatalf("expected session to still be running")
default:
}
if context.Cause(eyeball1Ctx) != contextCancelErr {
if !errors.Is(context.Cause(eyeball1Ctx), contextCancelErr) {
t.Fatalf("first eyeball context should be cancelled manually: %+v", context.Cause(eyeball1Ctx))
}
// Origin sends data
payload2 := []byte{0xde}
pipe1.Write(payload2)
_, _ = pipe1.Write(payload2)
// Expect write to eyeball2
data := <-eyeball2.recvData