more refactoring, use metrics namespace, fix label ordering bug
This commit is contained in:
		
							parent
							
								
									09fbd75000
								
							
						
					
					
						commit
						a86f44a8e1
					
				| 
						 | 
				
			
			@ -482,7 +482,10 @@ func startServer(c *cli.Context, shutdownC, graceShutdownC chan struct{}) error
 | 
			
		|||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	metricsLabels := map[string]string{"application": "cloudflared"}
 | 
			
		||||
	metricsLabels := origin.MetricsLabelList{
 | 
			
		||||
		Keys:   []string{"application"},
 | 
			
		||||
		Values: []string{"cloudflared"},
 | 
			
		||||
	}
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer wg.Done()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,6 +11,15 @@ import (
 | 
			
		|||
	"github.com/prometheus/client_golang/prometheus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ArgoTunnelNamespace is a namespace for metrics labels
 | 
			
		||||
const ArgoTunnelNamespace = "argotunnel"
 | 
			
		||||
 | 
			
		||||
// Lists of metrics label keys and values, in matched order
 | 
			
		||||
type MetricsLabelList struct {
 | 
			
		||||
	Keys   []string
 | 
			
		||||
	Values []string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type muxerMetrics struct {
 | 
			
		||||
	rtt              *prometheus.GaugeVec
 | 
			
		||||
	rttMin           *prometheus.GaugeVec
 | 
			
		||||
| 
						 | 
				
			
			@ -32,160 +41,178 @@ type muxerMetrics struct {
 | 
			
		|||
type TunnelMetrics struct {
 | 
			
		||||
	haConnections prometheus.Gauge
 | 
			
		||||
 | 
			
		||||
	requests *prometheus.CounterVec
 | 
			
		||||
	// concurrentRequestsLock is a mutex for concurrentRequests and maxConcurrentRequests
 | 
			
		||||
	concurrentRequestsLock      sync.Mutex
 | 
			
		||||
	concurrentRequestsPerTunnel *prometheus.GaugeVec
 | 
			
		||||
	// concurrentRequests records count of concurrent requests for each tunnel, keyed by hash of label values
 | 
			
		||||
	concurrentRequests             map[uint64]uint64
 | 
			
		||||
	maxConcurrentRequestsPerTunnel *prometheus.GaugeVec
 | 
			
		||||
	// concurrentRequests records max count of concurrent requests for each tunnel, keyed by hash of label values
 | 
			
		||||
	maxConcurrentRequests map[uint64]uint64
 | 
			
		||||
	timerRetries          prometheus.Gauge
 | 
			
		||||
	requests  *prometheus.CounterVec
 | 
			
		||||
	responses *prometheus.CounterVec
 | 
			
		||||
 | 
			
		||||
	responses       *prometheus.CounterVec
 | 
			
		||||
	serverLocations *prometheus.GaugeVec
 | 
			
		||||
	// locationLock is a mutex for oldServerLocations
 | 
			
		||||
	locationLock sync.Mutex
 | 
			
		||||
	// oldServerLocations stores the last server the tunnel was connected to
 | 
			
		||||
	// concurrentRequestsLock is a mutex for concurrentRequests and maxConcurrentRequests
 | 
			
		||||
	// counters are keyed by hash of label values
 | 
			
		||||
	concurrentRequestsLock       sync.Mutex
 | 
			
		||||
	concurrentRequests           *prometheus.GaugeVec
 | 
			
		||||
	concurrentRequestsCounter    map[uint64]uint64
 | 
			
		||||
	maxConcurrentRequests        *prometheus.GaugeVec
 | 
			
		||||
	maxConcurrentRequestsCounter map[uint64]uint64
 | 
			
		||||
 | 
			
		||||
	timerRetries prometheus.Gauge
 | 
			
		||||
 | 
			
		||||
	// oldServerLocations stores the last server the tunnel was connected to, secured by mutex
 | 
			
		||||
	locationLock       sync.Mutex
 | 
			
		||||
	serverLocations    *prometheus.GaugeVec
 | 
			
		||||
	oldServerLocations map[uint64]string
 | 
			
		||||
 | 
			
		||||
	muxerMetrics *muxerMetrics
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newMuxerMetrics(baseMetricsLabels []string) *muxerMetrics {
 | 
			
		||||
func newMuxerMetrics(baseMetricsLabelKeys []string) *muxerMetrics {
 | 
			
		||||
 | 
			
		||||
	connectionIDLabelKeys := append(baseMetricsLabelKeys, "connection_id")
 | 
			
		||||
 | 
			
		||||
	rtt := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_rtt",
 | 
			
		||||
			Help: "Round-trip time in millisecond",
 | 
			
		||||
			Name:      "rtt",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Round-trip time in millisecond",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(rtt)
 | 
			
		||||
 | 
			
		||||
	rttMin := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_rtt_min",
 | 
			
		||||
			Help: "Shortest round-trip time in millisecond",
 | 
			
		||||
			Name:      "rtt_min",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Shortest round-trip time in millisecond",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(rttMin)
 | 
			
		||||
 | 
			
		||||
	rttMax := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_rtt_max",
 | 
			
		||||
			Help: "Longest round-trip time in millisecond",
 | 
			
		||||
			Name:      "rtt_max",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Longest round-trip time in millisecond",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(rttMax)
 | 
			
		||||
 | 
			
		||||
	receiveWindowAve := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_receive_window_ave",
 | 
			
		||||
			Help: "Average receive window size in bytes",
 | 
			
		||||
			Name:      "receive_window_avg",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Average receive window size in bytes",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(receiveWindowAve)
 | 
			
		||||
 | 
			
		||||
	sendWindowAve := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_send_window_ave",
 | 
			
		||||
			Help: "Average send window size in bytes",
 | 
			
		||||
			Name:      "send_window_avg",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Average send window size in bytes",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(sendWindowAve)
 | 
			
		||||
 | 
			
		||||
	receiveWindowMin := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_receive_window_min",
 | 
			
		||||
			Help: "Smallest receive window size in bytes",
 | 
			
		||||
			Name:      "receive_window_min",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Smallest receive window size in bytes",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(receiveWindowMin)
 | 
			
		||||
 | 
			
		||||
	receiveWindowMax := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_receive_window_max",
 | 
			
		||||
			Help: "Largest receive window size in bytes",
 | 
			
		||||
			Name:      "receive_window_max",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Largest receive window size in bytes",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(receiveWindowMax)
 | 
			
		||||
 | 
			
		||||
	sendWindowMin := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_send_window_min",
 | 
			
		||||
			Help: "Smallest send window size in bytes",
 | 
			
		||||
			Name:      "send_window_min",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Smallest send window size in bytes",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(sendWindowMin)
 | 
			
		||||
 | 
			
		||||
	sendWindowMax := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_send_window_max",
 | 
			
		||||
			Help: "Largest send window size in bytes",
 | 
			
		||||
			Name:      "send_window_max",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Largest send window size in bytes",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(sendWindowMax)
 | 
			
		||||
 | 
			
		||||
	inBoundRateCurr := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_inbound_bytes_per_sec_curr",
 | 
			
		||||
			Help: "Current inbounding bytes per second, 0 if there is no incoming connection",
 | 
			
		||||
			Name:      "inbound_bytes_per_sec_curr",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Current inbounding bytes per second, 0 if there is no incoming connection",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(inBoundRateCurr)
 | 
			
		||||
 | 
			
		||||
	inBoundRateMin := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_inbound_bytes_per_sec_min",
 | 
			
		||||
			Help: "Minimum non-zero inbounding bytes per second",
 | 
			
		||||
			Name:      "inbound_bytes_per_sec_min",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Minimum non-zero inbounding bytes per second",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(inBoundRateMin)
 | 
			
		||||
 | 
			
		||||
	inBoundRateMax := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_inbound_bytes_per_sec_max",
 | 
			
		||||
			Help: "Maximum inbounding bytes per second",
 | 
			
		||||
			Name:      "inbound_bytes_per_sec_max",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Maximum inbounding bytes per second",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(inBoundRateMax)
 | 
			
		||||
 | 
			
		||||
	outBoundRateCurr := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_outbound_bytes_per_sec_curr",
 | 
			
		||||
			Help: "Current outbounding bytes per second, 0 if there is no outgoing traffic",
 | 
			
		||||
			Name:      "outbound_bytes_per_sec_curr",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Current outbounding bytes per second, 0 if there is no outgoing traffic",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(outBoundRateCurr)
 | 
			
		||||
 | 
			
		||||
	outBoundRateMin := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_outbound_bytes_per_sec_min",
 | 
			
		||||
			Help: "Minimum non-zero outbounding bytes per second",
 | 
			
		||||
			Name:      "outbound_bytes_per_sec_min",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Minimum non-zero outbounding bytes per second",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(outBoundRateMin)
 | 
			
		||||
 | 
			
		||||
	outBoundRateMax := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_outbound_bytes_per_sec_max",
 | 
			
		||||
			Help: "Maximum outbounding bytes per second",
 | 
			
		||||
			Name:      "outbound_bytes_per_sec_max",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Maximum outbounding bytes per second",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricsLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabelKeys,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(outBoundRateMax)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -231,88 +258,90 @@ func convertRTTMilliSec(t time.Duration) float64 {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
// Metrics that can be collected without asking the edge
 | 
			
		||||
func NewTunnelMetrics(baseMetricLabels []string) *TunnelMetrics {
 | 
			
		||||
func NewTunnelMetrics(baseMetricsLabelKeys []string) *TunnelMetrics {
 | 
			
		||||
 | 
			
		||||
	connectionIDLabels := append(baseMetricsLabelKeys, "connection_id")
 | 
			
		||||
 | 
			
		||||
	haConnections := prometheus.NewGauge(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_ha_connections",
 | 
			
		||||
			Help: "Number of active ha connections",
 | 
			
		||||
			Name:      "ha_connections",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Number of active ha connections",
 | 
			
		||||
		})
 | 
			
		||||
	prometheus.MustRegister(haConnections)
 | 
			
		||||
 | 
			
		||||
	requests := prometheus.NewCounterVec(
 | 
			
		||||
		prometheus.CounterOpts{
 | 
			
		||||
			Name: "argotunnel_requests",
 | 
			
		||||
			Help: "Amount of requests proxied through each tunnel",
 | 
			
		||||
			Name:      "requests",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Amount of requests proxied through each tunnel",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabels,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(requests)
 | 
			
		||||
 | 
			
		||||
	concurrentRequestsPerTunnel := prometheus.NewGaugeVec(
 | 
			
		||||
	concurrentRequests := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_concurrent_requests_per_tunnel",
 | 
			
		||||
			Help: "Concurrent requests proxied through each tunnel",
 | 
			
		||||
			Name:      "concurrent_requests",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Concurrent requests proxied through each tunnel",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabels,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(concurrentRequestsPerTunnel)
 | 
			
		||||
	prometheus.MustRegister(concurrentRequests)
 | 
			
		||||
 | 
			
		||||
	maxConcurrentRequestsPerTunnel := prometheus.NewGaugeVec(
 | 
			
		||||
	maxConcurrentRequests := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_max_concurrent_requests_per_tunnel",
 | 
			
		||||
			Help: "Largest number of concurrent requests proxied through each tunnel so far",
 | 
			
		||||
			Name:      "max_concurrent_requests",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Largest number of concurrent requests proxied through each tunnel so far",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricLabels, "connection_id"),
 | 
			
		||||
		connectionIDLabels,
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(maxConcurrentRequestsPerTunnel)
 | 
			
		||||
	prometheus.MustRegister(maxConcurrentRequests)
 | 
			
		||||
 | 
			
		||||
	timerRetries := prometheus.NewGauge(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_timer_retries",
 | 
			
		||||
			Help: "Unacknowledged heart beats count",
 | 
			
		||||
		})
 | 
			
		||||
			Name:      "timer_retries",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Unacknowledged heartbeat count",
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(timerRetries)
 | 
			
		||||
 | 
			
		||||
	// responseByCode := prometheus.NewCounterVec(
 | 
			
		||||
	// 	prometheus.CounterOpts{
 | 
			
		||||
	// 		Name: "argotunnel_response_by_code",
 | 
			
		||||
	// 		Help: "Count of responses by HTTP status code",
 | 
			
		||||
	// 	},
 | 
			
		||||
	// 	[]string{"status_code"},
 | 
			
		||||
	// )
 | 
			
		||||
	// prometheus.MustRegister(responseByCode)
 | 
			
		||||
 | 
			
		||||
	responses := prometheus.NewCounterVec(
 | 
			
		||||
		prometheus.CounterOpts{
 | 
			
		||||
			Name: "argotunnel_responses",
 | 
			
		||||
			Help: "Count of responses for each tunnel",
 | 
			
		||||
			Name:      "responses",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Count of responses for each tunnel",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricLabels, "connection_id", "status_code"),
 | 
			
		||||
		append(baseMetricsLabelKeys, "connection_id", "status_code"),
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(responses)
 | 
			
		||||
 | 
			
		||||
	serverLocations := prometheus.NewGaugeVec(
 | 
			
		||||
		prometheus.GaugeOpts{
 | 
			
		||||
			Name: "argotunnel_server_locations",
 | 
			
		||||
			Help: "Where each tunnel is connected to. 1 means current location, 0 means previous locations.",
 | 
			
		||||
			Name:      "server_locations",
 | 
			
		||||
			Namespace: ArgoTunnelNamespace,
 | 
			
		||||
			Help:      "Where each tunnel is connected to. 1 means current location, 0 means previous locations.",
 | 
			
		||||
		},
 | 
			
		||||
		append(baseMetricLabels, "connection_id", "location"),
 | 
			
		||||
		append(baseMetricsLabelKeys, "connection_id", "location"),
 | 
			
		||||
	)
 | 
			
		||||
	prometheus.MustRegister(serverLocations)
 | 
			
		||||
 | 
			
		||||
	return &TunnelMetrics{
 | 
			
		||||
		haConnections:                  haConnections,
 | 
			
		||||
		requests:                       requests,
 | 
			
		||||
		concurrentRequestsPerTunnel:    concurrentRequestsPerTunnel,
 | 
			
		||||
		concurrentRequests:             make(map[uint64]uint64),
 | 
			
		||||
		maxConcurrentRequestsPerTunnel: maxConcurrentRequestsPerTunnel,
 | 
			
		||||
		maxConcurrentRequests:          make(map[uint64]uint64),
 | 
			
		||||
		timerRetries:                   timerRetries,
 | 
			
		||||
		haConnections:                haConnections,
 | 
			
		||||
		requests:                     requests,
 | 
			
		||||
		concurrentRequests:           concurrentRequests,
 | 
			
		||||
		concurrentRequestsCounter:    make(map[uint64]uint64),
 | 
			
		||||
		maxConcurrentRequests:        maxConcurrentRequests,
 | 
			
		||||
		maxConcurrentRequestsCounter: make(map[uint64]uint64),
 | 
			
		||||
		timerRetries:                 timerRetries,
 | 
			
		||||
 | 
			
		||||
		responses:          responses,
 | 
			
		||||
		serverLocations:    serverLocations,
 | 
			
		||||
		oldServerLocations: make(map[uint64]string),
 | 
			
		||||
		muxerMetrics:       newMuxerMetrics(baseMetricLabels),
 | 
			
		||||
		muxerMetrics:       newMuxerMetrics(baseMetricsLabelKeys),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -336,43 +365,43 @@ func (t *TunnelMetrics) updateMuxerMetrics(metricLabelValues []string, metrics *
 | 
			
		|||
	t.muxerMetrics.update(metricLabelValues, metrics)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// hashing of labels and locking are necessary in order to calculate concurrent requests
 | 
			
		||||
func (t *TunnelMetrics) incrementRequests(metricLabelValues []string) {
 | 
			
		||||
	t.concurrentRequestsLock.Lock()
 | 
			
		||||
	var concurrentRequests uint64
 | 
			
		||||
	var ok bool
 | 
			
		||||
	hashKey := hashLabelValues(metricLabelValues)
 | 
			
		||||
	if concurrentRequests, ok = t.concurrentRequests[hashKey]; ok {
 | 
			
		||||
		t.concurrentRequests[hashKey] += 1
 | 
			
		||||
	if concurrentRequests, ok = t.concurrentRequestsCounter[hashKey]; ok {
 | 
			
		||||
		t.concurrentRequestsCounter[hashKey]++
 | 
			
		||||
		concurrentRequests++
 | 
			
		||||
	} else {
 | 
			
		||||
		t.concurrentRequests[hashKey] = 1
 | 
			
		||||
		t.concurrentRequestsCounter[hashKey] = 1
 | 
			
		||||
		concurrentRequests = 1
 | 
			
		||||
	}
 | 
			
		||||
	if maxConcurrentRequests, ok := t.maxConcurrentRequests[hashKey]; (ok && maxConcurrentRequests < concurrentRequests) || !ok {
 | 
			
		||||
		t.maxConcurrentRequests[hashKey] = concurrentRequests
 | 
			
		||||
		t.maxConcurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Set(float64(concurrentRequests))
 | 
			
		||||
	if maxConcurrentRequests, ok := t.maxConcurrentRequestsCounter[hashKey]; (ok && maxConcurrentRequests < concurrentRequests) || !ok {
 | 
			
		||||
		t.maxConcurrentRequestsCounter[hashKey] = concurrentRequests
 | 
			
		||||
		t.maxConcurrentRequests.WithLabelValues(metricLabelValues...).Set(float64(concurrentRequests))
 | 
			
		||||
	}
 | 
			
		||||
	t.concurrentRequestsLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	t.requests.WithLabelValues(metricLabelValues...).Inc()
 | 
			
		||||
	t.concurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Inc()
 | 
			
		||||
	t.concurrentRequests.WithLabelValues(metricLabelValues...).Inc()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *TunnelMetrics) decrementConcurrentRequests(metricLabelValues []string) {
 | 
			
		||||
	t.concurrentRequestsLock.Lock()
 | 
			
		||||
	hashKey := hashLabelValues(metricLabelValues)
 | 
			
		||||
	if _, ok := t.concurrentRequests[hashKey]; ok {
 | 
			
		||||
		t.concurrentRequests[hashKey] -= 1
 | 
			
		||||
	if _, ok := t.concurrentRequestsCounter[hashKey]; ok {
 | 
			
		||||
		t.concurrentRequestsCounter[hashKey]--
 | 
			
		||||
	}
 | 
			
		||||
	t.concurrentRequestsLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	t.concurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Dec()
 | 
			
		||||
	t.concurrentRequests.WithLabelValues(metricLabelValues...).Dec()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *TunnelMetrics) incrementResponses(metricLabelValues []string, responseCode int) {
 | 
			
		||||
	labelValues := append(metricLabelValues, strconv.Itoa(responseCode))
 | 
			
		||||
	t.responses.WithLabelValues(labelValues...).Inc()
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *TunnelMetrics) registerServerLocation(metricLabelValues []string, loc string) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -9,8 +9,9 @@ import (
 | 
			
		|||
)
 | 
			
		||||
 | 
			
		||||
// can only be called once
 | 
			
		||||
var testMetrics = make([]string, 0)
 | 
			
		||||
var m = NewTunnelMetrics(testMetrics)
 | 
			
		||||
// There's no TunnelHandler in these tests to keep track of baseMetricsKeys
 | 
			
		||||
var emptyMetricKeys = make([]string, 0)
 | 
			
		||||
var m = NewTunnelMetrics(emptyMetricKeys)
 | 
			
		||||
 | 
			
		||||
func TestConcurrentRequestsSingleTunnel(t *testing.T) {
 | 
			
		||||
	routines := 20
 | 
			
		||||
| 
						 | 
				
			
			@ -27,10 +28,10 @@ func TestConcurrentRequestsSingleTunnel(t *testing.T) {
 | 
			
		|||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	assert.Len(t, m.concurrentRequests, 1)
 | 
			
		||||
	assert.Equal(t, uint64(routines), m.concurrentRequests[hashKey])
 | 
			
		||||
	assert.Len(t, m.maxConcurrentRequests, 1)
 | 
			
		||||
	assert.Equal(t, uint64(routines), m.maxConcurrentRequests[hashKey])
 | 
			
		||||
	assert.Len(t, m.concurrentRequestsCounter, 1)
 | 
			
		||||
	assert.Equal(t, uint64(routines), m.concurrentRequestsCounter[hashKey])
 | 
			
		||||
	assert.Len(t, m.maxConcurrentRequestsCounter, 1)
 | 
			
		||||
	assert.Equal(t, uint64(routines), m.maxConcurrentRequestsCounter[hashKey])
 | 
			
		||||
 | 
			
		||||
	wg.Add(routines / 2)
 | 
			
		||||
	for i := 0; i < routines/2; i++ {
 | 
			
		||||
| 
						 | 
				
			
			@ -40,13 +41,13 @@ func TestConcurrentRequestsSingleTunnel(t *testing.T) {
 | 
			
		|||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	assert.Equal(t, uint64(routines-routines/2), m.concurrentRequests[hashKey])
 | 
			
		||||
	assert.Equal(t, uint64(routines), m.maxConcurrentRequests[hashKey])
 | 
			
		||||
	assert.Equal(t, uint64(routines-routines/2), m.concurrentRequestsCounter[hashKey])
 | 
			
		||||
	assert.Equal(t, uint64(routines), m.maxConcurrentRequestsCounter[hashKey])
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestConcurrentRequestsMultiTunnel(t *testing.T) {
 | 
			
		||||
	m.concurrentRequests = make(map[uint64]uint64)
 | 
			
		||||
	m.maxConcurrentRequests = make(map[uint64]uint64)
 | 
			
		||||
	m.concurrentRequestsCounter = make(map[uint64]uint64)
 | 
			
		||||
	m.maxConcurrentRequestsCounter = make(map[uint64]uint64)
 | 
			
		||||
	tunnels := 20
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	wg.Add(tunnels)
 | 
			
		||||
| 
						 | 
				
			
			@ -62,13 +63,13 @@ func TestConcurrentRequestsMultiTunnel(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
 | 
			
		||||
	assert.Len(t, m.concurrentRequests, tunnels)
 | 
			
		||||
	assert.Len(t, m.maxConcurrentRequests, tunnels)
 | 
			
		||||
	assert.Len(t, m.concurrentRequestsCounter, tunnels)
 | 
			
		||||
	assert.Len(t, m.maxConcurrentRequestsCounter, tunnels)
 | 
			
		||||
	for i := 0; i < tunnels; i++ {
 | 
			
		||||
		labels := []string{strconv.Itoa(i)}
 | 
			
		||||
		hashKey := hashLabelValues(labels)
 | 
			
		||||
		assert.Equal(t, uint64(i+1), m.concurrentRequests[hashKey])
 | 
			
		||||
		assert.Equal(t, uint64(i+1), m.maxConcurrentRequests[hashKey])
 | 
			
		||||
		assert.Equal(t, uint64(i+1), m.concurrentRequestsCounter[hashKey])
 | 
			
		||||
		assert.Equal(t, uint64(i+1), m.maxConcurrentRequestsCounter[hashKey])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wg.Add(tunnels)
 | 
			
		||||
| 
						 | 
				
			
			@ -83,13 +84,13 @@ func TestConcurrentRequestsMultiTunnel(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
 | 
			
		||||
	assert.Len(t, m.concurrentRequests, tunnels)
 | 
			
		||||
	assert.Len(t, m.maxConcurrentRequests, tunnels)
 | 
			
		||||
	assert.Len(t, m.concurrentRequestsCounter, tunnels)
 | 
			
		||||
	assert.Len(t, m.maxConcurrentRequestsCounter, tunnels)
 | 
			
		||||
	for i := 0; i < tunnels; i++ {
 | 
			
		||||
		labels := []string{strconv.Itoa(i)}
 | 
			
		||||
		hashKey := hashLabelValues(labels)
 | 
			
		||||
		assert.Equal(t, uint64(0), m.concurrentRequests[hashKey])
 | 
			
		||||
		assert.Equal(t, uint64(i+1), m.maxConcurrentRequests[hashKey])
 | 
			
		||||
		assert.Equal(t, uint64(0), m.concurrentRequestsCounter[hashKey])
 | 
			
		||||
		assert.Equal(t, uint64(i+1), m.maxConcurrentRequestsCounter[hashKey])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -50,7 +50,7 @@ func NewSupervisor(config *TunnelConfig) *Supervisor {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) Run(ctx context.Context, connectedSignal chan struct{}, metricsLabels map[string]string) error {
 | 
			
		||||
func (s *Supervisor) Run(ctx context.Context, connectedSignal chan struct{}, metricsLabels MetricsLabelList) error {
 | 
			
		||||
	logger := s.config.Logger
 | 
			
		||||
	if err := s.initialize(ctx, connectedSignal, metricsLabels); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
| 
						 | 
				
			
			@ -119,7 +119,7 @@ func (s *Supervisor) Run(ctx context.Context, connectedSignal chan struct{}, met
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct{}, metricsLabels map[string]string) error {
 | 
			
		||||
func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct{}, metricsLabels MetricsLabelList) error {
 | 
			
		||||
	logger := s.config.Logger
 | 
			
		||||
	edgeIPs, err := ResolveEdgeIPs(s.config.EdgeAddrs)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -154,7 +154,7 @@ func (s *Supervisor) initialize(ctx context.Context, connectedSignal chan struct
 | 
			
		|||
 | 
			
		||||
// startTunnel starts the first tunnel connection. The resulting error will be sent on
 | 
			
		||||
// s.tunnelErrors. It will send a signal via connectedSignal if registration succeed
 | 
			
		||||
func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal chan struct{}, metricsLabels map[string]string) {
 | 
			
		||||
func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal chan struct{}, metricsLabels MetricsLabelList) {
 | 
			
		||||
	err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(0), 0, connectedSignal, metricsLabels)
 | 
			
		||||
	defer func() {
 | 
			
		||||
		s.tunnelErrors <- tunnelError{index: 0, err: err}
 | 
			
		||||
| 
						 | 
				
			
			@ -182,7 +182,7 @@ func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal chan
 | 
			
		|||
 | 
			
		||||
// startTunnel starts a new tunnel connection. The resulting error will be sent on
 | 
			
		||||
// s.tunnelErrors.
 | 
			
		||||
func (s *Supervisor) startTunnel(ctx context.Context, index int, connectedSignal chan struct{}, metricsLabels map[string]string) {
 | 
			
		||||
func (s *Supervisor) startTunnel(ctx context.Context, index int, connectedSignal chan struct{}, metricsLabels MetricsLabelList) {
 | 
			
		||||
	err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(index), uint8(index), connectedSignal, metricsLabels)
 | 
			
		||||
	s.tunnelErrors <- tunnelError{index: index, err: err}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										111
									
								
								origin/tunnel.go
								
								
								
								
							
							
						
						
									
										111
									
								
								origin/tunnel.go
								
								
								
								
							| 
						 | 
				
			
			@ -118,7 +118,7 @@ func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP str
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}, connectedSignal chan struct{}, metricsLabels map[string]string) error {
 | 
			
		||||
func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}, connectedSignal chan struct{}, metricsLabels MetricsLabelList) error {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	go func() {
 | 
			
		||||
		<-shutdownC
 | 
			
		||||
| 
						 | 
				
			
			@ -141,7 +141,7 @@ func ServeTunnelLoop(ctx context.Context,
 | 
			
		|||
	addr *net.TCPAddr,
 | 
			
		||||
	connectionID uint8,
 | 
			
		||||
	connectedSignal chan struct{},
 | 
			
		||||
	metricsLabels map[string]string,
 | 
			
		||||
	metricsLabels MetricsLabelList,
 | 
			
		||||
) error {
 | 
			
		||||
	logger := config.Logger
 | 
			
		||||
	config.Metrics.incrementHaConnections()
 | 
			
		||||
| 
						 | 
				
			
			@ -176,7 +176,7 @@ func ServeTunnel(
 | 
			
		|||
	connectionID uint8,
 | 
			
		||||
	connectedFuse *h2mux.BooleanFuse,
 | 
			
		||||
	backoff *BackoffHandler,
 | 
			
		||||
	metricsLabels map[string]string,
 | 
			
		||||
	metricsLabels MetricsLabelList,
 | 
			
		||||
) (err error, recoverable bool) {
 | 
			
		||||
	// Treat panics as recoverable errors
 | 
			
		||||
	defer func() {
 | 
			
		||||
| 
						 | 
				
			
			@ -197,17 +197,8 @@ func ServeTunnel(
 | 
			
		|||
	tags := make(map[string]string)
 | 
			
		||||
	tags["ha"] = connectionTag
 | 
			
		||||
 | 
			
		||||
	metricsLabelKeys := make([]string, len(metricsLabels))
 | 
			
		||||
	metricsLabelValues := make([]string, len(metricsLabels))
 | 
			
		||||
	i := 0
 | 
			
		||||
	for k, v := range metricsLabels {
 | 
			
		||||
		metricsLabelKeys[i] = k
 | 
			
		||||
		metricsLabelValues[i] = v
 | 
			
		||||
		i++
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Returns error from parsing the origin URL or handshake errors
 | 
			
		||||
	handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID, metricsLabelKeys, metricsLabelValues)
 | 
			
		||||
	handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID, metricsLabels)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		errLog := config.Logger.WithError(err)
 | 
			
		||||
		switch err.(type) {
 | 
			
		||||
| 
						 | 
				
			
			@ -225,7 +216,7 @@ func ServeTunnel(
 | 
			
		|||
	errGroup, serveCtx := errgroup.WithContext(ctx)
 | 
			
		||||
 | 
			
		||||
	errGroup.Go(func() error {
 | 
			
		||||
		err := RegisterTunnel(serveCtx, handler, config, connectionID, originLocalIP)
 | 
			
		||||
		err := RegisterTunnel(serveCtx, handler, config, originLocalIP)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			connectedFuse.Fuse(true)
 | 
			
		||||
			backoff.SetGracePeriod()
 | 
			
		||||
| 
						 | 
				
			
			@ -243,7 +234,7 @@ func ServeTunnel(
 | 
			
		|||
				handler.muxer.Shutdown()
 | 
			
		||||
				return err
 | 
			
		||||
			case <-updateMetricsTickC:
 | 
			
		||||
				handler.UpdateMetrics(connectionTag)
 | 
			
		||||
				handler.updateMetrics()
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
| 
						 | 
				
			
			@ -296,8 +287,7 @@ func IsRPCStreamResponse(headers []h2mux.Header) bool {
 | 
			
		|||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RegisterTunnel returns the name of the location connected to, or an error
 | 
			
		||||
func RegisterTunnel(ctx context.Context, handler *TunnelHandler, config *TunnelConfig, connectionID uint8, originLocalIP string) error {
 | 
			
		||||
func RegisterTunnel(ctx context.Context, handler *TunnelHandler, config *TunnelConfig, originLocalIP string) error {
 | 
			
		||||
	config.Logger.Debug("initiating RPC stream to register")
 | 
			
		||||
	muxer := handler.muxer
 | 
			
		||||
	stream, err := muxer.OpenStream([]h2mux.Header{
 | 
			
		||||
| 
						 | 
				
			
			@ -328,13 +318,13 @@ func RegisterTunnel(ctx context.Context, handler *TunnelHandler, config *TunnelC
 | 
			
		|||
		ctx,
 | 
			
		||||
		config.OriginCert,
 | 
			
		||||
		config.Hostname,
 | 
			
		||||
		config.RegistrationOptions(connectionID, originLocalIP),
 | 
			
		||||
		config.RegistrationOptions(handler.connectionID, originLocalIP),
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		// RegisterTunnel RPC failure
 | 
			
		||||
		return clientRegisterTunnelError{cause: err}
 | 
			
		||||
	}
 | 
			
		||||
	LogServerInfo(serverInfoPromise.Result(), connectionID, handler, config.Logger)
 | 
			
		||||
	LogServerInfo(serverInfoPromise.Result(), handler, config.Logger)
 | 
			
		||||
 | 
			
		||||
	for _, logLine := range registration.LogLines {
 | 
			
		||||
		config.Logger.Info(logLine)
 | 
			
		||||
| 
						 | 
				
			
			@ -381,7 +371,6 @@ func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log
 | 
			
		|||
 | 
			
		||||
func LogServerInfo(
 | 
			
		||||
	promise tunnelrpc.ServerInfo_Promise,
 | 
			
		||||
	connectionID uint8,
 | 
			
		||||
	handler *TunnelHandler,
 | 
			
		||||
	logger *log.Logger,
 | 
			
		||||
) {
 | 
			
		||||
| 
						 | 
				
			
			@ -395,8 +384,7 @@ func LogServerInfo(
 | 
			
		|||
	}
 | 
			
		||||
	logger.Infof("Connected to %s", serverInfo.LocationName)
 | 
			
		||||
 | 
			
		||||
	metricsLabels := handler.getCombinedMetricsLabels(uint8ToString(connectionID))
 | 
			
		||||
	handler.metrics.registerServerLocation(metricsLabels, serverInfo.LocationName)
 | 
			
		||||
	handler.registerServerLocationMetrics(serverInfo.LocationName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func H2RequestHeadersToH1Request(h2 []h2mux.Header, h1 *http.Request) error {
 | 
			
		||||
| 
						 | 
				
			
			@ -448,11 +436,10 @@ type TunnelHandler struct {
 | 
			
		|||
	tags       []tunnelpogs.Tag
 | 
			
		||||
	metrics    *TunnelMetrics
 | 
			
		||||
 | 
			
		||||
	baseMetricsLabelKeys   []string
 | 
			
		||||
	baseMetricsLabelValues []string
 | 
			
		||||
	baseMetricsLabels MetricsLabelList
 | 
			
		||||
 | 
			
		||||
	// connectionID is only used by metrics, and prometheus requires labels to be string
 | 
			
		||||
	connectionID string
 | 
			
		||||
	// connectionID is used by metrics and is converted to string when used as a prometheus label
 | 
			
		||||
	connectionID uint8
 | 
			
		||||
	logger       *log.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -463,23 +450,21 @@ func NewTunnelHandler(ctx context.Context,
 | 
			
		|||
	config *TunnelConfig,
 | 
			
		||||
	addr string,
 | 
			
		||||
	connectionID uint8,
 | 
			
		||||
	baseMetricsLabelKeys []string,
 | 
			
		||||
	baseMetricsLabelValues []string,
 | 
			
		||||
	baseMetricsLabels MetricsLabelList,
 | 
			
		||||
) (*TunnelHandler, string, error) {
 | 
			
		||||
	originURL, err := validation.ValidateUrl(config.OriginUrl)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, "", fmt.Errorf("Unable to parse origin url %#v", originURL)
 | 
			
		||||
	}
 | 
			
		||||
	h := &TunnelHandler{
 | 
			
		||||
		originUrl:              originURL,
 | 
			
		||||
		httpClient:             config.HTTPTransport,
 | 
			
		||||
		tlsConfig:              config.ClientTlsConfig,
 | 
			
		||||
		tags:                   config.Tags,
 | 
			
		||||
		metrics:                config.Metrics,
 | 
			
		||||
		connectionID:           uint8ToString(connectionID),
 | 
			
		||||
		baseMetricsLabelKeys:   baseMetricsLabelKeys,
 | 
			
		||||
		baseMetricsLabelValues: baseMetricsLabelValues,
 | 
			
		||||
		logger:                 config.Logger,
 | 
			
		||||
		originUrl:         originURL,
 | 
			
		||||
		httpClient:        config.HTTPTransport,
 | 
			
		||||
		tlsConfig:         config.ClientTlsConfig,
 | 
			
		||||
		tags:              config.Tags,
 | 
			
		||||
		metrics:           config.Metrics,
 | 
			
		||||
		connectionID:      connectionID,
 | 
			
		||||
		baseMetricsLabels: baseMetricsLabels,
 | 
			
		||||
		logger:            config.Logger,
 | 
			
		||||
	}
 | 
			
		||||
	if h.httpClient == nil {
 | 
			
		||||
		h.httpClient = http.DefaultTransport
 | 
			
		||||
| 
						 | 
				
			
			@ -522,12 +507,8 @@ func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) getCombinedMetricsLabels(connectionID string) []string {
 | 
			
		||||
	return append(h.baseMetricsLabelValues, connectionID)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
 | 
			
		||||
	h.metrics.incrementRequests(h.getCombinedMetricsLabels(h.connectionID))
 | 
			
		||||
	h.incrementRequestMetrics()
 | 
			
		||||
	req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		h.logger.WithError(err).Panic("Unexpected error from http.NewRequest")
 | 
			
		||||
| 
						 | 
				
			
			@ -542,8 +523,7 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
 | 
			
		|||
	h.logRequest(req, cfRay, lbProbe)
 | 
			
		||||
	if websocket.IsWebSocketUpgrade(req) {
 | 
			
		||||
		conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
 | 
			
		||||
		h.logger.WithFields(log.Fields{"connectionID": h.connectionID, "status": response.StatusCode}).Info("incrementResponses")
 | 
			
		||||
		h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
 | 
			
		||||
		h.incrementResponseMetrics(response.StatusCode)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			h.logError(stream, err)
 | 
			
		||||
		} else {
 | 
			
		||||
| 
						 | 
				
			
			@ -556,8 +536,7 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
 | 
			
		|||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		response, err := h.httpClient.RoundTrip(req)
 | 
			
		||||
		h.logger.WithFields(log.Fields{"connectionID": h.connectionID, "status": response.StatusCode}).Info("incrementResponses")
 | 
			
		||||
		h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
 | 
			
		||||
		h.incrementResponseMetrics(response.StatusCode)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			h.logError(stream, err)
 | 
			
		||||
		} else {
 | 
			
		||||
| 
						 | 
				
			
			@ -568,7 +547,7 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
 | 
			
		|||
			h.logResponse(response, cfRay, lbProbe)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	h.metrics.decrementConcurrentRequests(h.getCombinedMetricsLabels(h.connectionID))
 | 
			
		||||
	h.decrementConcurrentRequestMetrics()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -601,9 +580,41 @@ func (h *TunnelHandler) logResponse(r *http.Response, cfRay string, lbProbe bool
 | 
			
		|||
	h.logger.Debugf("Response Headers %+v", r.Header)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) UpdateMetrics(connectionID string) {
 | 
			
		||||
	// why only updateMuxerMetrics
 | 
			
		||||
	h.metrics.updateMuxerMetrics(h.getCombinedMetricsLabels(h.connectionID), h.muxer.Metrics())
 | 
			
		||||
func (h *TunnelHandler) getCombinedMetricsLabels(values ...string) []string {
 | 
			
		||||
	return append(h.baseMetricsLabels.Values, values...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) incrementResponseMetrics(statusCode int) {
 | 
			
		||||
	h.metrics.incrementResponses(
 | 
			
		||||
		h.getCombinedMetricsLabels(uint8ToString(h.connectionID)),
 | 
			
		||||
		statusCode,
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) decrementConcurrentRequestMetrics() {
 | 
			
		||||
	h.metrics.decrementConcurrentRequests(
 | 
			
		||||
		h.getCombinedMetricsLabels(uint8ToString(h.connectionID)),
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) incrementRequestMetrics() {
 | 
			
		||||
	h.metrics.incrementRequests(
 | 
			
		||||
		h.getCombinedMetricsLabels(uint8ToString(h.connectionID)),
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) updateMetrics() {
 | 
			
		||||
	h.metrics.updateMuxerMetrics(
 | 
			
		||||
		h.getCombinedMetricsLabels(uint8ToString(h.connectionID)),
 | 
			
		||||
		h.muxer.Metrics(),
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TunnelHandler) registerServerLocationMetrics(location string) {
 | 
			
		||||
	h.metrics.registerServerLocation(
 | 
			
		||||
		h.getCombinedMetricsLabels(uint8ToString(h.connectionID)),
 | 
			
		||||
		location,
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func uint8ToString(input uint8) string {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue