diff --git a/connection/connection.go b/connection/connection.go index cf17d506..6140e3db 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -142,6 +142,7 @@ type TCPRequest struct { LBProbe bool FlowID string CfTraceID string + ConnIndex uint8 } // ReadWriteAcker is a readwriter with the ability to Acknowledge to the downstream (edge) that the origin has diff --git a/connection/h2mux.go b/connection/h2mux.go index 1cc247ad..75170cf0 100644 --- a/connection/h2mux.go +++ b/connection/h2mux.go @@ -196,7 +196,7 @@ func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error { return err } - err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, h.log), sourceConnectionType == TypeWebsocket) + err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, h.connIndex, h.log), sourceConnectionType == TypeWebsocket) if err != nil { respWriter.WriteErrorResponse() } diff --git a/connection/http2.go b/connection/http2.go index 5fa26bcb..b895b7db 100644 --- a/connection/http2.go +++ b/connection/http2.go @@ -129,7 +129,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) { case TypeWebsocket, TypeHTTP: stripWebsocketUpgradeHeader(r) // Check for tracing on request - tr := tracing.NewTracedHTTPRequest(r, c.log) + tr := tracing.NewTracedHTTPRequest(r, c.connIndex, c.log) if err := originProxy.ProxyHTTP(respWriter, tr, connType == TypeWebsocket); err != nil { requestErr = fmt.Errorf("Failed to proxy HTTP: %w", err) } @@ -147,6 +147,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) { CFRay: FindCfRayHeader(r), LBProbe: IsLBProbeRequest(r), CfTraceID: r.Header.Get(tracing.TracerContextName), + ConnIndex: c.connIndex, }) default: diff --git a/connection/quic.go b/connection/quic.go index c0db84d8..3bd49b11 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -60,6 +60,7 @@ type QUICConnection struct { packetRouter *ingress.PacketRouter controlStreamHandler ControlStreamHandler connOptions *tunnelpogs.ConnectionOptions + connIndex uint8 } // NewQUICConnection returns a new instance of QUICConnection. @@ -106,6 +107,7 @@ func NewQUICConnection( packetRouter: packetRouter, controlStreamHandler: controlStreamHandler, connOptions: connOptions, + connIndex: connIndex, }, nil } @@ -258,7 +260,7 @@ func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.R switch request.Type { case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket: - tracedReq, err := buildHTTPRequest(ctx, request, stream, q.logger) + tracedReq, err := buildHTTPRequest(ctx, request, stream, q.connIndex, q.logger) if err != nil { return err, false } @@ -272,6 +274,7 @@ func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.R Dest: request.Dest, FlowID: metadata[QUICMetadataFlowID], CfTraceID: metadata[tracing.TracerContextName], + ConnIndex: q.connIndex, }), rwa.connectResponseSent default: return errors.Errorf("unsupported error type: %s", request.Type), false @@ -435,6 +438,7 @@ func buildHTTPRequest( ctx context.Context, connectRequest *quicpogs.ConnectRequest, body io.ReadCloser, + connIndex uint8, log *zerolog.Logger, ) (*tracing.TracedHTTPRequest, error) { metadata := connectRequest.MetadataMap() @@ -478,7 +482,7 @@ func buildHTTPRequest( stripWebsocketUpgradeHeader(req) // Check for tracing on request - tracedReq := tracing.NewTracedHTTPRequest(req, log) + tracedReq := tracing.NewTracedHTTPRequest(req, connIndex, log) return tracedReq, err } diff --git a/connection/quic_test.go b/connection/quic_test.go index db330190..d6777506 100644 --- a/connection/quic_test.go +++ b/connection/quic_test.go @@ -485,7 +485,7 @@ func TestBuildHTTPRequest(t *testing.T) { for _, test := range tests { test := test // capture range variable t.Run(test.name, func(t *testing.T) { - req, err := buildHTTPRequest(context.Background(), test.connectRequest, test.body, &log) + req, err := buildHTTPRequest(context.Background(), test.connectRequest, test.body, 0, &log) assert.NoError(t, err) test.req = test.req.WithContext(req.Context()) assert.Equal(t, test.req, req.Request) diff --git a/orchestration/orchestrator_test.go b/orchestration/orchestrator_test.go index d3e1ee62..f6a9f2bf 100644 --- a/orchestration/orchestrator_test.go +++ b/orchestration/orchestrator_test.go @@ -358,7 +358,7 @@ func proxyHTTP(originProxy connection.OriginProxy, hostname string) (*http.Respo return nil, err } - err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), false) + err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, 0, &log), false) if err != nil { return nil, err } @@ -608,7 +608,7 @@ func TestPersistentConnection(t *testing.T) { respWriter, err := connection.NewHTTP2RespWriter(req, wsRespReadWriter, connection.TypeWebsocket, &log) require.NoError(t, err) - err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), true) + err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, 0, &log), true) require.NoError(t, err) }() diff --git a/proxy/proxy.go b/proxy/proxy.go index 6c404109..5a3d2e62 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -28,6 +28,7 @@ const ( LogFieldRule = "ingressRule" LogFieldOriginService = "originService" LogFieldFlowID = "flowID" + LogFieldConnIndex = "connIndex" trailerHeaderName = "Trailer" ) @@ -94,9 +95,10 @@ func (p *Proxy) ProxyHTTP( trace.WithAttributes(attribute.String("req-host", req.Host))) rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path) logFields := logFields{ - cfRay: cfRay, - lbProbe: lbProbe, - rule: ruleNum, + cfRay: cfRay, + lbProbe: lbProbe, + rule: ruleNum, + connIndex: tr.ConnIndex, } p.logRequest(req, logFields) ruleSpan.SetAttributes(attribute.Int("rule-num", ruleNum)) @@ -163,14 +165,14 @@ func (p *Proxy) ProxyTCP( tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log) - p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream started") + p.log.Debug().Str(LogFieldFlowID, req.FlowID).Uint8(LogFieldConnIndex, req.ConnIndex).Msg("tcp proxy stream started") if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil { p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting) return err } - p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream finished successfully") + p.log.Debug().Str(LogFieldFlowID, req.FlowID).Uint8(LogFieldConnIndex, req.ConnIndex).Msg("tcp proxy stream finished successfully") return nil } @@ -320,10 +322,11 @@ func (p *Proxy) appendTagHeaders(r *http.Request) { } type logFields struct { - cfRay string - lbProbe bool - rule interface{} - flowID string + cfRay string + lbProbe bool + rule interface{} + flowID string + connIndex uint8 } func copyTrailers(w connection.ResponseWriter, response *http.Response) { @@ -348,6 +351,7 @@ func (p *Proxy) logRequest(r *http.Request, fields logFields) { Str("host", r.Host). Str("path", r.URL.Path). Interface("rule", fields.rule). + Uint8(LogFieldConnIndex, fields.connIndex). Msg("Inbound request") if contentLen := r.ContentLength; contentLen == -1 { @@ -360,18 +364,18 @@ func (p *Proxy) logRequest(r *http.Request, fields logFields) { func (p *Proxy) logOriginResponse(resp *http.Response, fields logFields) { responseByCode.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc() if fields.cfRay != "" { - p.log.Debug().Msgf("CF-RAY: %s Status: %s served by ingress %d", fields.cfRay, resp.Status, fields.rule) + p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Status: %s served by ingress %d", fields.cfRay, resp.Status, fields.rule) } else if fields.lbProbe { - p.log.Debug().Msgf("Response to Load Balancer health check %s", resp.Status) + p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("Response to Load Balancer health check %s", resp.Status) } else { - p.log.Debug().Msgf("Status: %s served by ingress %v", resp.Status, fields.rule) + p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("Status: %s served by ingress %v", resp.Status, fields.rule) } - p.log.Debug().Msgf("CF-RAY: %s Response Headers %+v", fields.cfRay, resp.Header) + p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Response Headers %+v", fields.cfRay, resp.Header) if contentLen := resp.ContentLength; contentLen == -1 { - p.log.Debug().Msgf("CF-RAY: %s Response content length unknown", fields.cfRay) + p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Response content length unknown", fields.cfRay) } else { - p.log.Debug().Msgf("CF-RAY: %s Response content length %d", fields.cfRay, contentLen) + p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Response content length %d", fields.cfRay, contentLen) } } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 58e541d5..437d3bd2 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -170,7 +170,7 @@ func testProxyHTTP(proxy connection.OriginProxy) func(t *testing.T) { require.NoError(t, err) log := zerolog.Nop() - err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false) + err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, 0, &log), false) require.NoError(t, err) for _, tag := range testTags { assert.Equal(t, tag.Value, req.Header.Get(TagHeaderNamePrefix+tag.Name)) @@ -198,7 +198,7 @@ func testProxyWebsocket(proxy connection.OriginProxy) func(t *testing.T) { errGroup, ctx := errgroup.WithContext(ctx) errGroup.Go(func() error { log := zerolog.Nop() - err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), true) + err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, 0, &log), true) require.NoError(t, err) require.Equal(t, http.StatusSwitchingProtocols, responseWriter.Code) @@ -260,7 +260,7 @@ func testProxySSE(proxy connection.OriginProxy) func(t *testing.T) { go func() { defer wg.Done() log := zerolog.Nop() - err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false) + err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, 0, &log), false) require.Equal(t, err.Error(), "context canceled") require.Equal(t, http.StatusOK, responseWriter.Code) @@ -367,7 +367,7 @@ func runIngressTestScenarios(t *testing.T, unvalidatedIngress []config.Unvalidat req, err := http.NewRequest(http.MethodGet, test.url, nil) require.NoError(t, err) - err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false) + err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, 0, &log), false) require.NoError(t, err) assert.Equal(t, test.expectedStatus, responseWriter.Code) @@ -414,7 +414,7 @@ func TestProxyError(t *testing.T) { req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1", nil) assert.NoError(t, err) - assert.Error(t, proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)) + assert.Error(t, proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, 0, &log), false)) } type replayer struct { @@ -695,7 +695,7 @@ func TestConnections(t *testing.T) { err = proxy.ProxyTCP(ctx, rwa, &connection.TCPRequest{Dest: dest}) } else { log := zerolog.Nop() - err = proxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), test.args.connectionType == connection.TypeWebsocket) + err = proxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, 0, &log), test.args.connectionType == connection.TypeWebsocket) } cancel() diff --git a/tracing/tracing.go b/tracing/tracing.go index 57744120..f7bc59fc 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -73,15 +73,16 @@ func Init(version string) { type TracedHTTPRequest struct { *http.Request *cfdTracer + ConnIndex uint8 // The connection index used to proxy the request } // NewTracedHTTPRequest creates a new tracer for the current HTTP request context. -func NewTracedHTTPRequest(req *http.Request, log *zerolog.Logger) *TracedHTTPRequest { +func NewTracedHTTPRequest(req *http.Request, connIndex uint8, log *zerolog.Logger) *TracedHTTPRequest { ctx, exists := extractTrace(req) if !exists { - return &TracedHTTPRequest{req, &cfdTracer{trace.NewNoopTracerProvider(), &NoopOtlpClient{}, log}} + return &TracedHTTPRequest{req, &cfdTracer{trace.NewNoopTracerProvider(), &NoopOtlpClient{}, log}, connIndex} } - return &TracedHTTPRequest{req.WithContext(ctx), newCfdTracer(ctx, log)} + return &TracedHTTPRequest{req.WithContext(ctx), newCfdTracer(ctx, log), connIndex} } func (tr *TracedHTTPRequest) ToTracedContext() *TracedContext { diff --git a/tracing/tracing_test.go b/tracing/tracing_test.go index 5750056e..5c478ed6 100644 --- a/tracing/tracing_test.go +++ b/tracing/tracing_test.go @@ -17,7 +17,7 @@ func TestNewCfTracer(t *testing.T) { log := zerolog.Nop() req := httptest.NewRequest("GET", "http://localhost", nil) req.Header.Add(TracerContextName, "14cb070dde8e51fc5ae8514e69ba42ca:b38f1bf5eae406f3:0:1") - tr := NewTracedHTTPRequest(req, &log) + tr := NewTracedHTTPRequest(req, 0, &log) assert.NotNil(t, tr) assert.IsType(t, tracesdk.NewTracerProvider(), tr.TracerProvider) assert.IsType(t, &InMemoryOtlpClient{}, tr.exporter) @@ -28,7 +28,7 @@ func TestNewCfTracerMultiple(t *testing.T) { req := httptest.NewRequest("GET", "http://localhost", nil) req.Header.Add(TracerContextName, "1241ce3ecdefc68854e8514e69ba42ca:b38f1bf5eae406f3:0:1") req.Header.Add(TracerContextName, "14cb070dde8e51fc5ae8514e69ba42ca:b38f1bf5eae406f3:0:1") - tr := NewTracedHTTPRequest(req, &log) + tr := NewTracedHTTPRequest(req, 0, &log) assert.NotNil(t, tr) assert.IsType(t, tracesdk.NewTracerProvider(), tr.TracerProvider) assert.IsType(t, &InMemoryOtlpClient{}, tr.exporter) @@ -38,7 +38,7 @@ func TestNewCfTracerNilHeader(t *testing.T) { log := zerolog.Nop() req := httptest.NewRequest("GET", "http://localhost", nil) req.Header[http.CanonicalHeaderKey(TracerContextName)] = nil - tr := NewTracedHTTPRequest(req, &log) + tr := NewTracedHTTPRequest(req, 0, &log) assert.NotNil(t, tr) assert.IsType(t, trace.NewNoopTracerProvider(), tr.TracerProvider) assert.IsType(t, &NoopOtlpClient{}, tr.exporter) @@ -49,7 +49,7 @@ func TestNewCfTracerInvalidHeaders(t *testing.T) { req := httptest.NewRequest("GET", "http://localhost", nil) for _, test := range [][]string{nil, {""}} { req.Header[http.CanonicalHeaderKey(TracerContextName)] = test - tr := NewTracedHTTPRequest(req, &log) + tr := NewTracedHTTPRequest(req, 0, &log) assert.NotNil(t, tr) assert.IsType(t, trace.NewNoopTracerProvider(), tr.TracerProvider) assert.IsType(t, &NoopOtlpClient{}, tr.exporter) @@ -60,7 +60,7 @@ func TestAddingSpansWithNilMap(t *testing.T) { log := zerolog.Nop() req := httptest.NewRequest("GET", "http://localhost", nil) req.Header.Add(TracerContextName, "14cb070dde8e51fc5ae8514e69ba42ca:b38f1bf5eae406f3:0:1") - tr := NewTracedHTTPRequest(req, &log) + tr := NewTracedHTTPRequest(req, 0, &log) exporter := tr.exporter.(*InMemoryOtlpClient)