diff --git a/proxy/metrics.go b/proxy/metrics.go index 33a51721..54e09ad2 100644 --- a/proxy/metrics.go +++ b/proxy/metrics.go @@ -59,6 +59,23 @@ var ( Help: "Total count of TCP sessions that have been proxied to any origin", }, ) + connectLatency = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: connection.MetricsNamespace, + Subsystem: "proxy", + Name: "connect_latency", + Help: "Time it takes to establish and acknowledge connections in milliseconds", + Buckets: []float64{1, 10, 25, 50, 100, 500, 1000, 5000}, + }, + ) + connectStreamErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: connection.MetricsNamespace, + Subsystem: "proxy", + Name: "connect_streams_errors", + Help: "Total count of failure to establish and acknowledge connections", + }, + ) ) func init() { @@ -69,6 +86,8 @@ func init() { requestErrors, activeTCPSessions, totalTCPSessions, + connectLatency, + connectStreamErrors, ) } diff --git a/proxy/proxy.go b/proxy/proxy.go index 38989a5f..426bca2c 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "strconv" + "time" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -139,7 +140,11 @@ func (p *Proxy) ProxyHTTP( return fmt.Errorf("response writer is not a flusher") } rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req) - if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy); err != nil { + connectedLogger := p.log.Debug(). + Int(management.EventTypeKey, int(management.HTTP)). + Str(LogFieldDestAddr, dest). + Uint8(LogFieldConnIndex, tr.ConnIndex) + if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, connectedLogger); err != nil { rule, srv := ruleField(p.ingressRules, ruleNum) p.logRequestError(err, cfRay, "", rule, srv) return err @@ -173,24 +178,22 @@ func (p *Proxy) ProxyTCP( tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log) - p.log.Debug(). - Int(management.EventTypeKey, int(management.TCP)). - Str(LogFieldFlowID, req.FlowID). - Str(LogFieldDestAddr, req.Dest). - Uint8(LogFieldConnIndex, req.ConnIndex). - Msg("tcp proxy stream started") + debugLogger := func() *zerolog.Event { + return p.log.Debug(). + Int(management.EventTypeKey, int(management.TCP)). + Str(LogFieldFlowID, req.FlowID). + Str(LogFieldDestAddr, req.Dest). + Uint8(LogFieldConnIndex, req.ConnIndex) + } - if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil { + debugLogger().Msg("tcp proxy stream started") + + if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, debugLogger()); err != nil { p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting) return err } - p.log.Debug(). - Int(management.EventTypeKey, int(management.TCP)). - Str(LogFieldFlowID, req.FlowID). - Str(LogFieldDestAddr, req.Dest). - Uint8(LogFieldConnIndex, req.ConnIndex). - Msg("tcp proxy stream finished successfully") + debugLogger().Msg("tcp proxy stream finished successfully") return nil } @@ -294,16 +297,21 @@ func (p *Proxy) proxyHTTPRequest( // proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented // ingress rule. +// connectedLogger is used to log when the connection is acknowledged func (p *Proxy) proxyStream( tr *tracing.TracedContext, rwa connection.ReadWriteAcker, dest string, connectionProxy ingress.StreamBasedOriginProxy, + connectedLogger *zerolog.Event, ) error { ctx := tr.Context _, connectSpan := tr.Tracer().Start(ctx, "stream-connect") + + start := time.Now() originConn, err := connectionProxy.EstablishConnection(ctx, dest) if err != nil { + connectStreamErrors.Inc() tracing.EndWithErrorStatus(connectSpan, err) return err } @@ -313,9 +321,13 @@ func (p *Proxy) proxyStream( encodedSpans := tr.GetSpans() if err := rwa.AckConnection(encodedSpans); err != nil { + connectStreamErrors.Inc() return err } + connectLatency.Observe(float64(time.Since(start).Milliseconds())) + connectedLogger.Msg("proxy stream established") + originConn.Stream(ctx, rwa, p.log) return nil }