TUN-5990: Add otlp span export to response header
This commit is contained in:
parent
8a07a900fd
commit
f81b0ee9e8
|
@ -231,6 +231,12 @@ func (rp *http2RespWriter) WriteRespHeaders(status int, header http.Header) erro
|
||||||
dest[name] = values
|
dest[name] = values
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if h2name == tracing.IntCloudflaredTracingHeader {
|
||||||
|
// Add cf-int-cloudflared-tracing header outside of serialized userHeaders
|
||||||
|
rp.w.Header()[tracing.CanonicalCloudflaredTracingHeader] = values
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if !IsControlResponseHeader(h2name) || IsWebsocketClientHeader(h2name) {
|
if !IsControlResponseHeader(h2name) || IsWebsocketClientHeader(h2name) {
|
||||||
// User headers, on the other hand, must all be serialized so that
|
// User headers, on the other hand, must all be serialized so that
|
||||||
// HTTP/2 header validation won't be applied to HTTP/1 header values
|
// HTTP/2 header validation won't be applied to HTTP/1 header values
|
||||||
|
|
|
@ -87,7 +87,7 @@ func (p *Proxy) ProxyHTTP(
|
||||||
case ingress.HTTPOriginProxy:
|
case ingress.HTTPOriginProxy:
|
||||||
if err := p.proxyHTTPRequest(
|
if err := p.proxyHTTPRequest(
|
||||||
w,
|
w,
|
||||||
req,
|
tr,
|
||||||
originProxy,
|
originProxy,
|
||||||
isWebsocket,
|
isWebsocket,
|
||||||
rule.Config.DisableChunkedEncoding,
|
rule.Config.DisableChunkedEncoding,
|
||||||
|
@ -159,15 +159,15 @@ func ruleField(ing ingress.Ingress, ruleNum int) (ruleID string, srv string) {
|
||||||
// ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
|
// ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
|
||||||
func (p *Proxy) proxyHTTPRequest(
|
func (p *Proxy) proxyHTTPRequest(
|
||||||
w connection.ResponseWriter,
|
w connection.ResponseWriter,
|
||||||
req *http.Request,
|
tr *tracing.TracedRequest,
|
||||||
httpService ingress.HTTPOriginProxy,
|
httpService ingress.HTTPOriginProxy,
|
||||||
isWebsocket bool,
|
isWebsocket bool,
|
||||||
disableChunkedEncoding bool,
|
disableChunkedEncoding bool,
|
||||||
fields logFields,
|
fields logFields,
|
||||||
) error {
|
) error {
|
||||||
roundTripReq := req
|
roundTripReq := tr.Request
|
||||||
if isWebsocket {
|
if isWebsocket {
|
||||||
roundTripReq = req.Clone(req.Context())
|
roundTripReq = tr.Request.Clone(tr.Request.Context())
|
||||||
roundTripReq.Header.Set("Connection", "Upgrade")
|
roundTripReq.Header.Set("Connection", "Upgrade")
|
||||||
roundTripReq.Header.Set("Upgrade", "websocket")
|
roundTripReq.Header.Set("Upgrade", "websocket")
|
||||||
roundTripReq.Header.Set("Sec-Websocket-Version", "13")
|
roundTripReq.Header.Set("Sec-Websocket-Version", "13")
|
||||||
|
@ -177,7 +177,7 @@ func (p *Proxy) proxyHTTPRequest(
|
||||||
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
|
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
|
||||||
if disableChunkedEncoding {
|
if disableChunkedEncoding {
|
||||||
roundTripReq.TransferEncoding = []string{"gzip", "deflate"}
|
roundTripReq.TransferEncoding = []string{"gzip", "deflate"}
|
||||||
cLength, err := strconv.Atoi(req.Header.Get("Content-Length"))
|
cLength, err := strconv.Atoi(tr.Request.Header.Get("Content-Length"))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
roundTripReq.ContentLength = int64(cLength)
|
roundTripReq.ContentLength = int64(cLength)
|
||||||
}
|
}
|
||||||
|
@ -197,6 +197,9 @@ func (p *Proxy) proxyHTTPRequest(
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// Add spans to response header (if available)
|
||||||
|
tr.AddSpans(resp.Header, p.log)
|
||||||
|
|
||||||
err = w.WriteRespHeaders(resp.StatusCode, resp.Header)
|
err = w.WriteRespHeaders(resp.StatusCode, resp.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Error writing response header")
|
return errors.Wrap(err, "Error writing response header")
|
||||||
|
@ -211,7 +214,7 @@ func (p *Proxy) proxyHTTPRequest(
|
||||||
|
|
||||||
eyeballStream := &bidirectionalStream{
|
eyeballStream := &bidirectionalStream{
|
||||||
writer: w,
|
writer: w,
|
||||||
reader: req.Body,
|
reader: tr.Request.Body,
|
||||||
}
|
}
|
||||||
|
|
||||||
websocket.Stream(eyeballStream, rwc, p.log)
|
websocket.Stream(eyeballStream, rwc, p.log)
|
||||||
|
|
|
@ -16,7 +16,8 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errNoTraces = errors.New("no traces recorded to be exported")
|
errNoTraces = errors.New("no traces recorded to be exported")
|
||||||
|
errNoopTracer = errors.New("noop tracer has no traces")
|
||||||
)
|
)
|
||||||
|
|
||||||
type InMemoryClient interface {
|
type InMemoryClient interface {
|
||||||
|
@ -86,5 +87,5 @@ func (mc *NoopOtlpClient) UploadTraces(_ context.Context, _ []*tracepb.ResourceS
|
||||||
|
|
||||||
// Spans always returns no traces error
|
// Spans always returns no traces error
|
||||||
func (mc *NoopOtlpClient) Spans() (string, error) {
|
func (mc *NoopOtlpClient) Spans() (string, error) {
|
||||||
return "", errNoTraces
|
return "", errNoopTracer
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog"
|
||||||
otelContrib "go.opentelemetry.io/contrib/propagators/Jaeger"
|
otelContrib "go.opentelemetry.io/contrib/propagators/Jaeger"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -23,11 +24,14 @@ const (
|
||||||
|
|
||||||
tracerContextName = "cf-trace-id"
|
tracerContextName = "cf-trace-id"
|
||||||
tracerContextNameOverride = "uber-trace-id"
|
tracerContextNameOverride = "uber-trace-id"
|
||||||
|
|
||||||
|
IntCloudflaredTracingHeader = "cf-int-cloudflared-tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
Http2TransportAttribute = trace.WithAttributes(TransportAttributeKey.String("http2"))
|
CanonicalCloudflaredTracingHeader = http.CanonicalHeaderKey(IntCloudflaredTracingHeader)
|
||||||
QuicTransportAttribute = trace.WithAttributes(TransportAttributeKey.String("quic"))
|
Http2TransportAttribute = trace.WithAttributes(TransportAttributeKey.String("http2"))
|
||||||
|
QuicTransportAttribute = trace.WithAttributes(TransportAttributeKey.String("quic"))
|
||||||
|
|
||||||
TransportAttributeKey = attribute.Key("transport")
|
TransportAttributeKey = attribute.Key("transport")
|
||||||
TrafficAttributeKey = attribute.Key("traffic")
|
TrafficAttributeKey = attribute.Key("traffic")
|
||||||
|
@ -75,8 +79,26 @@ func (cft *TracedRequest) Tracer() trace.Tracer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spans returns the spans as base64 encoded protobuf otlp traces.
|
// Spans returns the spans as base64 encoded protobuf otlp traces.
|
||||||
func (cft *TracedRequest) Spans() (string, error) {
|
func (cft *TracedRequest) AddSpans(headers http.Header, log *zerolog.Logger) {
|
||||||
return cft.exporter.Spans()
|
enc, err := cft.exporter.Spans()
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
break
|
||||||
|
case errNoTraces:
|
||||||
|
log.Error().Err(err).Msgf("expected traces to be available")
|
||||||
|
return
|
||||||
|
case errNoopTracer:
|
||||||
|
return // noop tracer has no traces
|
||||||
|
default:
|
||||||
|
log.Error().Err(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// No need to add header if no traces
|
||||||
|
if enc == "" {
|
||||||
|
log.Error().Msgf("no traces provided and no error from exporter")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
headers[CanonicalCloudflaredTracingHeader] = []string{enc}
|
||||||
}
|
}
|
||||||
|
|
||||||
// EndWithStatus will set a status for the span and then end it.
|
// EndWithStatus will set a status for the span and then end it.
|
||||||
|
|
Loading…
Reference in New Issue