TUN-8422: Add metrics for capnp method calls

Adds new suite of metrics to capture the following for capnp rpcs operations:
- Method calls
- Method call failures
- Method call latencies

Each of the operations is labeled by the handler that serves the method and
the method of operation invoked. Additionally, each of these are split
between if the operation was called by a client or served.
This commit is contained in:
Devin Carr 2024-05-28 14:14:25 -07:00
parent 654a326098
commit 30197e7dfa
8 changed files with 251 additions and 24 deletions

View File

@ -43,7 +43,6 @@ type localConfigMetrics struct {
}
type tunnelMetrics struct {
timerRetries prometheus.Gauge
serverLocations *prometheus.GaugeVec
// locationLock is a mutex for oldServerLocations
locationLock sync.Mutex
@ -351,15 +350,6 @@ func initTunnelMetrics() *tunnelMetrics {
)
prometheus.MustRegister(maxConcurrentRequestsPerTunnel)
timerRetries := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: TunnelSubsystem,
Name: "timer_retries",
Help: "Unacknowledged heart beats count",
})
prometheus.MustRegister(timerRetries)
serverLocations := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
@ -416,7 +406,6 @@ func initTunnelMetrics() *tunnelMetrics {
prometheus.MustRegister(registerSuccess)
return &tunnelMetrics{
timerRetries: timerRetries,
serverLocations: serverLocations,
oldServerLocations: make(map[string]string),
muxerMetrics: newMuxerMetrics(),

View File

@ -0,0 +1,143 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
const (
metricsNamespace = "cloudflared"
rpcSubsystem = "rpc"
)
// CloudflaredServer operation labels
// CloudflaredServer is an extension of SessionManager with additional methods, but it's helpful
// to visualize it separately in the metrics since they are technically different client/servers.
const (
Cloudflared = "cloudflared"
)
// ConfigurationManager operation labels
const (
ConfigurationManager = "config"
OperationUpdateConfiguration = "update_configuration"
)
// SessionManager operation labels
const (
SessionManager = "session"
OperationRegisterUdpSession = "register_udp_session"
OperationUnregisterUdpSession = "unregister_udp_session"
)
// RegistrationServer operation labels
const (
Registration = "registration"
OperationRegisterConnection = "register_connection"
OperationUnregisterConnection = "unregister_connection"
OperationUpdateLocalConfiguration = "update_local_configuration"
)
type rpcMetrics struct {
serverOperations *prometheus.CounterVec
serverFailures *prometheus.CounterVec
serverOperationsLatency *prometheus.HistogramVec
ClientOperations *prometheus.CounterVec
ClientFailures *prometheus.CounterVec
ClientOperationsLatency *prometheus.HistogramVec
}
var CapnpMetrics *rpcMetrics = &rpcMetrics{
serverOperations: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: rpcSubsystem,
Name: "server_operations",
Help: "Number of rpc methods by handler served",
},
[]string{"handler", "method"},
),
serverFailures: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: rpcSubsystem,
Name: "server_failures",
Help: "Number of rpc methods failures by handler served",
},
[]string{"handler", "method"},
),
serverOperationsLatency: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: rpcSubsystem,
Name: "server_latency_secs",
Help: "Latency of rpc methods by handler served",
// Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds:
// 50ms, 150ms, 450ms, 1350ms, 4050ms
Buckets: prometheus.ExponentialBuckets(0.05, 3, 5),
},
[]string{"handler", "method"},
),
ClientOperations: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: rpcSubsystem,
Name: "client_operations",
Help: "Number of rpc methods by handler requested",
},
[]string{"handler", "method"},
),
ClientFailures: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: rpcSubsystem,
Name: "client_failures",
Help: "Number of rpc method failures by handler requested",
},
[]string{"handler", "method"},
),
ClientOperationsLatency: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: rpcSubsystem,
Name: "client_latency_secs",
Help: "Latency of rpc methods by handler requested",
// Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds:
// 50ms, 150ms, 450ms, 1350ms, 4050ms
Buckets: prometheus.ExponentialBuckets(0.05, 3, 5),
},
[]string{"handler", "method"},
),
}
func ObserveServerHandler(inner func() error, handler, method string) error {
defer CapnpMetrics.serverOperations.WithLabelValues(handler, method).Inc()
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) {
CapnpMetrics.serverOperationsLatency.WithLabelValues(handler, method).Observe(s)
}))
defer timer.ObserveDuration()
err := inner()
if err != nil {
CapnpMetrics.serverFailures.WithLabelValues(handler, method).Inc()
}
return err
}
func NewClientOperationLatencyObserver(server string, method string) *prometheus.Timer {
return prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) {
CapnpMetrics.ClientOperationsLatency.WithLabelValues(server, method).Observe(s)
}))
}
func init() {
prometheus.MustRegister(CapnpMetrics.serverOperations)
prometheus.MustRegister(CapnpMetrics.serverFailures)
prometheus.MustRegister(CapnpMetrics.serverOperationsLatency)
prometheus.MustRegister(CapnpMetrics.ClientOperations)
prometheus.MustRegister(CapnpMetrics.ClientFailures)
prometheus.MustRegister(CapnpMetrics.ClientOperationsLatency)
}

View File

@ -8,10 +8,12 @@ import (
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/server"
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
"github.com/cloudflare/cloudflared/tunnelrpc/proto"
)
type ConfigurationManager interface {
// UpdateConfiguration is the call provided to cloudflared to load the latest remote configuration.
UpdateConfiguration(ctx context.Context, version int32, config []byte) *UpdateConfigurationResponse
}
@ -24,6 +26,10 @@ func ConfigurationManager_ServerToClient(c ConfigurationManager) proto.Configura
}
func (i ConfigurationManager_PogsImpl) UpdateConfiguration(p proto.ConfigurationManager_updateConfiguration) error {
return metrics.ObserveServerHandler(func() error { return i.updateConfiguration(p) }, metrics.ConfigurationManager, metrics.OperationUpdateConfiguration)
}
func (i ConfigurationManager_PogsImpl) updateConfiguration(p proto.ConfigurationManager_updateConfiguration) error {
server.Ack(p.Options)
version := p.Params.Version()

View File

@ -12,12 +12,18 @@ import (
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/server"
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
"github.com/cloudflare/cloudflared/tunnelrpc/proto"
)
type RegistrationServer interface {
// RegisterConnection is the call typically handled by the edge to initiate and authenticate a new connection
// for cloudflared.
RegisterConnection(ctx context.Context, auth TunnelAuth, tunnelID uuid.UUID, connIndex byte, options *ConnectionOptions) (*ConnectionDetails, error)
// UnregisterConnection is the call typically handled by the edge to close an existing connection for cloudflared.
UnregisterConnection(ctx context.Context)
// UpdateLocalConfiguration is the call typically handled by the edge for cloudflared to provide the current
// configuration it is operating with.
UpdateLocalConfiguration(ctx context.Context, config []byte) error
}
@ -30,6 +36,10 @@ func RegistrationServer_ServerToClient(s RegistrationServer) proto.RegistrationS
}
func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServer_registerConnection) error {
return metrics.ObserveServerHandler(func() error { return i.registerConnection(p) }, metrics.Registration, metrics.OperationRegisterConnection)
}
func (i RegistrationServer_PogsImpl) registerConnection(p proto.RegistrationServer_registerConnection) error {
server.Ack(p.Options)
auth, err := p.Params.Auth()
@ -83,13 +93,18 @@ func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServ
}
func (i RegistrationServer_PogsImpl) UnregisterConnection(p proto.RegistrationServer_unregisterConnection) error {
server.Ack(p.Options)
i.impl.UnregisterConnection(p.Ctx)
return nil
return metrics.ObserveServerHandler(func() error {
server.Ack(p.Options)
i.impl.UnregisterConnection(p.Ctx)
return nil // No metrics will be reported for failure as this method has no return value
}, metrics.Registration, metrics.OperationUnregisterConnection)
}
func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error {
func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(p proto.RegistrationServer_updateLocalConfiguration) error {
return metrics.ObserveServerHandler(func() error { return i.updateLocalConfiguration(p) }, metrics.Registration, metrics.OperationUpdateLocalConfiguration)
}
func (i RegistrationServer_PogsImpl) updateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error {
server.Ack(c.Options)
configBytes, err := c.Params.Config()

View File

@ -11,6 +11,7 @@ import (
"zombiezen.com/go/capnproto2/rpc"
"zombiezen.com/go/capnproto2/server"
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
"github.com/cloudflare/cloudflared/tunnelrpc/proto"
)
@ -32,6 +33,10 @@ func SessionManager_ServerToClient(s SessionManager) proto.SessionManager {
}
func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_registerUdpSession) error {
return metrics.ObserveServerHandler(func() error { return i.registerUdpSession(p) }, metrics.SessionManager, metrics.OperationRegisterUdpSession)
}
func (i SessionManager_PogsImpl) registerUdpSession(p proto.SessionManager_registerUdpSession) error {
server.Ack(p.Options)
sessionIDRaw, err := p.Params.SessionId()
@ -78,6 +83,10 @@ func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_regis
}
func (i SessionManager_PogsImpl) UnregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error {
return metrics.ObserveServerHandler(func() error { return i.unregisterUdpSession(p) }, metrics.SessionManager, metrics.OperationUnregisterUdpSession)
}
func (i SessionManager_PogsImpl) unregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error {
server.Ack(p.Options)
sessionIDRaw, err := p.Params.SessionId()

View File

@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/cloudflare/cloudflared/tunnelrpc"
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
@ -43,19 +44,43 @@ func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, reques
func (c *CloudflaredClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
defer cancel()
return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationRegisterUdpSession)
defer timer.ObserveDuration()
resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc()
}
return resp, err
}
func (c *CloudflaredClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
defer cancel()
return c.client.UnregisterUdpSession(ctx, sessionID, message)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUnregisterUdpSession)
defer timer.ObserveDuration()
err := c.client.UnregisterUdpSession(ctx, sessionID, message)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc()
}
return err
}
func (c *CloudflaredClient) UpdateConfiguration(ctx context.Context, version int32, config []byte) (*pogs.UpdateConfigurationResponse, error) {
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
defer cancel()
return c.client.UpdateConfiguration(ctx, version, config)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUpdateConfiguration)
defer timer.ObserveDuration()
resp, err := c.client.UpdateConfiguration(ctx, version, config)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc()
}
return resp, err
}
func (c *CloudflaredClient) Close() {

View File

@ -11,6 +11,7 @@ import (
"zombiezen.com/go/capnproto2/rpc"
"github.com/cloudflare/cloudflared/tunnelrpc"
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
@ -41,13 +42,29 @@ func NewSessionClient(ctx context.Context, stream io.ReadWriteCloser, requestTim
func (c *SessionClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
defer cancel()
return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationRegisterUdpSession)
defer timer.ObserveDuration()
resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc()
}
return resp, err
}
func (c *SessionClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
defer cancel()
return c.client.UnregisterUdpSession(ctx, sessionID, message)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationUnregisterUdpSession)
defer timer.ObserveDuration()
err := c.client.UnregisterUdpSession(ctx, sessionID, message)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc()
}
return err
}
func (c *SessionClient) Close() {

View File

@ -9,6 +9,7 @@ import (
"github.com/google/uuid"
"zombiezen.com/go/capnproto2/rpc"
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
@ -53,19 +54,41 @@ func (r *registrationClient) RegisterConnection(
) (*pogs.ConnectionDetails, error) {
ctx, cancel := context.WithTimeout(ctx, r.requestTimeout)
defer cancel()
return r.client.RegisterConnection(ctx, auth, tunnelID, connIndex, options)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationRegisterConnection)
defer timer.ObserveDuration()
conn, err := r.client.RegisterConnection(ctx, auth, tunnelID, connIndex, options)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc()
}
return conn, err
}
func (r *registrationClient) SendLocalConfiguration(ctx context.Context, config []byte) error {
ctx, cancel := context.WithTimeout(ctx, r.requestTimeout)
defer cancel()
return r.client.SendLocalConfiguration(ctx, config)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUpdateLocalConfiguration)
defer timer.ObserveDuration()
err := r.client.SendLocalConfiguration(ctx, config)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc()
}
return err
}
func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
ctx, cancel := context.WithTimeout(ctx, gracePeriod)
defer cancel()
_ = r.client.UnregisterConnection(ctx)
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc()
timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUnregisterConnection)
defer timer.ObserveDuration()
err := r.client.UnregisterConnection(ctx)
if err != nil {
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc()
}
}
func (r *registrationClient) Close() {