wiring
This commit is contained in:
parent
2791c20b55
commit
09fbd75000
|
@ -31,8 +31,8 @@ type muxerMetrics struct {
|
||||||
|
|
||||||
type TunnelMetrics struct {
|
type TunnelMetrics struct {
|
||||||
haConnections prometheus.Gauge
|
haConnections prometheus.Gauge
|
||||||
totalRequests prometheus.Counter
|
|
||||||
requests *prometheus.CounterVec
|
requests *prometheus.CounterVec
|
||||||
// concurrentRequestsLock is a mutex for concurrentRequests and maxConcurrentRequests
|
// concurrentRequestsLock is a mutex for concurrentRequests and maxConcurrentRequests
|
||||||
concurrentRequestsLock sync.Mutex
|
concurrentRequestsLock sync.Mutex
|
||||||
concurrentRequestsPerTunnel *prometheus.GaugeVec
|
concurrentRequestsPerTunnel *prometheus.GaugeVec
|
||||||
|
@ -43,7 +43,7 @@ type TunnelMetrics struct {
|
||||||
maxConcurrentRequests map[uint64]uint64
|
maxConcurrentRequests map[uint64]uint64
|
||||||
timerRetries prometheus.Gauge
|
timerRetries prometheus.Gauge
|
||||||
|
|
||||||
reponses *prometheus.CounterVec
|
responses *prometheus.CounterVec
|
||||||
serverLocations *prometheus.GaugeVec
|
serverLocations *prometheus.GaugeVec
|
||||||
// locationLock is a mutex for oldServerLocations
|
// locationLock is a mutex for oldServerLocations
|
||||||
locationLock sync.Mutex
|
locationLock sync.Mutex
|
||||||
|
@ -53,13 +53,13 @@ type TunnelMetrics struct {
|
||||||
muxerMetrics *muxerMetrics
|
muxerMetrics *muxerMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMuxerMetrics() *muxerMetrics {
|
func newMuxerMetrics(baseMetricsLabels []string) *muxerMetrics {
|
||||||
rtt := prometheus.NewGaugeVec(
|
rtt := prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Name: "argotunnel_rtt",
|
Name: "argotunnel_rtt",
|
||||||
Help: "Round-trip time in millisecond",
|
Help: "Round-trip time in millisecond",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(rtt)
|
prometheus.MustRegister(rtt)
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_rtt_min",
|
Name: "argotunnel_rtt_min",
|
||||||
Help: "Shortest round-trip time in millisecond",
|
Help: "Shortest round-trip time in millisecond",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(rttMin)
|
prometheus.MustRegister(rttMin)
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_rtt_max",
|
Name: "argotunnel_rtt_max",
|
||||||
Help: "Longest round-trip time in millisecond",
|
Help: "Longest round-trip time in millisecond",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(rttMax)
|
prometheus.MustRegister(rttMax)
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_receive_window_ave",
|
Name: "argotunnel_receive_window_ave",
|
||||||
Help: "Average receive window size in bytes",
|
Help: "Average receive window size in bytes",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(receiveWindowAve)
|
prometheus.MustRegister(receiveWindowAve)
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_send_window_ave",
|
Name: "argotunnel_send_window_ave",
|
||||||
Help: "Average send window size in bytes",
|
Help: "Average send window size in bytes",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(sendWindowAve)
|
prometheus.MustRegister(sendWindowAve)
|
||||||
|
|
||||||
|
@ -104,7 +104,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_receive_window_min",
|
Name: "argotunnel_receive_window_min",
|
||||||
Help: "Smallest receive window size in bytes",
|
Help: "Smallest receive window size in bytes",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(receiveWindowMin)
|
prometheus.MustRegister(receiveWindowMin)
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_receive_window_max",
|
Name: "argotunnel_receive_window_max",
|
||||||
Help: "Largest receive window size in bytes",
|
Help: "Largest receive window size in bytes",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(receiveWindowMax)
|
prometheus.MustRegister(receiveWindowMax)
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_send_window_min",
|
Name: "argotunnel_send_window_min",
|
||||||
Help: "Smallest send window size in bytes",
|
Help: "Smallest send window size in bytes",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(sendWindowMin)
|
prometheus.MustRegister(sendWindowMin)
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_send_window_max",
|
Name: "argotunnel_send_window_max",
|
||||||
Help: "Largest send window size in bytes",
|
Help: "Largest send window size in bytes",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(sendWindowMax)
|
prometheus.MustRegister(sendWindowMax)
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_inbound_bytes_per_sec_curr",
|
Name: "argotunnel_inbound_bytes_per_sec_curr",
|
||||||
Help: "Current inbounding bytes per second, 0 if there is no incoming connection",
|
Help: "Current inbounding bytes per second, 0 if there is no incoming connection",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(inBoundRateCurr)
|
prometheus.MustRegister(inBoundRateCurr)
|
||||||
|
|
||||||
|
@ -149,7 +149,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_inbound_bytes_per_sec_min",
|
Name: "argotunnel_inbound_bytes_per_sec_min",
|
||||||
Help: "Minimum non-zero inbounding bytes per second",
|
Help: "Minimum non-zero inbounding bytes per second",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(inBoundRateMin)
|
prometheus.MustRegister(inBoundRateMin)
|
||||||
|
|
||||||
|
@ -158,7 +158,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_inbound_bytes_per_sec_max",
|
Name: "argotunnel_inbound_bytes_per_sec_max",
|
||||||
Help: "Maximum inbounding bytes per second",
|
Help: "Maximum inbounding bytes per second",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(inBoundRateMax)
|
prometheus.MustRegister(inBoundRateMax)
|
||||||
|
|
||||||
|
@ -167,7 +167,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_outbound_bytes_per_sec_curr",
|
Name: "argotunnel_outbound_bytes_per_sec_curr",
|
||||||
Help: "Current outbounding bytes per second, 0 if there is no outgoing traffic",
|
Help: "Current outbounding bytes per second, 0 if there is no outgoing traffic",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(outBoundRateCurr)
|
prometheus.MustRegister(outBoundRateCurr)
|
||||||
|
|
||||||
|
@ -176,7 +176,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_outbound_bytes_per_sec_min",
|
Name: "argotunnel_outbound_bytes_per_sec_min",
|
||||||
Help: "Minimum non-zero outbounding bytes per second",
|
Help: "Minimum non-zero outbounding bytes per second",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(outBoundRateMin)
|
prometheus.MustRegister(outBoundRateMin)
|
||||||
|
|
||||||
|
@ -185,7 +185,7 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
Name: "argotunnel_outbound_bytes_per_sec_max",
|
Name: "argotunnel_outbound_bytes_per_sec_max",
|
||||||
Help: "Maximum outbounding bytes per second",
|
Help: "Maximum outbounding bytes per second",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricsLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(outBoundRateMax)
|
prometheus.MustRegister(outBoundRateMax)
|
||||||
|
|
||||||
|
@ -231,7 +231,7 @@ func convertRTTMilliSec(t time.Duration) float64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metrics that can be collected without asking the edge
|
// Metrics that can be collected without asking the edge
|
||||||
func NewTunnelMetrics() *TunnelMetrics {
|
func NewTunnelMetrics(baseMetricLabels []string) *TunnelMetrics {
|
||||||
haConnections := prometheus.NewGauge(
|
haConnections := prometheus.NewGauge(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Name: "argotunnel_ha_connections",
|
Name: "argotunnel_ha_connections",
|
||||||
|
@ -239,28 +239,21 @@ func NewTunnelMetrics() *TunnelMetrics {
|
||||||
})
|
})
|
||||||
prometheus.MustRegister(haConnections)
|
prometheus.MustRegister(haConnections)
|
||||||
|
|
||||||
totalRequests := prometheus.NewCounter(
|
requests := prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "argotunnel_total_requests",
|
Name: "argotunnel_requests",
|
||||||
Help: "Amount of requests proxied through all the tunnels",
|
|
||||||
})
|
|
||||||
prometheus.MustRegister(totalRequests)
|
|
||||||
|
|
||||||
requestsPerTunnel := prometheus.NewCounterVec(
|
|
||||||
prometheus.CounterOpts{
|
|
||||||
Name: "argotunnel_requests_per_tunnel",
|
|
||||||
Help: "Amount of requests proxied through each tunnel",
|
Help: "Amount of requests proxied through each tunnel",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(requestsPerTunnel)
|
prometheus.MustRegister(requests)
|
||||||
|
|
||||||
concurrentRequestsPerTunnel := prometheus.NewGaugeVec(
|
concurrentRequestsPerTunnel := prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Name: "argotunnel_concurrent_requests_per_tunnel",
|
Name: "argotunnel_concurrent_requests_per_tunnel",
|
||||||
Help: "Concurrent requests proxied through each tunnel",
|
Help: "Concurrent requests proxied through each tunnel",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(concurrentRequestsPerTunnel)
|
prometheus.MustRegister(concurrentRequestsPerTunnel)
|
||||||
|
|
||||||
|
@ -269,7 +262,7 @@ func NewTunnelMetrics() *TunnelMetrics {
|
||||||
Name: "argotunnel_max_concurrent_requests_per_tunnel",
|
Name: "argotunnel_max_concurrent_requests_per_tunnel",
|
||||||
Help: "Largest number of concurrent requests proxied through each tunnel so far",
|
Help: "Largest number of concurrent requests proxied through each tunnel so far",
|
||||||
},
|
},
|
||||||
[]string{"connection_id"},
|
append(baseMetricLabels, "connection_id"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(maxConcurrentRequestsPerTunnel)
|
prometheus.MustRegister(maxConcurrentRequestsPerTunnel)
|
||||||
|
|
||||||
|
@ -289,38 +282,37 @@ func NewTunnelMetrics() *TunnelMetrics {
|
||||||
// )
|
// )
|
||||||
// prometheus.MustRegister(responseByCode)
|
// prometheus.MustRegister(responseByCode)
|
||||||
|
|
||||||
responseCodePerTunnel := prometheus.NewCounterVec(
|
responses := prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "argotunnel_response_code_per_tunnel",
|
Name: "argotunnel_responses",
|
||||||
Help: "Count of responses by HTTP status code fore each tunnel",
|
Help: "Count of responses for each tunnel",
|
||||||
},
|
},
|
||||||
[]string{"connection_id", "status_code"},
|
append(baseMetricLabels, "connection_id", "status_code"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(responseCodePerTunnel)
|
prometheus.MustRegister(responses)
|
||||||
|
|
||||||
serverLocations := prometheus.NewGaugeVec(
|
serverLocations := prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Name: "argotunnel_server_locations",
|
Name: "argotunnel_server_locations",
|
||||||
Help: "Where each tunnel is connected to. 1 means current location, 0 means previous locations.",
|
Help: "Where each tunnel is connected to. 1 means current location, 0 means previous locations.",
|
||||||
},
|
},
|
||||||
[]string{"connection_id", "location"},
|
append(baseMetricLabels, "connection_id", "location"),
|
||||||
)
|
)
|
||||||
prometheus.MustRegister(serverLocations)
|
prometheus.MustRegister(serverLocations)
|
||||||
|
|
||||||
return &TunnelMetrics{
|
return &TunnelMetrics{
|
||||||
haConnections: haConnections,
|
haConnections: haConnections,
|
||||||
totalRequests: totalRequests,
|
requests: requests,
|
||||||
requests: requestsPerTunnel,
|
|
||||||
concurrentRequestsPerTunnel: concurrentRequestsPerTunnel,
|
concurrentRequestsPerTunnel: concurrentRequestsPerTunnel,
|
||||||
concurrentRequests: make(map[uint64]uint64),
|
concurrentRequests: make(map[uint64]uint64),
|
||||||
maxConcurrentRequestsPerTunnel: maxConcurrentRequestsPerTunnel,
|
maxConcurrentRequestsPerTunnel: maxConcurrentRequestsPerTunnel,
|
||||||
maxConcurrentRequests: make(map[uint64]uint64),
|
maxConcurrentRequests: make(map[uint64]uint64),
|
||||||
timerRetries: timerRetries,
|
timerRetries: timerRetries,
|
||||||
|
|
||||||
reponses: responseCodePerTunnel,
|
responses: responses,
|
||||||
serverLocations: serverLocations,
|
serverLocations: serverLocations,
|
||||||
oldServerLocations: make(map[uint64]string),
|
oldServerLocations: make(map[uint64]string),
|
||||||
muxerMetrics: newMuxerMetrics(),
|
muxerMetrics: newMuxerMetrics(baseMetricLabels),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -362,7 +354,6 @@ func (t *TunnelMetrics) incrementRequests(metricLabelValues []string) {
|
||||||
}
|
}
|
||||||
t.concurrentRequestsLock.Unlock()
|
t.concurrentRequestsLock.Unlock()
|
||||||
|
|
||||||
t.totalRequests.Inc()
|
|
||||||
t.requests.WithLabelValues(metricLabelValues...).Inc()
|
t.requests.WithLabelValues(metricLabelValues...).Inc()
|
||||||
t.concurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Inc()
|
t.concurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Inc()
|
||||||
}
|
}
|
||||||
|
@ -380,7 +371,7 @@ func (t *TunnelMetrics) decrementConcurrentRequests(metricLabelValues []string)
|
||||||
|
|
||||||
func (t *TunnelMetrics) incrementResponses(metricLabelValues []string, responseCode int) {
|
func (t *TunnelMetrics) incrementResponses(metricLabelValues []string, responseCode int) {
|
||||||
labelValues := append(metricLabelValues, strconv.Itoa(responseCode))
|
labelValues := append(metricLabelValues, strconv.Itoa(responseCode))
|
||||||
t.reponses.WithLabelValues(labelValues...).Inc()
|
t.responses.WithLabelValues(labelValues...).Inc()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// can only be called once
|
// can only be called once
|
||||||
var m = NewTunnelMetrics()
|
var testMetrics = make([]string, 0)
|
||||||
|
var m = NewTunnelMetrics(testMetrics)
|
||||||
|
|
||||||
func TestConcurrentRequestsSingleTunnel(t *testing.T) {
|
func TestConcurrentRequestsSingleTunnel(t *testing.T) {
|
||||||
routines := 20
|
routines := 20
|
||||||
|
@ -92,7 +93,6 @@ func TestConcurrentRequestsMultiTunnel(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRegisterServerLocation(t *testing.T) {
|
func TestRegisterServerLocation(t *testing.T) {
|
||||||
tunnels := 20
|
tunnels := 20
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
|
@ -542,6 +542,7 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
|
||||||
h.logRequest(req, cfRay, lbProbe)
|
h.logRequest(req, cfRay, lbProbe)
|
||||||
if websocket.IsWebSocketUpgrade(req) {
|
if websocket.IsWebSocketUpgrade(req) {
|
||||||
conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
|
conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
|
||||||
|
h.logger.WithFields(log.Fields{"connectionID": h.connectionID, "status": response.StatusCode}).Info("incrementResponses")
|
||||||
h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
|
h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logError(stream, err)
|
h.logError(stream, err)
|
||||||
|
@ -555,6 +556,7 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
response, err := h.httpClient.RoundTrip(req)
|
response, err := h.httpClient.RoundTrip(req)
|
||||||
|
h.logger.WithFields(log.Fields{"connectionID": h.connectionID, "status": response.StatusCode}).Info("incrementResponses")
|
||||||
h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
|
h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logError(stream, err)
|
h.logError(stream, err)
|
||||||
|
|
Loading…
Reference in New Issue