diff --git a/connection/metrics.go b/connection/metrics.go index 385708db..c80bf46a 100644 --- a/connection/metrics.go +++ b/connection/metrics.go @@ -43,7 +43,6 @@ type localConfigMetrics struct { } type tunnelMetrics struct { - timerRetries prometheus.Gauge serverLocations *prometheus.GaugeVec // locationLock is a mutex for oldServerLocations locationLock sync.Mutex @@ -351,15 +350,6 @@ func initTunnelMetrics() *tunnelMetrics { ) prometheus.MustRegister(maxConcurrentRequestsPerTunnel) - timerRetries := prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: MetricsNamespace, - Subsystem: TunnelSubsystem, - Name: "timer_retries", - Help: "Unacknowledged heart beats count", - }) - prometheus.MustRegister(timerRetries) - serverLocations := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: MetricsNamespace, @@ -416,7 +406,6 @@ func initTunnelMetrics() *tunnelMetrics { prometheus.MustRegister(registerSuccess) return &tunnelMetrics{ - timerRetries: timerRetries, serverLocations: serverLocations, oldServerLocations: make(map[string]string), muxerMetrics: newMuxerMetrics(), diff --git a/tunnelrpc/metrics/metrics.go b/tunnelrpc/metrics/metrics.go new file mode 100644 index 00000000..72dbb8aa --- /dev/null +++ b/tunnelrpc/metrics/metrics.go @@ -0,0 +1,143 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const ( + metricsNamespace = "cloudflared" + rpcSubsystem = "rpc" +) + +// CloudflaredServer operation labels +// CloudflaredServer is an extension of SessionManager with additional methods, but it's helpful +// to visualize it separately in the metrics since they are technically different client/servers. +const ( + Cloudflared = "cloudflared" +) + +// ConfigurationManager operation labels +const ( + ConfigurationManager = "config" + + OperationUpdateConfiguration = "update_configuration" +) + +// SessionManager operation labels +const ( + SessionManager = "session" + + OperationRegisterUdpSession = "register_udp_session" + OperationUnregisterUdpSession = "unregister_udp_session" +) + +// RegistrationServer operation labels +const ( + Registration = "registration" + + OperationRegisterConnection = "register_connection" + OperationUnregisterConnection = "unregister_connection" + OperationUpdateLocalConfiguration = "update_local_configuration" +) + +type rpcMetrics struct { + serverOperations *prometheus.CounterVec + serverFailures *prometheus.CounterVec + serverOperationsLatency *prometheus.HistogramVec + + ClientOperations *prometheus.CounterVec + ClientFailures *prometheus.CounterVec + ClientOperationsLatency *prometheus.HistogramVec +} + +var CapnpMetrics *rpcMetrics = &rpcMetrics{ + serverOperations: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "server_operations", + Help: "Number of rpc methods by handler served", + }, + []string{"handler", "method"}, + ), + serverFailures: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "server_failures", + Help: "Number of rpc methods failures by handler served", + }, + []string{"handler", "method"}, + ), + serverOperationsLatency: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "server_latency_secs", + Help: "Latency of rpc methods by handler served", + // Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds: + // 50ms, 150ms, 450ms, 1350ms, 4050ms + Buckets: prometheus.ExponentialBuckets(0.05, 3, 5), + }, + []string{"handler", "method"}, + ), + ClientOperations: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "client_operations", + Help: "Number of rpc methods by handler requested", + }, + []string{"handler", "method"}, + ), + ClientFailures: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "client_failures", + Help: "Number of rpc method failures by handler requested", + }, + []string{"handler", "method"}, + ), + ClientOperationsLatency: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: rpcSubsystem, + Name: "client_latency_secs", + Help: "Latency of rpc methods by handler requested", + // Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds: + // 50ms, 150ms, 450ms, 1350ms, 4050ms + Buckets: prometheus.ExponentialBuckets(0.05, 3, 5), + }, + []string{"handler", "method"}, + ), +} + +func ObserveServerHandler(inner func() error, handler, method string) error { + defer CapnpMetrics.serverOperations.WithLabelValues(handler, method).Inc() + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) { + CapnpMetrics.serverOperationsLatency.WithLabelValues(handler, method).Observe(s) + })) + defer timer.ObserveDuration() + + err := inner() + if err != nil { + CapnpMetrics.serverFailures.WithLabelValues(handler, method).Inc() + } + return err +} + +func NewClientOperationLatencyObserver(server string, method string) *prometheus.Timer { + return prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) { + CapnpMetrics.ClientOperationsLatency.WithLabelValues(server, method).Observe(s) + })) +} + +func init() { + prometheus.MustRegister(CapnpMetrics.serverOperations) + prometheus.MustRegister(CapnpMetrics.serverFailures) + prometheus.MustRegister(CapnpMetrics.serverOperationsLatency) + prometheus.MustRegister(CapnpMetrics.ClientOperations) + prometheus.MustRegister(CapnpMetrics.ClientFailures) + prometheus.MustRegister(CapnpMetrics.ClientOperationsLatency) +} diff --git a/tunnelrpc/pogs/configuration_manager.go b/tunnelrpc/pogs/configuration_manager.go index 5b99cbe7..4daae802 100644 --- a/tunnelrpc/pogs/configuration_manager.go +++ b/tunnelrpc/pogs/configuration_manager.go @@ -8,10 +8,12 @@ import ( "zombiezen.com/go/capnproto2/rpc" "zombiezen.com/go/capnproto2/server" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/proto" ) type ConfigurationManager interface { + // UpdateConfiguration is the call provided to cloudflared to load the latest remote configuration. UpdateConfiguration(ctx context.Context, version int32, config []byte) *UpdateConfigurationResponse } @@ -24,6 +26,10 @@ func ConfigurationManager_ServerToClient(c ConfigurationManager) proto.Configura } func (i ConfigurationManager_PogsImpl) UpdateConfiguration(p proto.ConfigurationManager_updateConfiguration) error { + return metrics.ObserveServerHandler(func() error { return i.updateConfiguration(p) }, metrics.ConfigurationManager, metrics.OperationUpdateConfiguration) +} + +func (i ConfigurationManager_PogsImpl) updateConfiguration(p proto.ConfigurationManager_updateConfiguration) error { server.Ack(p.Options) version := p.Params.Version() diff --git a/tunnelrpc/pogs/registration_server.go b/tunnelrpc/pogs/registration_server.go index e0f6b1e8..9908c113 100644 --- a/tunnelrpc/pogs/registration_server.go +++ b/tunnelrpc/pogs/registration_server.go @@ -12,12 +12,18 @@ import ( "zombiezen.com/go/capnproto2/rpc" "zombiezen.com/go/capnproto2/server" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/proto" ) type RegistrationServer interface { + // RegisterConnection is the call typically handled by the edge to initiate and authenticate a new connection + // for cloudflared. RegisterConnection(ctx context.Context, auth TunnelAuth, tunnelID uuid.UUID, connIndex byte, options *ConnectionOptions) (*ConnectionDetails, error) + // UnregisterConnection is the call typically handled by the edge to close an existing connection for cloudflared. UnregisterConnection(ctx context.Context) + // UpdateLocalConfiguration is the call typically handled by the edge for cloudflared to provide the current + // configuration it is operating with. UpdateLocalConfiguration(ctx context.Context, config []byte) error } @@ -30,6 +36,10 @@ func RegistrationServer_ServerToClient(s RegistrationServer) proto.RegistrationS } func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServer_registerConnection) error { + return metrics.ObserveServerHandler(func() error { return i.registerConnection(p) }, metrics.Registration, metrics.OperationRegisterConnection) +} + +func (i RegistrationServer_PogsImpl) registerConnection(p proto.RegistrationServer_registerConnection) error { server.Ack(p.Options) auth, err := p.Params.Auth() @@ -83,13 +93,18 @@ func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServ } func (i RegistrationServer_PogsImpl) UnregisterConnection(p proto.RegistrationServer_unregisterConnection) error { - server.Ack(p.Options) - - i.impl.UnregisterConnection(p.Ctx) - return nil + return metrics.ObserveServerHandler(func() error { + server.Ack(p.Options) + i.impl.UnregisterConnection(p.Ctx) + return nil // No metrics will be reported for failure as this method has no return value + }, metrics.Registration, metrics.OperationUnregisterConnection) } -func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error { +func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(p proto.RegistrationServer_updateLocalConfiguration) error { + return metrics.ObserveServerHandler(func() error { return i.updateLocalConfiguration(p) }, metrics.Registration, metrics.OperationUpdateLocalConfiguration) +} + +func (i RegistrationServer_PogsImpl) updateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error { server.Ack(c.Options) configBytes, err := c.Params.Config() diff --git a/tunnelrpc/pogs/session_manager.go b/tunnelrpc/pogs/session_manager.go index 4abea2bf..b5552983 100644 --- a/tunnelrpc/pogs/session_manager.go +++ b/tunnelrpc/pogs/session_manager.go @@ -11,6 +11,7 @@ import ( "zombiezen.com/go/capnproto2/rpc" "zombiezen.com/go/capnproto2/server" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/proto" ) @@ -32,6 +33,10 @@ func SessionManager_ServerToClient(s SessionManager) proto.SessionManager { } func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_registerUdpSession) error { + return metrics.ObserveServerHandler(func() error { return i.registerUdpSession(p) }, metrics.SessionManager, metrics.OperationRegisterUdpSession) +} + +func (i SessionManager_PogsImpl) registerUdpSession(p proto.SessionManager_registerUdpSession) error { server.Ack(p.Options) sessionIDRaw, err := p.Params.SessionId() @@ -78,6 +83,10 @@ func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_regis } func (i SessionManager_PogsImpl) UnregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error { + return metrics.ObserveServerHandler(func() error { return i.unregisterUdpSession(p) }, metrics.SessionManager, metrics.OperationUnregisterUdpSession) +} + +func (i SessionManager_PogsImpl) unregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error { server.Ack(p.Options) sessionIDRaw, err := p.Params.SessionId() diff --git a/tunnelrpc/quic/cloudflared_client.go b/tunnelrpc/quic/cloudflared_client.go index a5397d17..7cefd388 100644 --- a/tunnelrpc/quic/cloudflared_client.go +++ b/tunnelrpc/quic/cloudflared_client.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "github.com/cloudflare/cloudflared/tunnelrpc" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -43,19 +44,43 @@ func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, reques func (c *CloudflaredClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationRegisterUdpSession) + defer timer.ObserveDuration() + + resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc() + } + return resp, err } func (c *CloudflaredClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.UnregisterUdpSession(ctx, sessionID, message) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUnregisterUdpSession) + defer timer.ObserveDuration() + + err := c.client.UnregisterUdpSession(ctx, sessionID, message) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc() + } + return err } func (c *CloudflaredClient) UpdateConfiguration(ctx context.Context, version int32, config []byte) (*pogs.UpdateConfigurationResponse, error) { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.UpdateConfiguration(ctx, version, config) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUpdateConfiguration) + defer timer.ObserveDuration() + + resp, err := c.client.UpdateConfiguration(ctx, version, config) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc() + } + return resp, err } func (c *CloudflaredClient) Close() { diff --git a/tunnelrpc/quic/session_client.go b/tunnelrpc/quic/session_client.go index a8641e43..05c49910 100644 --- a/tunnelrpc/quic/session_client.go +++ b/tunnelrpc/quic/session_client.go @@ -11,6 +11,7 @@ import ( "zombiezen.com/go/capnproto2/rpc" "github.com/cloudflare/cloudflared/tunnelrpc" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -41,13 +42,29 @@ func NewSessionClient(ctx context.Context, stream io.ReadWriteCloser, requestTim func (c *SessionClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationRegisterUdpSession) + defer timer.ObserveDuration() + + resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc() + } + return resp, err } func (c *SessionClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error { ctx, cancel := context.WithTimeout(ctx, c.requestTimeout) defer cancel() - return c.client.UnregisterUdpSession(ctx, sessionID, message) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationUnregisterUdpSession) + defer timer.ObserveDuration() + + err := c.client.UnregisterUdpSession(ctx, sessionID, message) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc() + } + return err } func (c *SessionClient) Close() { diff --git a/tunnelrpc/registration_client.go b/tunnelrpc/registration_client.go index 96aef963..e27ad610 100644 --- a/tunnelrpc/registration_client.go +++ b/tunnelrpc/registration_client.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "zombiezen.com/go/capnproto2/rpc" + "github.com/cloudflare/cloudflared/tunnelrpc/metrics" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -53,19 +54,41 @@ func (r *registrationClient) RegisterConnection( ) (*pogs.ConnectionDetails, error) { ctx, cancel := context.WithTimeout(ctx, r.requestTimeout) defer cancel() - return r.client.RegisterConnection(ctx, auth, tunnelID, connIndex, options) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationRegisterConnection) + defer timer.ObserveDuration() + + conn, err := r.client.RegisterConnection(ctx, auth, tunnelID, connIndex, options) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc() + } + return conn, err } func (r *registrationClient) SendLocalConfiguration(ctx context.Context, config []byte) error { ctx, cancel := context.WithTimeout(ctx, r.requestTimeout) defer cancel() - return r.client.SendLocalConfiguration(ctx, config) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUpdateLocalConfiguration) + defer timer.ObserveDuration() + + err := r.client.SendLocalConfiguration(ctx, config) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc() + } + return err } func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) { ctx, cancel := context.WithTimeout(ctx, gracePeriod) defer cancel() - _ = r.client.UnregisterConnection(ctx) + defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc() + timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUnregisterConnection) + defer timer.ObserveDuration() + err := r.client.UnregisterConnection(ctx) + if err != nil { + metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc() + } } func (r *registrationClient) Close() {