From 30197e7dfab87e1c16b1a01c09180610c15f078a Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Tue, 28 May 2024 14:14:25 -0700 Subject: [PATCH] TUN-8422: Add metrics for capnp method calls Adds new suite of metrics to capture the following for capnp rpcs operations: - Method calls - Method call failures - Method call latencies Each of the operations is labeled by the handler that serves the method and the method of operation invoked. Additionally, each of these are split between if the operation was called by a client or served. --- connection/metrics.go | 11 -- tunnelrpc/metrics/metrics.go | 143 ++++++++++++++++++++++++ tunnelrpc/pogs/configuration_manager.go | 6 + tunnelrpc/pogs/registration_server.go | 25 ++++- tunnelrpc/pogs/session_manager.go | 9 ++ tunnelrpc/quic/cloudflared_client.go | 31 ++++- tunnelrpc/quic/session_client.go | 21 +++- tunnelrpc/registration_client.go | 29 ++++- 8 files changed, 251 insertions(+), 24 deletions(-) create mode 100644 tunnelrpc/metrics/metrics.go diff --git a/connection/metrics.go b/connection/metrics.go index 385708db..c80bf46a 100644 --- a/connection/metrics.go +++ b/connection/metrics.go @@ -43,7 +43,6 @@ type localConfigMetrics struct { } type tunnelMetrics struct { - timerRetries prometheus.Gauge serverLocations *prometheus.GaugeVec // locationLock is a mutex for oldServerLocations locationLock sync.Mutex @@ -351,15 +350,6 @@ func initTunnelMetrics() *tunnelMetrics { ) prometheus.MustRegister(maxConcurrentRequestsPerTunnel) - timerRetries := prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: MetricsNamespace, - Subsystem: TunnelSubsystem, - Name: "timer_retries", - Help: "Unacknowledged heart beats count", - }) - prometheus.MustRegister(timerRetries) - serverLocations := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: MetricsNamespace, @@ -416,7 +406,6 @@ func initTunnelMetrics() *tunnelMetrics { prometheus.MustRegister(registerSuccess) return &tunnelMetrics{ - timerRetries: timerRetries, serverLocations: serverLocations, oldServerLocations: make(map[string]string), muxerMetrics: newMuxerMetrics(), diff --git a/tunnelrpc/metrics/metrics.go b/tunnelrpc/metrics/metrics.go new file mode 100644 index 00000000..72dbb8aa --- /dev/null +++ b/tunnelrpc/metrics/metrics.go @@ -0,0 +1,143 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const ( + metricsNamespace = "cloudflared" + rpcSubsystem = "rpc" +) + +// CloudflaredServer operation labels +// CloudflaredServer is an extension of SessionManager with additional methods, but it's helpful +// to visualize it separately in the metrics since they are technically different client/servers. +const ( + Cloudflared = "cloudflared" +) + +// ConfigurationManager operation labels +const ( + ConfigurationManager = "config" + + OperationUpdateConfiguration = "update_configuration" +) + +// SessionManager operation labels +const ( + SessionManager = "session" + + OperationRegisterUdpSession = "register_udp_session" + OperationUnregisterUdpSession = "unregister_udp_session" +) + +// RegistrationServer operation labels +const ( + Registration = "registration" + + OperationRegisterConnection = "register_connection" + OperationUnregisterConnection = "unregister_connection" + OperationUpdateLocalConfiguration = "update_local_configuration" +) + +type rpcMetrics struct { + serverOperations *prometheus.CounterVec + serverFailures *prometheus.CounterVec + serverOperationsLatency *prometheus.HistogramVec + + ClientOperations *prometheus.CounterVec + ClientFailures *prometheus.CounterVec + ClientOperationsLatency *prometheus.HistogramVec +} + +var CapnpMetrics *rpcMetrics = &rpcMetrics{ + serverOperations: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "server_operations", + Help: "Number of rpc methods by handler served", + }, + []string{"handler", "method"}, + ), + serverFailures: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "server_failures", + Help: "Number of rpc methods failures by handler served", + }, + []string{"handler", "method"}, + ), + serverOperationsLatency: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "server_latency_secs", + Help: "Latency of rpc methods by handler served", + // Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds: + // 50ms, 150ms, 450ms, 1350ms, 4050ms + Buckets: prometheus.ExponentialBuckets(0.05, 3, 5), + }, + []string{"handler", "method"}, + ), + ClientOperations: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "client_operations", + Help: "Number of rpc methods by handler requested", + }, + []string{"handler", "method"}, + ), + ClientFailures: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "client_failures", + Help: "Number of rpc method failures by handler requested", + }, + []string{"handler", "method"}, + ), + ClientOperationsLatency: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "client_latency_secs", + Help: "Latency of rpc methods by handler requested", + // Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds: + // 50ms, 150ms, 450ms, 1350ms, 4050ms + Buckets: prometheus.ExponentialBuckets(0.05, 3, 5), + }, + []string{"handler", "method"}, + ), +} + +func ObserveServerHandler(inner func() error, handler, method string) error { + defer CapnpMetrics.serverOperations.WithLabelValues(handler, method).Inc() + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) { + CapnpMetrics.serverOperationsLatency.WithLabelValues(handler, method).Observe(s) + })) + defer timer.ObserveDuration() + + err := inner() + if err != nil { + CapnpMetrics.serverFailures.WithLabelValues(handler, method).Inc() + } + return err +} + +func NewClientOperationLatencyObserver(server string, method string) *prometheus.Timer { + return prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) { + CapnpMetrics.ClientOperationsLatency.WithLabelValues(server, method).Observe(s) + })) +} + +func init() { + prometheus.MustRegister(CapnpMetrics.serverOperations) + prometheus.MustRegister(CapnpMetrics.serverFailures) + prometheus.MustRegister(CapnpMetrics.serverOperationsLatency) + prometheus.MustRegister(CapnpMetrics.ClientOperations) + prometheus.MustRegister(CapnpMetrics.ClientFailures) + prometheus.MustRegister(CapnpMetrics.ClientOperationsLatency) +} diff --git a/tunnelrpc/pogs/configuration_manager.go b/tunnelrpc/pogs/configuration_manager.go index 5b99cbe7..4daae802 100644 --- a/tunnelrpc/pogs/configuration_manager.go +++ b/tunnelrpc/pogs/configuration_manager.go @@ -8,10 +8,12 @@ import ( "zombiezen.com/go/capnproto2/rpc" "zombiezen.com/go/capnproto2/server" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/proto" ) type ConfigurationManager interface { + // UpdateConfiguration is the call provided to cloudflared to load the latest remote configuration. UpdateConfiguration(ctx context.Context, version int32, config []byte) *UpdateConfigurationResponse } @@ -24,6 +26,10 @@ func ConfigurationManager_ServerToClient(c ConfigurationManager) proto.Configura } func (i ConfigurationManager_PogsImpl) UpdateConfiguration(p proto.ConfigurationManager_updateConfiguration) error { + return metrics.ObserveServerHandler(func() error { return i.updateConfiguration(p) }, metrics.ConfigurationManager, metrics.OperationUpdateConfiguration) +} + +func (i ConfigurationManager_PogsImpl) updateConfiguration(p proto.ConfigurationManager_updateConfiguration) error { server.Ack(p.Options) version := p.Params.Version() diff --git a/tunnelrpc/pogs/registration_server.go b/tunnelrpc/pogs/registration_server.go index e0f6b1e8..9908c113 100644 --- a/tunnelrpc/pogs/registration_server.go +++ b/tunnelrpc/pogs/registration_server.go @@ -12,12 +12,18 @@ import ( "zombiezen.com/go/capnproto2/rpc" "zombiezen.com/go/capnproto2/server" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/proto" ) type RegistrationServer interface { + // RegisterConnection is the call typically handled by the edge to initiate and authenticate a new connection + // for cloudflared. RegisterConnection(ctx context.Context, auth TunnelAuth, tunnelID uuid.UUID, connIndex byte, options *ConnectionOptions) (*ConnectionDetails, error) + // UnregisterConnection is the call typically handled by the edge to close an existing connection for cloudflared. UnregisterConnection(ctx context.Context) + // UpdateLocalConfiguration is the call typically handled by the edge for cloudflared to provide the current + // configuration it is operating with. UpdateLocalConfiguration(ctx context.Context, config []byte) error } @@ -30,6 +36,10 @@ func RegistrationServer_ServerToClient(s RegistrationServer) proto.RegistrationS } func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServer_registerConnection) error { + return metrics.ObserveServerHandler(func() error { return i.registerConnection(p) }, metrics.Registration, metrics.OperationRegisterConnection) +} + +func (i RegistrationServer_PogsImpl) registerConnection(p proto.RegistrationServer_registerConnection) error { server.Ack(p.Options) auth, err := p.Params.Auth() @@ -83,13 +93,18 @@ func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServ } func (i RegistrationServer_PogsImpl) UnregisterConnection(p proto.RegistrationServer_unregisterConnection) error { - server.Ack(p.Options) - - i.impl.UnregisterConnection(p.Ctx) - return nil + return metrics.ObserveServerHandler(func() error { + server.Ack(p.Options) + i.impl.UnregisterConnection(p.Ctx) + return nil // No metrics will be reported for failure as this method has no return value + }, metrics.Registration, metrics.OperationUnregisterConnection) } -func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error { +func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(p proto.RegistrationServer_updateLocalConfiguration) error { + return metrics.ObserveServerHandler(func() error { return i.updateLocalConfiguration(p) }, metrics.Registration, metrics.OperationUpdateLocalConfiguration) +} + +func (i RegistrationServer_PogsImpl) updateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error { server.Ack(c.Options) configBytes, err := c.Params.Config() diff --git a/tunnelrpc/pogs/session_manager.go b/tunnelrpc/pogs/session_manager.go index 4abea2bf..b5552983 100644 --- a/tunnelrpc/pogs/session_manager.go +++ b/tunnelrpc/pogs/session_manager.go @@ -11,6 +11,7 @@ import ( "zombiezen.com/go/capnproto2/rpc" "zombiezen.com/go/capnproto2/server" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/proto" ) @@ -32,6 +33,10 @@ func SessionManager_ServerToClient(s SessionManager) proto.SessionManager { } func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_registerUdpSession) error { + return metrics.ObserveServerHandler(func() error { return i.registerUdpSession(p) }, metrics.SessionManager, metrics.OperationRegisterUdpSession) +} + +func (i SessionManager_PogsImpl) registerUdpSession(p proto.SessionManager_registerUdpSession) error { server.Ack(p.Options) sessionIDRaw, err := p.Params.SessionId() @@ -78,6 +83,10 @@ func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_regis } func (i SessionManager_PogsImpl) UnregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error { + return metrics.ObserveServerHandler(func() error { return i.unregisterUdpSession(p) }, metrics.SessionManager, metrics.OperationUnregisterUdpSession) +} + +func (i SessionManager_PogsImpl) unregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error { server.Ack(p.Options) sessionIDRaw, err := p.Params.SessionId() diff --git a/tunnelrpc/quic/cloudflared_client.go b/tunnelrpc/quic/cloudflared_client.go index a5397d17..7cefd388 100644 --- a/tunnelrpc/quic/cloudflared_client.go +++ b/tunnelrpc/quic/cloudflared_client.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "github.com/cloudflare/cloudflared/tunnelrpc" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -43,19 +44,43 @@ func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, reques func (c *CloudflaredClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationRegisterUdpSession) + defer timer.ObserveDuration() + + resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc() + } + return resp, err } func (c *CloudflaredClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.UnregisterUdpSession(ctx, sessionID, message) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUnregisterUdpSession) + defer timer.ObserveDuration() + + err := c.client.UnregisterUdpSession(ctx, sessionID, message) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc() + } + return err } func (c *CloudflaredClient) UpdateConfiguration(ctx context.Context, version int32, config []byte) (*pogs.UpdateConfigurationResponse, error) { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.UpdateConfiguration(ctx, version, config) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUpdateConfiguration) + defer timer.ObserveDuration() + + resp, err := c.client.UpdateConfiguration(ctx, version, config) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc() + } + return resp, err } func (c *CloudflaredClient) Close() { diff --git a/tunnelrpc/quic/session_client.go b/tunnelrpc/quic/session_client.go index a8641e43..05c49910 100644 --- a/tunnelrpc/quic/session_client.go +++ b/tunnelrpc/quic/session_client.go @@ -11,6 +11,7 @@ import ( "zombiezen.com/go/capnproto2/rpc" "github.com/cloudflare/cloudflared/tunnelrpc" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -41,13 +42,29 @@ func NewSessionClient(ctx context.Context, stream io.ReadWriteCloser, requestTim func (c *SessionClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationRegisterUdpSession) + defer timer.ObserveDuration() + + resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc() + } + return resp, err } func (c *SessionClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.UnregisterUdpSession(ctx, sessionID, message) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationUnregisterUdpSession) + defer timer.ObserveDuration() + + err := c.client.UnregisterUdpSession(ctx, sessionID, message) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc() + } + return err } func (c *SessionClient) Close() { diff --git a/tunnelrpc/registration_client.go b/tunnelrpc/registration_client.go index 96aef963..e27ad610 100644 --- a/tunnelrpc/registration_client.go +++ b/tunnelrpc/registration_client.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "zombiezen.com/go/capnproto2/rpc" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -53,19 +54,41 @@ func (r *registrationClient) RegisterConnection( ) (*pogs.ConnectionDetails, error) { ctx, cancel := context.WithTimeout(ctx, r.requestTimeout) defer cancel() - return r.client.RegisterConnection(ctx, auth, tunnelID, connIndex, options) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationRegisterConnection) + defer timer.ObserveDuration() + + conn, err := r.client.RegisterConnection(ctx, auth, tunnelID, connIndex, options) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc() + } + return conn, err } func (r *registrationClient) SendLocalConfiguration(ctx context.Context, config []byte) error { ctx, cancel := context.WithTimeout(ctx, r.requestTimeout) defer cancel() - return r.client.SendLocalConfiguration(ctx, config) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUpdateLocalConfiguration) + defer timer.ObserveDuration() + + err := r.client.SendLocalConfiguration(ctx, config) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc() + } + return err } func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) { ctx, cancel := context.WithTimeout(ctx, gracePeriod) defer cancel() - _ = r.client.UnregisterConnection(ctx) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUnregisterConnection) + defer timer.ObserveDuration() + err := r.client.UnregisterConnection(ctx) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc() + } } func (r *registrationClient) Close() {