refactoring checkpoint; unit tests passing
This commit is contained in:
parent
e4a68da7c2
commit
f2b0e6d962
|
@ -1,6 +1,8 @@
|
||||||
package origin
|
package origin
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"hash/fnv"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -28,25 +30,25 @@ type muxerMetrics struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type TunnelMetrics struct {
|
type TunnelMetrics struct {
|
||||||
haConnections prometheus.Gauge
|
haConnections prometheus.Gauge
|
||||||
totalRequests prometheus.Counter
|
totalRequests prometheus.Counter
|
||||||
requestsPerTunnel *prometheus.CounterVec
|
requests *prometheus.CounterVec
|
||||||
// concurrentRequestsLock is a mutex for concurrentRequests and maxConcurrentRequests
|
// concurrentRequestsLock is a mutex for concurrentRequests and maxConcurrentRequests
|
||||||
concurrentRequestsLock sync.Mutex
|
concurrentRequestsLock sync.Mutex
|
||||||
concurrentRequestsPerTunnel *prometheus.GaugeVec
|
concurrentRequestsPerTunnel *prometheus.GaugeVec
|
||||||
// concurrentRequests records count of concurrent requests for each tunnel
|
// concurrentRequests records count of concurrent requests for each tunnel, keyed by hash of label values
|
||||||
concurrentRequests map[string]uint64
|
concurrentRequests map[uint64]uint64
|
||||||
maxConcurrentRequestsPerTunnel *prometheus.GaugeVec
|
maxConcurrentRequestsPerTunnel *prometheus.GaugeVec
|
||||||
// concurrentRequests records max count of concurrent requests for each tunnel
|
// concurrentRequests records max count of concurrent requests for each tunnel, keyed by hash of label values
|
||||||
maxConcurrentRequests map[string]uint64
|
maxConcurrentRequests map[uint64]uint64
|
||||||
timerRetries prometheus.Gauge
|
timerRetries prometheus.Gauge
|
||||||
responseByCode *prometheus.CounterVec
|
|
||||||
responseCodePerTunnel *prometheus.CounterVec
|
reponses *prometheus.CounterVec
|
||||||
serverLocations *prometheus.GaugeVec
|
serverLocations *prometheus.GaugeVec
|
||||||
// locationLock is a mutex for oldServerLocations
|
// locationLock is a mutex for oldServerLocations
|
||||||
locationLock sync.Mutex
|
locationLock sync.Mutex
|
||||||
// oldServerLocations stores the last server the tunnel was connected to
|
// oldServerLocations stores the last server the tunnel was connected to
|
||||||
oldServerLocations map[string]string
|
oldServerLocations map[uint64]string
|
||||||
|
|
||||||
muxerMetrics *muxerMetrics
|
muxerMetrics *muxerMetrics
|
||||||
}
|
}
|
||||||
|
@ -206,22 +208,22 @@ func newMuxerMetrics() *muxerMetrics {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *muxerMetrics) update(connectionID string, metrics *h2mux.MuxerMetrics) {
|
func (m *muxerMetrics) update(metricLabelValues []string, metrics *h2mux.MuxerMetrics) {
|
||||||
m.rtt.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTT))
|
m.rtt.WithLabelValues(metricLabelValues...).Set(convertRTTMilliSec(metrics.RTT))
|
||||||
m.rttMin.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTTMin))
|
m.rttMin.WithLabelValues(metricLabelValues...).Set(convertRTTMilliSec(metrics.RTTMin))
|
||||||
m.rttMax.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTTMax))
|
m.rttMax.WithLabelValues(metricLabelValues...).Set(convertRTTMilliSec(metrics.RTTMax))
|
||||||
m.receiveWindowAve.WithLabelValues(connectionID).Set(metrics.ReceiveWindowAve)
|
m.receiveWindowAve.WithLabelValues(metricLabelValues...).Set(metrics.ReceiveWindowAve)
|
||||||
m.sendWindowAve.WithLabelValues(connectionID).Set(metrics.SendWindowAve)
|
m.sendWindowAve.WithLabelValues(metricLabelValues...).Set(metrics.SendWindowAve)
|
||||||
m.receiveWindowMin.WithLabelValues(connectionID).Set(float64(metrics.ReceiveWindowMin))
|
m.receiveWindowMin.WithLabelValues(metricLabelValues...).Set(float64(metrics.ReceiveWindowMin))
|
||||||
m.receiveWindowMax.WithLabelValues(connectionID).Set(float64(metrics.ReceiveWindowMax))
|
m.receiveWindowMax.WithLabelValues(metricLabelValues...).Set(float64(metrics.ReceiveWindowMax))
|
||||||
m.sendWindowMin.WithLabelValues(connectionID).Set(float64(metrics.SendWindowMin))
|
m.sendWindowMin.WithLabelValues(metricLabelValues...).Set(float64(metrics.SendWindowMin))
|
||||||
m.sendWindowMax.WithLabelValues(connectionID).Set(float64(metrics.SendWindowMax))
|
m.sendWindowMax.WithLabelValues(metricLabelValues...).Set(float64(metrics.SendWindowMax))
|
||||||
m.inBoundRateCurr.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateCurr))
|
m.inBoundRateCurr.WithLabelValues(metricLabelValues...).Set(float64(metrics.InBoundRateCurr))
|
||||||
m.inBoundRateMin.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateMin))
|
m.inBoundRateMin.WithLabelValues(metricLabelValues...).Set(float64(metrics.InBoundRateMin))
|
||||||
m.inBoundRateMax.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateMax))
|
m.inBoundRateMax.WithLabelValues(metricLabelValues...).Set(float64(metrics.InBoundRateMax))
|
||||||
m.outBoundRateCurr.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateCurr))
|
m.outBoundRateCurr.WithLabelValues(metricLabelValues...).Set(float64(metrics.OutBoundRateCurr))
|
||||||
m.outBoundRateMin.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateMin))
|
m.outBoundRateMin.WithLabelValues(metricLabelValues...).Set(float64(metrics.OutBoundRateMin))
|
||||||
m.outBoundRateMax.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateMax))
|
m.outBoundRateMax.WithLabelValues(metricLabelValues...).Set(float64(metrics.OutBoundRateMax))
|
||||||
}
|
}
|
||||||
|
|
||||||
func convertRTTMilliSec(t time.Duration) float64 {
|
func convertRTTMilliSec(t time.Duration) float64 {
|
||||||
|
@ -278,14 +280,14 @@ func NewTunnelMetrics() *TunnelMetrics {
|
||||||
})
|
})
|
||||||
prometheus.MustRegister(timerRetries)
|
prometheus.MustRegister(timerRetries)
|
||||||
|
|
||||||
responseByCode := prometheus.NewCounterVec(
|
// responseByCode := prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
// prometheus.CounterOpts{
|
||||||
Name: "argotunnel_response_by_code",
|
// Name: "argotunnel_response_by_code",
|
||||||
Help: "Count of responses by HTTP status code",
|
// Help: "Count of responses by HTTP status code",
|
||||||
},
|
// },
|
||||||
[]string{"status_code"},
|
// []string{"status_code"},
|
||||||
)
|
// )
|
||||||
prometheus.MustRegister(responseByCode)
|
// prometheus.MustRegister(responseByCode)
|
||||||
|
|
||||||
responseCodePerTunnel := prometheus.NewCounterVec(
|
responseCodePerTunnel := prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
|
@ -308,20 +310,28 @@ func NewTunnelMetrics() *TunnelMetrics {
|
||||||
return &TunnelMetrics{
|
return &TunnelMetrics{
|
||||||
haConnections: haConnections,
|
haConnections: haConnections,
|
||||||
totalRequests: totalRequests,
|
totalRequests: totalRequests,
|
||||||
requestsPerTunnel: requestsPerTunnel,
|
requests: requestsPerTunnel,
|
||||||
concurrentRequestsPerTunnel: concurrentRequestsPerTunnel,
|
concurrentRequestsPerTunnel: concurrentRequestsPerTunnel,
|
||||||
concurrentRequests: make(map[string]uint64),
|
concurrentRequests: make(map[uint64]uint64),
|
||||||
maxConcurrentRequestsPerTunnel: maxConcurrentRequestsPerTunnel,
|
maxConcurrentRequestsPerTunnel: maxConcurrentRequestsPerTunnel,
|
||||||
maxConcurrentRequests: make(map[string]uint64),
|
maxConcurrentRequests: make(map[uint64]uint64),
|
||||||
timerRetries: timerRetries,
|
timerRetries: timerRetries,
|
||||||
responseByCode: responseByCode,
|
|
||||||
responseCodePerTunnel: responseCodePerTunnel,
|
reponses: responseCodePerTunnel,
|
||||||
serverLocations: serverLocations,
|
serverLocations: serverLocations,
|
||||||
oldServerLocations: make(map[string]string),
|
oldServerLocations: make(map[uint64]string),
|
||||||
muxerMetrics: newMuxerMetrics(),
|
muxerMetrics: newMuxerMetrics(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func hashLabelValues(labelValues []string) uint64 {
|
||||||
|
h := fnv.New64()
|
||||||
|
for _, text := range labelValues {
|
||||||
|
h.Write([]byte(text))
|
||||||
|
}
|
||||||
|
return h.Sum64()
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TunnelMetrics) incrementHaConnections() {
|
func (t *TunnelMetrics) incrementHaConnections() {
|
||||||
t.haConnections.Inc()
|
t.haConnections.Inc()
|
||||||
}
|
}
|
||||||
|
@ -330,56 +340,61 @@ func (t *TunnelMetrics) decrementHaConnections() {
|
||||||
t.haConnections.Dec()
|
t.haConnections.Dec()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TunnelMetrics) updateMuxerMetrics(connectionID string, metrics *h2mux.MuxerMetrics) {
|
func (t *TunnelMetrics) updateMuxerMetrics(metricLabelValues []string, metrics *h2mux.MuxerMetrics) {
|
||||||
t.muxerMetrics.update(connectionID, metrics)
|
t.muxerMetrics.update(metricLabelValues, metrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TunnelMetrics) incrementRequests(connectionID string) {
|
func (t *TunnelMetrics) incrementRequests(metricLabelValues []string) {
|
||||||
t.concurrentRequestsLock.Lock()
|
t.concurrentRequestsLock.Lock()
|
||||||
var concurrentRequests uint64
|
var concurrentRequests uint64
|
||||||
var ok bool
|
var ok bool
|
||||||
if concurrentRequests, ok = t.concurrentRequests[connectionID]; ok {
|
hashKey := hashLabelValues(metricLabelValues)
|
||||||
t.concurrentRequests[connectionID] += 1
|
if concurrentRequests, ok = t.concurrentRequests[hashKey]; ok {
|
||||||
|
t.concurrentRequests[hashKey] += 1
|
||||||
concurrentRequests++
|
concurrentRequests++
|
||||||
} else {
|
} else {
|
||||||
t.concurrentRequests[connectionID] = 1
|
t.concurrentRequests[hashKey] = 1
|
||||||
concurrentRequests = 1
|
concurrentRequests = 1
|
||||||
}
|
}
|
||||||
if maxConcurrentRequests, ok := t.maxConcurrentRequests[connectionID]; (ok && maxConcurrentRequests < concurrentRequests) || !ok {
|
if maxConcurrentRequests, ok := t.maxConcurrentRequests[hashKey]; (ok && maxConcurrentRequests < concurrentRequests) || !ok {
|
||||||
t.maxConcurrentRequests[connectionID] = concurrentRequests
|
t.maxConcurrentRequests[hashKey] = concurrentRequests
|
||||||
t.maxConcurrentRequestsPerTunnel.WithLabelValues(connectionID).Set(float64(concurrentRequests))
|
t.maxConcurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Set(float64(concurrentRequests))
|
||||||
}
|
}
|
||||||
t.concurrentRequestsLock.Unlock()
|
t.concurrentRequestsLock.Unlock()
|
||||||
|
|
||||||
t.totalRequests.Inc()
|
t.totalRequests.Inc()
|
||||||
t.requestsPerTunnel.WithLabelValues(connectionID).Inc()
|
t.requests.WithLabelValues(metricLabelValues...).Inc()
|
||||||
t.concurrentRequestsPerTunnel.WithLabelValues(connectionID).Inc()
|
t.concurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TunnelMetrics) decrementConcurrentRequests(connectionID string) {
|
func (t *TunnelMetrics) decrementConcurrentRequests(metricLabelValues []string) {
|
||||||
t.concurrentRequestsLock.Lock()
|
t.concurrentRequestsLock.Lock()
|
||||||
if _, ok := t.concurrentRequests[connectionID]; ok {
|
hashKey := hashLabelValues(metricLabelValues)
|
||||||
t.concurrentRequests[connectionID] -= 1
|
if _, ok := t.concurrentRequests[hashKey]; ok {
|
||||||
|
t.concurrentRequests[hashKey] -= 1
|
||||||
}
|
}
|
||||||
t.concurrentRequestsLock.Unlock()
|
t.concurrentRequestsLock.Unlock()
|
||||||
|
|
||||||
t.concurrentRequestsPerTunnel.WithLabelValues(connectionID).Dec()
|
t.concurrentRequestsPerTunnel.WithLabelValues(metricLabelValues...).Dec()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TunnelMetrics) incrementResponses(connectionID, code string) {
|
func (t *TunnelMetrics) incrementResponses(metricLabelValues []string, responseCode int) {
|
||||||
t.responseByCode.WithLabelValues(code).Inc()
|
labelValues := append(metricLabelValues, strconv.Itoa(responseCode))
|
||||||
t.responseCodePerTunnel.WithLabelValues(connectionID, code).Inc()
|
t.reponses.WithLabelValues(labelValues...).Inc()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TunnelMetrics) registerServerLocation(connectionID, loc string) {
|
func (t *TunnelMetrics) registerServerLocation(metricLabelValues []string, loc string) {
|
||||||
t.locationLock.Lock()
|
t.locationLock.Lock()
|
||||||
defer t.locationLock.Unlock()
|
defer t.locationLock.Unlock()
|
||||||
if oldLoc, ok := t.oldServerLocations[connectionID]; ok && oldLoc == loc {
|
hashKey := hashLabelValues(metricLabelValues)
|
||||||
|
if oldLoc, ok := t.oldServerLocations[hashKey]; ok && oldLoc == loc {
|
||||||
return
|
return
|
||||||
} else if ok {
|
} else if ok {
|
||||||
t.serverLocations.WithLabelValues(connectionID, oldLoc).Dec()
|
labelValues := append(metricLabelValues, oldLoc)
|
||||||
|
t.serverLocations.WithLabelValues(labelValues...).Dec()
|
||||||
}
|
}
|
||||||
t.serverLocations.WithLabelValues(connectionID, loc).Inc()
|
labelValues := append(metricLabelValues, loc)
|
||||||
t.oldServerLocations[connectionID] = loc
|
t.serverLocations.WithLabelValues(labelValues...).Inc()
|
||||||
|
t.oldServerLocations[hashKey] = loc
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,33 +15,37 @@ func TestConcurrentRequestsSingleTunnel(t *testing.T) {
|
||||||
routines := 20
|
routines := 20
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(routines)
|
wg.Add(routines)
|
||||||
|
|
||||||
|
baseLabels := []string{"0"}
|
||||||
|
hashKey := hashLabelValues(baseLabels)
|
||||||
|
|
||||||
for i := 0; i < routines; i++ {
|
for i := 0; i < routines; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
m.incrementRequests("0")
|
m.incrementRequests(baseLabels)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
assert.Len(t, m.concurrentRequests, 1)
|
assert.Len(t, m.concurrentRequests, 1)
|
||||||
assert.Equal(t, uint64(routines), m.concurrentRequests["0"])
|
assert.Equal(t, uint64(routines), m.concurrentRequests[hashKey])
|
||||||
assert.Len(t, m.maxConcurrentRequests, 1)
|
assert.Len(t, m.maxConcurrentRequests, 1)
|
||||||
assert.Equal(t, uint64(routines), m.maxConcurrentRequests["0"])
|
assert.Equal(t, uint64(routines), m.maxConcurrentRequests[hashKey])
|
||||||
|
|
||||||
wg.Add(routines / 2)
|
wg.Add(routines / 2)
|
||||||
for i := 0; i < routines/2; i++ {
|
for i := 0; i < routines/2; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
m.decrementConcurrentRequests("0")
|
m.decrementConcurrentRequests(baseLabels)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
assert.Equal(t, uint64(routines-routines/2), m.concurrentRequests["0"])
|
assert.Equal(t, uint64(routines-routines/2), m.concurrentRequests[hashKey])
|
||||||
assert.Equal(t, uint64(routines), m.maxConcurrentRequests["0"])
|
assert.Equal(t, uint64(routines), m.maxConcurrentRequests[hashKey])
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcurrentRequestsMultiTunnel(t *testing.T) {
|
func TestConcurrentRequestsMultiTunnel(t *testing.T) {
|
||||||
m.concurrentRequests = make(map[string]uint64)
|
m.concurrentRequests = make(map[uint64]uint64)
|
||||||
m.maxConcurrentRequests = make(map[string]uint64)
|
m.maxConcurrentRequests = make(map[uint64]uint64)
|
||||||
tunnels := 20
|
tunnels := 20
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(tunnels)
|
wg.Add(tunnels)
|
||||||
|
@ -49,8 +53,8 @@ func TestConcurrentRequestsMultiTunnel(t *testing.T) {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
// if we have j < i, then tunnel 0 won't have a chance to call incrementRequests
|
// if we have j < i, then tunnel 0 won't have a chance to call incrementRequests
|
||||||
for j := 0; j < i+1; j++ {
|
for j := 0; j < i+1; j++ {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
m.incrementRequests(id)
|
m.incrementRequests(labels)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(i)
|
}(i)
|
||||||
|
@ -60,17 +64,18 @@ func TestConcurrentRequestsMultiTunnel(t *testing.T) {
|
||||||
assert.Len(t, m.concurrentRequests, tunnels)
|
assert.Len(t, m.concurrentRequests, tunnels)
|
||||||
assert.Len(t, m.maxConcurrentRequests, tunnels)
|
assert.Len(t, m.maxConcurrentRequests, tunnels)
|
||||||
for i := 0; i < tunnels; i++ {
|
for i := 0; i < tunnels; i++ {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
assert.Equal(t, uint64(i+1), m.concurrentRequests[id])
|
hashKey := hashLabelValues(labels)
|
||||||
assert.Equal(t, uint64(i+1), m.maxConcurrentRequests[id])
|
assert.Equal(t, uint64(i+1), m.concurrentRequests[hashKey])
|
||||||
|
assert.Equal(t, uint64(i+1), m.maxConcurrentRequests[hashKey])
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(tunnels)
|
wg.Add(tunnels)
|
||||||
for i := 0; i < tunnels; i++ {
|
for i := 0; i < tunnels; i++ {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
for j := 0; j < i+1; j++ {
|
for j := 0; j < i+1; j++ {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
m.decrementConcurrentRequests(id)
|
m.decrementConcurrentRequests(labels)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(i)
|
}(i)
|
||||||
|
@ -80,9 +85,10 @@ func TestConcurrentRequestsMultiTunnel(t *testing.T) {
|
||||||
assert.Len(t, m.concurrentRequests, tunnels)
|
assert.Len(t, m.concurrentRequests, tunnels)
|
||||||
assert.Len(t, m.maxConcurrentRequests, tunnels)
|
assert.Len(t, m.maxConcurrentRequests, tunnels)
|
||||||
for i := 0; i < tunnels; i++ {
|
for i := 0; i < tunnels; i++ {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
assert.Equal(t, uint64(0), m.concurrentRequests[id])
|
hashKey := hashLabelValues(labels)
|
||||||
assert.Equal(t, uint64(i+1), m.maxConcurrentRequests[id])
|
assert.Equal(t, uint64(0), m.concurrentRequests[hashKey])
|
||||||
|
assert.Equal(t, uint64(i+1), m.maxConcurrentRequests[hashKey])
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -93,29 +99,31 @@ func TestRegisterServerLocation(t *testing.T) {
|
||||||
wg.Add(tunnels)
|
wg.Add(tunnels)
|
||||||
for i := 0; i < tunnels; i++ {
|
for i := 0; i < tunnels; i++ {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
m.registerServerLocation(id, "LHR")
|
m.registerServerLocation(labels, "LHR")
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
for i := 0; i < tunnels; i++ {
|
for i := 0; i < tunnels; i++ {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
assert.Equal(t, "LHR", m.oldServerLocations[id])
|
hashKey := hashLabelValues(labels)
|
||||||
|
assert.Equal(t, "LHR", m.oldServerLocations[hashKey])
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(tunnels)
|
wg.Add(tunnels)
|
||||||
for i := 0; i < tunnels; i++ {
|
for i := 0; i < tunnels; i++ {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
m.registerServerLocation(id, "AUS")
|
m.registerServerLocation(labels, "AUS")
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
for i := 0; i < tunnels; i++ {
|
for i := 0; i < tunnels; i++ {
|
||||||
id := strconv.Itoa(i)
|
labels := []string{strconv.Itoa(i)}
|
||||||
assert.Equal(t, "AUS", m.oldServerLocations[id])
|
hashKey := hashLabelValues(labels)
|
||||||
|
assert.Equal(t, "AUS", m.oldServerLocations[hashKey])
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
dialTimeout = 15 * time.Second
|
dialTimeout = 15 * time.Second
|
||||||
lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
|
lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
|
||||||
TagHeaderNamePrefix = "Cf-Warp-Tag-"
|
TagHeaderNamePrefix = "Cf-Warp-Tag-"
|
||||||
DuplicateConnectionError = "EDUPCONN"
|
DuplicateConnectionError = "EDUPCONN"
|
||||||
|
@ -382,7 +382,7 @@ func LogServerInfo(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Infof("Connected to %s", serverInfo.LocationName)
|
logger.Infof("Connected to %s", serverInfo.LocationName)
|
||||||
metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName)
|
// metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func H2RequestHeadersToH1Request(h2 []h2mux.Header, h1 *http.Request) error {
|
func H2RequestHeadersToH1Request(h2 []h2mux.Header, h1 *http.Request) error {
|
||||||
|
@ -433,6 +433,10 @@ type TunnelHandler struct {
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
tags []tunnelpogs.Tag
|
tags []tunnelpogs.Tag
|
||||||
metrics *TunnelMetrics
|
metrics *TunnelMetrics
|
||||||
|
|
||||||
|
baseMetricsLabelKeys []string
|
||||||
|
baseMetricsLabelValues []string
|
||||||
|
|
||||||
// connectionID is only used by metrics, and prometheus requires labels to be string
|
// connectionID is only used by metrics, and prometheus requires labels to be string
|
||||||
connectionID string
|
connectionID string
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
@ -500,8 +504,12 @@ func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *TunnelHandler) getCombinedMetricsLabels(connectionID string) []string {
|
||||||
|
return append(h.baseMetricsLabelValues, connectionID)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
|
func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
|
||||||
h.metrics.incrementRequests(h.connectionID)
|
h.metrics.incrementRequests(h.getCombinedMetricsLabels(h.connectionID))
|
||||||
req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
|
req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.WithError(err).Panic("Unexpected error from http.NewRequest")
|
h.logger.WithError(err).Panic("Unexpected error from http.NewRequest")
|
||||||
|
@ -516,6 +524,7 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
|
||||||
h.logRequest(req, cfRay, lbProbe)
|
h.logRequest(req, cfRay, lbProbe)
|
||||||
if websocket.IsWebSocketUpgrade(req) {
|
if websocket.IsWebSocketUpgrade(req) {
|
||||||
conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
|
conn, response, err := websocket.ClientConnect(req, h.tlsConfig)
|
||||||
|
h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logError(stream, err)
|
h.logError(stream, err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -524,22 +533,22 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
|
||||||
// Copy to/from stream to the undelying connection. Use the underlying
|
// Copy to/from stream to the undelying connection. Use the underlying
|
||||||
// connection because cloudflared doesn't operate on the message themselves
|
// connection because cloudflared doesn't operate on the message themselves
|
||||||
websocket.Stream(conn.UnderlyingConn(), stream)
|
websocket.Stream(conn.UnderlyingConn(), stream)
|
||||||
h.metrics.incrementResponses(h.connectionID, "200")
|
|
||||||
h.logResponse(response, cfRay, lbProbe)
|
h.logResponse(response, cfRay, lbProbe)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
response, err := h.httpClient.RoundTrip(req)
|
response, err := h.httpClient.RoundTrip(req)
|
||||||
|
h.metrics.incrementResponses(h.getCombinedMetricsLabels(h.connectionID), response.StatusCode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logError(stream, err)
|
h.logError(stream, err)
|
||||||
} else {
|
} else {
|
||||||
defer response.Body.Close()
|
defer response.Body.Close()
|
||||||
stream.WriteHeaders(H1ResponseToH2Response(response))
|
stream.WriteHeaders(H1ResponseToH2Response(response))
|
||||||
io.Copy(stream, response.Body)
|
io.Copy(stream, response.Body)
|
||||||
h.metrics.incrementResponses(h.connectionID, "200")
|
|
||||||
h.logResponse(response, cfRay, lbProbe)
|
h.logResponse(response, cfRay, lbProbe)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
h.metrics.decrementConcurrentRequests(h.connectionID)
|
h.metrics.decrementConcurrentRequests(h.getCombinedMetricsLabels(h.connectionID))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -547,7 +556,7 @@ func (h *TunnelHandler) logError(stream *h2mux.MuxedStream, err error) {
|
||||||
h.logger.WithError(err).Error("HTTP request error")
|
h.logger.WithError(err).Error("HTTP request error")
|
||||||
stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "502"}})
|
stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "502"}})
|
||||||
stream.Write([]byte("502 Bad Gateway"))
|
stream.Write([]byte("502 Bad Gateway"))
|
||||||
h.metrics.incrementResponses(h.connectionID, "502")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *TunnelHandler) logRequest(req *http.Request, cfRay string, lbProbe bool) {
|
func (h *TunnelHandler) logRequest(req *http.Request, cfRay string, lbProbe bool) {
|
||||||
|
@ -573,7 +582,8 @@ func (h *TunnelHandler) logResponse(r *http.Response, cfRay string, lbProbe bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *TunnelHandler) UpdateMetrics(connectionID string) {
|
func (h *TunnelHandler) UpdateMetrics(connectionID string) {
|
||||||
h.metrics.updateMuxerMetrics(connectionID, h.muxer.Metrics())
|
// why only updateMuxerMetrics
|
||||||
|
h.metrics.updateMuxerMetrics(h.getCombinedMetricsLabels(h.connectionID), h.muxer.Metrics())
|
||||||
}
|
}
|
||||||
|
|
||||||
func uint8ToString(input uint8) string {
|
func uint8ToString(input uint8) string {
|
||||||
|
|
Loading…
Reference in New Issue