From 2791c20b55db8ca0b19b3e752b286d88c43ccbbb Mon Sep 17 00:00:00 2001 From: Nate Franzen Date: Mon, 2 Jul 2018 18:26:10 -0700 Subject: [PATCH] pass metricsLabels as high-level parameters --- cmd/cloudflared/main.go | 3 ++- origin/metrics.go | 12 +++++++++ origin/supervisor.go | 22 ++++++++-------- origin/tunnel.go | 56 +++++++++++++++++++++++++++-------------- 4 files changed, 62 insertions(+), 31 deletions(-) diff --git a/cmd/cloudflared/main.go b/cmd/cloudflared/main.go index 4daf0689..40b76e8b 100644 --- a/cmd/cloudflared/main.go +++ b/cmd/cloudflared/main.go @@ -482,10 +482,11 @@ func startServer(c *cli.Context, shutdownC, graceShutdownC chan struct{}) error return err } + metricsLabels := map[string]string{"application": "cloudflared"} wg.Add(1) go func() { defer wg.Done() - errC <- origin.StartTunnelDaemon(tunnelConfig, graceShutdownC, connectedSignal) + errC <- origin.StartTunnelDaemon(tunnelConfig, graceShutdownC, connectedSignal, metricsLabels) }() return waitToShutdown(&wg, errC, shutdownC, graceShutdownC, c.Duration("grace-period")) diff --git a/origin/metrics.go b/origin/metrics.go index 844dbc56..8237075d 100644 --- a/origin/metrics.go +++ b/origin/metrics.go @@ -398,3 +398,15 @@ func (t *TunnelMetrics) registerServerLocation(metricLabelValues []string, loc s t.serverLocations.WithLabelValues(labelValues...).Inc() t.oldServerLocations[hashKey] = loc } + +// SetServerLocation is called by the tunnelHandler when the tunnel opens +func (t *TunnelMetrics) SetServerLocation(metricLabelValues []string, loc string) { + labelValues := append(metricLabelValues, loc) + t.serverLocations.WithLabelValues(labelValues...).Set(1) +} + +// UnsetServerLocation is called by the tunnelHandler when the tunnel closes, or at least is known to be closed +func (t *TunnelMetrics) UnsetServerLocation(metricLabelValues []string, loc string) { + labelValues := append(metricLabelValues, loc) + t.serverLocations.WithLabelValues(labelValues...).Set(0) +} diff --git a/origin/supervisor.go b/origin/supervisor.go index 297d5153..11803c76 100644 --- a/origin/supervisor.go +++ b/origin/supervisor.go @@ -50,9 +50,9 @@ func NewSupervisor(config *TunnelConfig) *Supervisor { } } -func (s *Supervisor) Run(ctx context.Context, connectedSignal chan struct{}) error { +func (s *Supervisor) Run(ctx context.Context, connectedSignal chan struct{}, metricsLabels map[string]string) error { logger := s.config.Logger - if err := s.initialize(ctx, connectedSignal); err != nil { + if err := s.initialize(ctx, connectedSignal, metricsLabels); err != nil { return err } var tunnelsWaiting []int @@ -95,7 +95,7 @@ func (s *Supervisor) Run(ctx context.Context, connectedSignal chan struct{}) err case <-backoffTimer: backoffTimer = nil for _, index := range tunnelsWaiting { - go s.startTunnel(ctx, index, s.newConnectedTunnelSignal(index)) + go s.startTunnel(ctx, index, s.newConnectedTunnelSignal(index), metricsLabels) } tunnelsActive += len(tunnelsWaiting) tunnelsWaiting = nil @@ -119,7 +119,7 @@ func (s *Supervisor) Run(ctx context.Context, connectedSignal chan struct{}) err } } -func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct{}) error { +func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct{}, metricsLabels map[string]string) error { logger := s.config.Logger edgeIPs, err := ResolveEdgeIPs(s.config.EdgeAddrs) if err != nil { @@ -134,7 +134,7 @@ func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct s.lastResolve = time.Now() // check entitlement and version too old error before attempting to register more tunnels s.nextUnusedEdgeIP = s.config.HAConnections - go s.startFirstTunnel(ctx, connectedSignal) + go s.startFirstTunnel(ctx, connectedSignal, metricsLabels) select { case <-ctx.Done(): <-s.tunnelErrors @@ -146,7 +146,7 @@ func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct } // At least one successful connection, so start the rest for i := 1; i < s.config.HAConnections; i++ { - go s.startTunnel(ctx, i, make(chan struct{})) + go s.startTunnel(ctx, i, make(chan struct{}), metricsLabels) time.Sleep(registrationInterval) } return nil @@ -154,8 +154,8 @@ func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct // startTunnel starts the first tunnel connection. The resulting error will be sent on // s.tunnelErrors. It will send a signal via connectedSignal if registration succeed -func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal chan struct{}) { - err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(0), 0, connectedSignal) +func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal chan struct{}, metricsLabels map[string]string) { + err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(0), 0, connectedSignal, metricsLabels) defer func() { s.tunnelErrors <- tunnelError{index: 0, err: err} }() @@ -176,14 +176,14 @@ func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal chan default: return } - err = ServeTunnelLoop(ctx, s.config, s.getEdgeIP(0), 0, connectedSignal) + err = ServeTunnelLoop(ctx, s.config, s.getEdgeIP(0), 0, connectedSignal, metricsLabels) } } // startTunnel starts a new tunnel connection. The resulting error will be sent on // s.tunnelErrors. -func (s *Supervisor) startTunnel(ctx context.Context, index int, connectedSignal chan struct{}) { - err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(index), uint8(index), connectedSignal) +func (s *Supervisor) startTunnel(ctx context.Context, index int, connectedSignal chan struct{}, metricsLabels map[string]string) { + err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(index), uint8(index), connectedSignal, metricsLabels) s.tunnelErrors <- tunnelError{index: index, err: err} } diff --git a/origin/tunnel.go b/origin/tunnel.go index 5e3a4a65..9664696b 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -118,7 +118,7 @@ func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP str } } -func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}, connectedSignal chan struct{}) error { +func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}, connectedSignal chan struct{}, metricsLabels map[string]string) error { ctx, cancel := context.WithCancel(context.Background()) go func() { <-shutdownC @@ -126,13 +126,13 @@ func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}, connecte }() // If a user specified negative HAConnections, we will treat it as requesting 1 connection if config.HAConnections > 1 { - return NewSupervisor(config).Run(ctx, connectedSignal) + return NewSupervisor(config).Run(ctx, connectedSignal, metricsLabels) } else { addrs, err := ResolveEdgeIPs(config.EdgeAddrs) if err != nil { return err } - return ServeTunnelLoop(ctx, config, addrs[0], 0, connectedSignal) + return ServeTunnelLoop(ctx, config, addrs[0], 0, connectedSignal, metricsLabels) } } @@ -141,6 +141,7 @@ func ServeTunnelLoop(ctx context.Context, addr *net.TCPAddr, connectionID uint8, connectedSignal chan struct{}, + metricsLabels map[string]string, ) error { logger := config.Logger config.Metrics.incrementHaConnections() @@ -156,7 +157,7 @@ func ServeTunnelLoop(ctx context.Context, // Ensure the above goroutine will terminate if we return without connecting defer connectedFuse.Fuse(false) for { - err, recoverable := ServeTunnel(ctx, config, addr, connectionID, connectedFuse, &backoff) + err, recoverable := ServeTunnel(ctx, config, addr, connectionID, connectedFuse, &backoff, metricsLabels) if recoverable { if duration, ok := backoff.GetBackoffDuration(ctx); ok { logger.Infof("Retrying in %s seconds", duration) @@ -175,6 +176,7 @@ func ServeTunnel( connectionID uint8, connectedFuse *h2mux.BooleanFuse, backoff *BackoffHandler, + metricsLabels map[string]string, ) (err error, recoverable bool) { // Treat panics as recoverable errors defer func() { @@ -195,8 +197,17 @@ func ServeTunnel( tags := make(map[string]string) tags["ha"] = connectionTag + metricsLabelKeys := make([]string, len(metricsLabels)) + metricsLabelValues := make([]string, len(metricsLabels)) + i := 0 + for k, v := range metricsLabels { + metricsLabelKeys[i] = k + metricsLabelValues[i] = v + i++ + } + // Returns error from parsing the origin URL or handshake errors - handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID) + handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID, metricsLabelKeys, metricsLabelValues) if err != nil { errLog := config.Logger.WithError(err) switch err.(type) { @@ -214,7 +225,7 @@ func ServeTunnel( errGroup, serveCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { - err := RegisterTunnel(serveCtx, handler.muxer, config, connectionID, originLocalIP) + err := RegisterTunnel(serveCtx, handler, config, connectionID, originLocalIP) if err == nil { connectedFuse.Fuse(true) backoff.SetGracePeriod() @@ -285,8 +296,10 @@ func IsRPCStreamResponse(headers []h2mux.Header) bool { return true } -func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, connectionID uint8, originLocalIP string) error { +// RegisterTunnel returns the name of the location connected to, or an error +func RegisterTunnel(ctx context.Context, handler *TunnelHandler, config *TunnelConfig, connectionID uint8, originLocalIP string) error { config.Logger.Debug("initiating RPC stream to register") + muxer := handler.muxer stream, err := muxer.OpenStream([]h2mux.Header{ {Name: ":method", Value: "RPC"}, {Name: ":scheme", Value: "capnp"}, @@ -317,11 +330,12 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi config.Hostname, config.RegistrationOptions(connectionID, originLocalIP), ) - LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, config.Logger) if err != nil { // RegisterTunnel RPC failure return clientRegisterTunnelError{cause: err} } + LogServerInfo(serverInfoPromise.Result(), connectionID, handler, config.Logger) + for _, logLine := range registration.LogLines { config.Logger.Info(logLine) } @@ -368,21 +382,21 @@ func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log func LogServerInfo( promise tunnelrpc.ServerInfo_Promise, connectionID uint8, - metrics *TunnelMetrics, + handler *TunnelHandler, logger *log.Logger, ) { serverInfoMessage, err := promise.Struct() if err != nil { logger.WithError(err).Warn("Failed to retrieve server information") - return } serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage) if err != nil { logger.WithError(err).Warn("Failed to retrieve server information") - return } logger.Infof("Connected to %s", serverInfo.LocationName) - // metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName) + + metricsLabels := handler.getCombinedMetricsLabels(uint8ToString(connectionID)) + handler.metrics.registerServerLocation(metricsLabels, serverInfo.LocationName) } func H2RequestHeadersToH1Request(h2 []h2mux.Header, h1 *http.Request) error { @@ -449,19 +463,23 @@ func NewTunnelHandler(ctx context.Context, config *TunnelConfig, addr string, connectionID uint8, + baseMetricsLabelKeys []string, + baseMetricsLabelValues []string, ) (*TunnelHandler, string, error) { originURL, err := validation.ValidateUrl(config.OriginUrl) if err != nil { return nil, "", fmt.Errorf("Unable to parse origin url %#v", originURL) } h := &TunnelHandler{ - originUrl: originURL, - httpClient: config.HTTPTransport, - tlsConfig: config.ClientTlsConfig, - tags: config.Tags, - metrics: config.Metrics, - connectionID: uint8ToString(connectionID), - logger: config.Logger, + originUrl: originURL, + httpClient: config.HTTPTransport, + tlsConfig: config.ClientTlsConfig, + tags: config.Tags, + metrics: config.Metrics, + connectionID: uint8ToString(connectionID), + baseMetricsLabelKeys: baseMetricsLabelKeys, + baseMetricsLabelValues: baseMetricsLabelValues, + logger: config.Logger, } if h.httpClient == nil { h.httpClient = http.DefaultTransport