TUN-8224: Count and collect metrics on stream connect successes/errors

This commit is contained in:
chungthuang 2024-02-07 14:32:39 +00:00
parent 98e043d17d
commit 638203f9f1
2 changed files with 45 additions and 14 deletions

View File

@ -59,6 +59,23 @@ var (
Help: "Total count of TCP sessions that have been proxied to any origin", Help: "Total count of TCP sessions that have been proxied to any origin",
}, },
) )
connectLatency = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: connection.MetricsNamespace,
Subsystem: "proxy",
Name: "connect_latency",
Help: "Time it takes to establish and acknowledge connections in milliseconds",
Buckets: []float64{1, 10, 25, 50, 100, 500, 1000, 5000},
},
)
connectStreamErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: connection.MetricsNamespace,
Subsystem: "proxy",
Name: "connect_streams_errors",
Help: "Total count of failure to establish and acknowledge connections",
},
)
) )
func init() { func init() {
@ -69,6 +86,8 @@ func init() {
requestErrors, requestErrors,
activeTCPSessions, activeTCPSessions,
totalTCPSessions, totalTCPSessions,
connectLatency,
connectStreamErrors,
) )
} }

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -139,7 +140,11 @@ func (p *Proxy) ProxyHTTP(
return fmt.Errorf("response writer is not a flusher") return fmt.Errorf("response writer is not a flusher")
} }
rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req) rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req)
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy); err != nil { connectedLogger := p.log.Debug().
Int(management.EventTypeKey, int(management.HTTP)).
Str(LogFieldDestAddr, dest).
Uint8(LogFieldConnIndex, tr.ConnIndex)
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, connectedLogger); err != nil {
rule, srv := ruleField(p.ingressRules, ruleNum) rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, "", rule, srv) p.logRequestError(err, cfRay, "", rule, srv)
return err return err
@ -173,24 +178,22 @@ func (p *Proxy) ProxyTCP(
tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log) tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log)
p.log.Debug(). debugLogger := func() *zerolog.Event {
Int(management.EventTypeKey, int(management.TCP)). return p.log.Debug().
Str(LogFieldFlowID, req.FlowID). Int(management.EventTypeKey, int(management.TCP)).
Str(LogFieldDestAddr, req.Dest). Str(LogFieldFlowID, req.FlowID).
Uint8(LogFieldConnIndex, req.ConnIndex). Str(LogFieldDestAddr, req.Dest).
Msg("tcp proxy stream started") Uint8(LogFieldConnIndex, req.ConnIndex)
}
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil { debugLogger().Msg("tcp proxy stream started")
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, debugLogger()); err != nil {
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting) p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
return err return err
} }
p.log.Debug(). debugLogger().Msg("tcp proxy stream finished successfully")
Int(management.EventTypeKey, int(management.TCP)).
Str(LogFieldFlowID, req.FlowID).
Str(LogFieldDestAddr, req.Dest).
Uint8(LogFieldConnIndex, req.ConnIndex).
Msg("tcp proxy stream finished successfully")
return nil return nil
} }
@ -294,16 +297,21 @@ func (p *Proxy) proxyHTTPRequest(
// proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented // proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented
// ingress rule. // ingress rule.
// connectedLogger is used to log when the connection is acknowledged
func (p *Proxy) proxyStream( func (p *Proxy) proxyStream(
tr *tracing.TracedContext, tr *tracing.TracedContext,
rwa connection.ReadWriteAcker, rwa connection.ReadWriteAcker,
dest string, dest string,
connectionProxy ingress.StreamBasedOriginProxy, connectionProxy ingress.StreamBasedOriginProxy,
connectedLogger *zerolog.Event,
) error { ) error {
ctx := tr.Context ctx := tr.Context
_, connectSpan := tr.Tracer().Start(ctx, "stream-connect") _, connectSpan := tr.Tracer().Start(ctx, "stream-connect")
start := time.Now()
originConn, err := connectionProxy.EstablishConnection(ctx, dest) originConn, err := connectionProxy.EstablishConnection(ctx, dest)
if err != nil { if err != nil {
connectStreamErrors.Inc()
tracing.EndWithErrorStatus(connectSpan, err) tracing.EndWithErrorStatus(connectSpan, err)
return err return err
} }
@ -313,9 +321,13 @@ func (p *Proxy) proxyStream(
encodedSpans := tr.GetSpans() encodedSpans := tr.GetSpans()
if err := rwa.AckConnection(encodedSpans); err != nil { if err := rwa.AckConnection(encodedSpans); err != nil {
connectStreamErrors.Inc()
return err return err
} }
connectLatency.Observe(float64(time.Since(start).Milliseconds()))
connectedLogger.Msg("proxy stream established")
originConn.Stream(ctx, rwa, p.log) originConn.Stream(ctx, rwa, p.log)
return nil return nil
} }