From 192ae35728b0943cfc99a1fee98772df6db8ffdc Mon Sep 17 00:00:00 2001 From: Adam Chalmers Date: Mon, 26 Nov 2018 11:32:27 -0600 Subject: [PATCH] TUN-1212: Expose tunnel_id in metrics --- origin/metrics.go | 6 ++++-- origin/tunnel.go | 11 ++++++++-- origin/tunnelsforha.go | 49 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 4 deletions(-) create mode 100644 origin/tunnelsforha.go diff --git a/origin/metrics.go b/origin/metrics.go index d69a9009..f7b6b1ae 100644 --- a/origin/metrics.go +++ b/origin/metrics.go @@ -52,6 +52,7 @@ type TunnelMetrics struct { oldServerLocations map[string]string muxerMetrics *muxerMetrics + tunnelsHA tunnelsForHA } func newMuxerMetrics() *muxerMetrics { @@ -355,6 +356,7 @@ func NewTunnelMetrics() *TunnelMetrics { serverLocations: serverLocations, oldServerLocations: make(map[string]string), muxerMetrics: newMuxerMetrics(), + tunnelsHA: NewTunnelsForHA(), } } @@ -375,7 +377,7 @@ func (t *TunnelMetrics) incrementRequests(connectionID string) { var concurrentRequests uint64 var ok bool if concurrentRequests, ok = t.concurrentRequests[connectionID]; ok { - t.concurrentRequests[connectionID] += 1 + t.concurrentRequests[connectionID]++ concurrentRequests++ } else { t.concurrentRequests[connectionID] = 1 @@ -395,7 +397,7 @@ func (t *TunnelMetrics) incrementRequests(connectionID string) { func (t *TunnelMetrics) decrementConcurrentRequests(connectionID string) { t.concurrentRequestsLock.Lock() if _, ok := t.concurrentRequests[connectionID]; ok { - t.concurrentRequests[connectionID] -= 1 + t.concurrentRequests[connectionID]-- } t.concurrentRequestsLock.Unlock() diff --git a/origin/tunnel.go b/origin/tunnel.go index dab1d4a6..28f411bf 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -292,7 +292,13 @@ func IsRPCStreamResponse(headers []h2mux.Header) bool { return true } -func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, connectionID uint8, originLocalIP string) error { +func RegisterTunnel( + ctx context.Context, + muxer *h2mux.Muxer, + config *TunnelConfig, + connectionID uint8, + originLocalIP string, +) error { config.Logger.Debug("initiating RPC stream to register") stream, err := muxer.OpenStream([]h2mux.Header{ {Name: ":method", Value: "RPC"}, @@ -346,7 +352,8 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi } if registration.TunnelID != "" { - config.Logger.Info("Tunnel ID: " + registration.TunnelID) + config.Metrics.tunnelsHA.AddTunnelID(connectionID, registration.TunnelID) + config.Logger.Infof("Each HA connection's tunnel IDs: %v", config.Metrics.tunnelsHA.String()) } // Print out the user's trial zone URL in a nice box (if they requested and got one) diff --git a/origin/tunnelsforha.go b/origin/tunnelsforha.go new file mode 100644 index 00000000..d838c03d --- /dev/null +++ b/origin/tunnelsforha.go @@ -0,0 +1,49 @@ +package origin + +import ( + "fmt" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +// tunnelsForHA maps this cloudflared instance's HA connections to the tunnel IDs they serve. +type tunnelsForHA struct { + sync.Mutex + metrics *prometheus.GaugeVec + entries map[uint8]string +} + +// NewTunnelsForHA initializes the Prometheus metrics etc for a tunnelsForHA. +func NewTunnelsForHA() tunnelsForHA { + metrics := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "tunnel_ids", + Help: "The ID of all tunnels (and their corresponding HA connection ID) running in this instance of cloudflared.", + }, + []string{"tunnel_id", "ha_conn_id"}, + ) + prometheus.MustRegister(metrics) + + return tunnelsForHA{ + metrics: metrics, + entries: make(map[uint8]string), + } +} + +// Track a new tunnel ID, removing the disconnected tunnel (if any) and update metrics. +func (t *tunnelsForHA) AddTunnelID(haConn uint8, tunnelID string) { + t.Lock() + defer t.Unlock() + if oldTunnelID, ok := t.entries[haConn]; ok { + t.metrics.WithLabelValues(oldTunnelID).Dec() + } + t.entries[haConn] = tunnelID + t.metrics.WithLabelValues(tunnelID, fmt.Sprintf("%v", haConn)).Inc() +} + +func (t *tunnelsForHA) String() string { + t.Lock() + defer t.Unlock() + return fmt.Sprintf("%v", t.entries) +}