diff --git a/connection/connection.go b/connection/connection.go index aa66e494..bcc24b64 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -132,6 +132,7 @@ type TCPRequest struct { Dest string CFRay string LBProbe bool + FlowID string } // ReadWriteAcker is a readwriter with the ability to Acknowledge to the downstream (edge) that the origin has diff --git a/connection/quic.go b/connection/quic.go index ef46a658..2d0684bc 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -31,6 +31,8 @@ const ( HTTPMethodKey = "HttpMethod" // HTTPHostKey is used to get or set http Method in QUIC ALPN if the underlying proxy connection type is HTTP. HTTPHostKey = "HttpHost" + + QUICMetadataFlowID = "FlowID" ) // QUICConnection represents the type that facilitates Proxying via QUIC streams. @@ -180,6 +182,7 @@ func (q *QUICConnection) handleDataStream(stream *quicpogs.RequestServerStream) if err != nil { return err } + switch connectRequest.Type { case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket: tracedReq, err := buildHTTPRequest(connectRequest, stream) @@ -191,7 +194,9 @@ func (q *QUICConnection) handleDataStream(stream *quicpogs.RequestServerStream) return originProxy.ProxyHTTP(w, tracedReq, connectRequest.Type == quicpogs.ConnectionTypeWebsocket) case quicpogs.ConnectionTypeTCP: rwa := &streamReadWriteAcker{stream} - return originProxy.ProxyTCP(context.Background(), rwa, &TCPRequest{Dest: connectRequest.Dest}) + metadata := connectRequest.MetadataMap() + return originProxy.ProxyTCP(context.Background(), rwa, &TCPRequest{Dest: connectRequest.Dest, + FlowID: metadata[QUICMetadataFlowID]}) } return nil } diff --git a/proxy/proxy.go b/proxy/proxy.go index 661ab363..5db9ab4c 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -28,6 +28,7 @@ const ( LogFieldCFRay = "cfRay" LogFieldRule = "ingressRule" LogFieldOriginService = "originService" + LogFieldFlowID = "flowID" ) // Proxy represents a means to Proxy between cloudflared and the origin services. @@ -96,7 +97,7 @@ func (p *Proxy) ProxyHTTP( logFields, ); err != nil { rule, srv := ruleField(p.ingressRules, ruleNum) - p.logRequestError(err, cfRay, rule, srv) + p.logRequestError(err, cfRay, "", rule, srv) return err } return nil @@ -109,7 +110,7 @@ func (p *Proxy) ProxyHTTP( rws := connection.NewHTTPResponseReadWriterAcker(w, req) if err := p.proxyStream(req.Context(), rws, dest, originProxy, logFields); err != nil { rule, srv := ruleField(p.ingressRules, ruleNum) - p.logRequestError(err, cfRay, rule, srv) + p.logRequestError(err, cfRay, "", rule, srv) return err } return nil @@ -140,13 +141,17 @@ func (p *Proxy) ProxyTCP( cfRay: req.CFRay, lbProbe: req.LBProbe, rule: ingress.ServiceWarpRouting, + flowID: req.FlowID, } + p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream started") if err := p.proxyStream(serveCtx, rwa, req.Dest, p.warpRouting.Proxy, logFields); err != nil { - p.logRequestError(err, req.CFRay, "", ingress.ServiceWarpRouting) + p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting) return err } + p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream finished successfully") + return nil } @@ -317,6 +322,7 @@ type logFields struct { cfRay string lbProbe bool rule interface{} + flowID string } func (p *Proxy) logRequest(r *http.Request, fields logFields) { @@ -360,12 +366,15 @@ func (p *Proxy) logOriginResponse(resp *http.Response, fields logFields) { } } -func (p *Proxy) logRequestError(err error, cfRay string, rule, service string) { +func (p *Proxy) logRequestError(err error, cfRay string, flowID string, rule, service string) { requestErrors.Inc() log := p.log.Error().Err(err) if cfRay != "" { log = log.Str(LogFieldCFRay, cfRay) } + if flowID != "" { + log = log.Str(LogFieldFlowID, flowID) + } if rule != "" { log = log.Str(LogFieldRule, rule) }