TUN-8238: Refactor proxy logging
Propagates the logger context into further locations to help provide more context for certain errors. For instance, upstream and downstream copying errors will properly have the assigned flow id attached and destination address.
This commit is contained in:
parent
76badfa01b
commit
971360d5e0
|
@ -0,0 +1,78 @@
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
|
||||||
|
"github.com/cloudflare/cloudflared/connection"
|
||||||
|
"github.com/cloudflare/cloudflared/ingress"
|
||||||
|
"github.com/cloudflare/cloudflared/management"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
logFieldCFRay = "cfRay"
|
||||||
|
logFieldLBProbe = "lbProbe"
|
||||||
|
logFieldRule = "ingressRule"
|
||||||
|
logFieldOriginService = "originService"
|
||||||
|
logFieldFlowID = "flowID"
|
||||||
|
logFieldConnIndex = "connIndex"
|
||||||
|
logFieldDestAddr = "destAddr"
|
||||||
|
)
|
||||||
|
|
||||||
|
// newHTTPLogger creates a child zerolog.Logger from the provided with added context from the HTTP request, ingress
|
||||||
|
// services, and connection index.
|
||||||
|
func newHTTPLogger(logger *zerolog.Logger, connIndex uint8, req *http.Request, rule int, serviceName string) zerolog.Logger {
|
||||||
|
ctx := logger.With().
|
||||||
|
Int(management.EventTypeKey, int(management.HTTP)).
|
||||||
|
Uint8(logFieldConnIndex, connIndex)
|
||||||
|
cfRay := connection.FindCfRayHeader(req)
|
||||||
|
lbProbe := connection.IsLBProbeRequest(req)
|
||||||
|
if cfRay != "" {
|
||||||
|
ctx.Str(logFieldCFRay, cfRay)
|
||||||
|
}
|
||||||
|
if lbProbe {
|
||||||
|
ctx.Bool(logFieldLBProbe, lbProbe)
|
||||||
|
}
|
||||||
|
return ctx.
|
||||||
|
Str(logFieldOriginService, serviceName).
|
||||||
|
Interface(logFieldRule, rule).
|
||||||
|
Logger()
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTCPLogger creates a child zerolog.Logger from the provided with added context from the TCPRequest.
|
||||||
|
func newTCPLogger(logger *zerolog.Logger, req *connection.TCPRequest) zerolog.Logger {
|
||||||
|
return logger.With().
|
||||||
|
Int(management.EventTypeKey, int(management.TCP)).
|
||||||
|
Uint8(logFieldConnIndex, req.ConnIndex).
|
||||||
|
Str(logFieldOriginService, ingress.ServiceWarpRouting).
|
||||||
|
Str(logFieldFlowID, req.FlowID).
|
||||||
|
Str(logFieldDestAddr, req.Dest).
|
||||||
|
Uint8(logFieldConnIndex, req.ConnIndex).
|
||||||
|
Logger()
|
||||||
|
}
|
||||||
|
|
||||||
|
// logHTTPRequest logs a Debug message with the corresponding HTTP request details from the eyeball.
|
||||||
|
func logHTTPRequest(logger *zerolog.Logger, r *http.Request) {
|
||||||
|
logger.Debug().
|
||||||
|
Str("host", r.Host).
|
||||||
|
Str("path", r.URL.Path).
|
||||||
|
Interface("headers", r.Header).
|
||||||
|
Int64("content-length", r.ContentLength).
|
||||||
|
Msgf("%s %s %s", r.Method, r.URL, r.Proto)
|
||||||
|
}
|
||||||
|
|
||||||
|
// logOriginHTTPResponse logs a Debug message of the origin response.
|
||||||
|
func logOriginHTTPResponse(logger *zerolog.Logger, resp *http.Response) {
|
||||||
|
responseByCode.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()
|
||||||
|
logger.Debug().
|
||||||
|
Int64("content-length", resp.ContentLength).
|
||||||
|
Msgf("%s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// logRequestError logs an error for the proxied request.
|
||||||
|
func logRequestError(logger *zerolog.Logger, err error) {
|
||||||
|
requestErrors.Inc()
|
||||||
|
logger.Error().Err(err).Send()
|
||||||
|
}
|
142
proxy/proxy.go
142
proxy/proxy.go
|
@ -17,7 +17,6 @@ import (
|
||||||
"github.com/cloudflare/cloudflared/cfio"
|
"github.com/cloudflare/cloudflared/cfio"
|
||||||
"github.com/cloudflare/cloudflared/connection"
|
"github.com/cloudflare/cloudflared/connection"
|
||||||
"github.com/cloudflare/cloudflared/ingress"
|
"github.com/cloudflare/cloudflared/ingress"
|
||||||
"github.com/cloudflare/cloudflared/management"
|
|
||||||
"github.com/cloudflare/cloudflared/stream"
|
"github.com/cloudflare/cloudflared/stream"
|
||||||
"github.com/cloudflare/cloudflared/tracing"
|
"github.com/cloudflare/cloudflared/tracing"
|
||||||
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
||||||
|
@ -25,16 +24,8 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// TagHeaderNamePrefix indicates a Cloudflared Warp Tag prefix that gets appended for warp traffic stream headers.
|
// TagHeaderNamePrefix indicates a Cloudflared Warp Tag prefix that gets appended for warp traffic stream headers.
|
||||||
TagHeaderNamePrefix = "Cf-Warp-Tag-"
|
TagHeaderNamePrefix = "Cf-Warp-Tag-"
|
||||||
LogFieldCFRay = "cfRay"
|
trailerHeaderName = "Trailer"
|
||||||
LogFieldLBProbe = "lbProbe"
|
|
||||||
LogFieldRule = "ingressRule"
|
|
||||||
LogFieldOriginService = "originService"
|
|
||||||
LogFieldFlowID = "flowID"
|
|
||||||
LogFieldConnIndex = "connIndex"
|
|
||||||
LogFieldDestAddr = "destAddr"
|
|
||||||
|
|
||||||
trailerHeaderName = "Trailer"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Proxy represents a means to Proxy between cloudflared and the origin services.
|
// Proxy represents a means to Proxy between cloudflared and the origin services.
|
||||||
|
@ -91,26 +82,18 @@ func (p *Proxy) ProxyHTTP(
|
||||||
defer decrementConcurrentRequests()
|
defer decrementConcurrentRequests()
|
||||||
|
|
||||||
req := tr.Request
|
req := tr.Request
|
||||||
cfRay := connection.FindCfRayHeader(req)
|
|
||||||
lbProbe := connection.IsLBProbeRequest(req)
|
|
||||||
p.appendTagHeaders(req)
|
p.appendTagHeaders(req)
|
||||||
|
|
||||||
_, ruleSpan := tr.Tracer().Start(req.Context(), "ingress_match",
|
_, ruleSpan := tr.Tracer().Start(req.Context(), "ingress_match",
|
||||||
trace.WithAttributes(attribute.String("req-host", req.Host)))
|
trace.WithAttributes(attribute.String("req-host", req.Host)))
|
||||||
rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
|
rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
|
||||||
logFields := logFields{
|
|
||||||
cfRay: cfRay,
|
|
||||||
lbProbe: lbProbe,
|
|
||||||
rule: ruleNum,
|
|
||||||
connIndex: tr.ConnIndex,
|
|
||||||
}
|
|
||||||
p.logRequest(req, logFields)
|
|
||||||
ruleSpan.SetAttributes(attribute.Int("rule-num", ruleNum))
|
ruleSpan.SetAttributes(attribute.Int("rule-num", ruleNum))
|
||||||
ruleSpan.End()
|
ruleSpan.End()
|
||||||
|
logger := newHTTPLogger(p.log, tr.ConnIndex, req, ruleNum, rule.Service.String())
|
||||||
|
logHTTPRequest(&logger, req)
|
||||||
if err, applied := p.applyIngressMiddleware(rule, req, w); err != nil {
|
if err, applied := p.applyIngressMiddleware(rule, req, w); err != nil {
|
||||||
if applied {
|
if applied {
|
||||||
rule, srv := ruleField(p.ingressRules, ruleNum)
|
logRequestError(&logger, err)
|
||||||
p.logRequestError(err, cfRay, "", rule, srv)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -124,10 +107,9 @@ func (p *Proxy) ProxyHTTP(
|
||||||
originProxy,
|
originProxy,
|
||||||
isWebsocket,
|
isWebsocket,
|
||||||
rule.Config.DisableChunkedEncoding,
|
rule.Config.DisableChunkedEncoding,
|
||||||
logFields,
|
&logger,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
rule, srv := ruleField(p.ingressRules, ruleNum)
|
logRequestError(&logger, err)
|
||||||
p.logRequestError(err, cfRay, "", rule, srv)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -141,14 +123,9 @@ func (p *Proxy) ProxyHTTP(
|
||||||
return fmt.Errorf("response writer is not a flusher")
|
return fmt.Errorf("response writer is not a flusher")
|
||||||
}
|
}
|
||||||
rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req)
|
rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req)
|
||||||
logger := p.log.With().
|
logger := logger.With().Str(logFieldDestAddr, dest).Logger()
|
||||||
Int(management.EventTypeKey, int(management.HTTP)).
|
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, &logger); err != nil {
|
||||||
Str(LogFieldDestAddr, dest).
|
logRequestError(&logger, err)
|
||||||
Uint8(LogFieldConnIndex, tr.ConnIndex).
|
|
||||||
Logger()
|
|
||||||
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, logger); err != nil {
|
|
||||||
rule, srv := ruleField(p.ingressRules, ruleNum)
|
|
||||||
p.logRequestError(err, cfRay, "", rule, srv)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -178,19 +155,12 @@ func (p *Proxy) ProxyTCP(
|
||||||
serveCtx, cancel := context.WithCancel(ctx)
|
serveCtx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log)
|
logger := newTCPLogger(p.log, req)
|
||||||
|
tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, &logger)
|
||||||
logger := p.log.With().
|
|
||||||
Int(management.EventTypeKey, int(management.TCP)).
|
|
||||||
Str(LogFieldFlowID, req.FlowID).
|
|
||||||
Str(LogFieldDestAddr, req.Dest).
|
|
||||||
Uint8(LogFieldConnIndex, req.ConnIndex).
|
|
||||||
Logger()
|
|
||||||
|
|
||||||
logger.Debug().Msg("tcp proxy stream started")
|
logger.Debug().Msg("tcp proxy stream started")
|
||||||
|
|
||||||
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, logger); err != nil {
|
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, &logger); err != nil {
|
||||||
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
|
logRequestError(&logger, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,14 +169,6 @@ func (p *Proxy) ProxyTCP(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ruleField(ing ingress.Ingress, ruleNum int) (ruleID string, srv string) {
|
|
||||||
srv = ing.Rules[ruleNum].Service.String()
|
|
||||||
if ing.IsSingleRule() {
|
|
||||||
return "", srv
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%d", ruleNum), srv
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
|
// ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
|
||||||
func (p *Proxy) proxyHTTPRequest(
|
func (p *Proxy) proxyHTTPRequest(
|
||||||
w connection.ResponseWriter,
|
w connection.ResponseWriter,
|
||||||
|
@ -214,7 +176,7 @@ func (p *Proxy) proxyHTTPRequest(
|
||||||
httpService ingress.HTTPOriginProxy,
|
httpService ingress.HTTPOriginProxy,
|
||||||
isWebsocket bool,
|
isWebsocket bool,
|
||||||
disableChunkedEncoding bool,
|
disableChunkedEncoding bool,
|
||||||
fields logFields,
|
logger *zerolog.Logger,
|
||||||
) error {
|
) error {
|
||||||
roundTripReq := tr.Request
|
roundTripReq := tr.Request
|
||||||
if isWebsocket {
|
if isWebsocket {
|
||||||
|
@ -281,7 +243,7 @@ func (p *Proxy) proxyHTTPRequest(
|
||||||
reader: tr.Request.Body,
|
reader: tr.Request.Body,
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.Pipe(eyeballStream, rwc, p.log)
|
stream.Pipe(eyeballStream, rwc, logger)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +254,7 @@ func (p *Proxy) proxyHTTPRequest(
|
||||||
// copy trailers
|
// copy trailers
|
||||||
copyTrailers(w, resp)
|
copyTrailers(w, resp)
|
||||||
|
|
||||||
p.logOriginResponse(resp, fields)
|
logOriginHTTPResponse(logger, resp)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,7 +266,7 @@ func (p *Proxy) proxyStream(
|
||||||
rwa connection.ReadWriteAcker,
|
rwa connection.ReadWriteAcker,
|
||||||
dest string,
|
dest string,
|
||||||
connectionProxy ingress.StreamBasedOriginProxy,
|
connectionProxy ingress.StreamBasedOriginProxy,
|
||||||
logger zerolog.Logger,
|
logger *zerolog.Logger,
|
||||||
) error {
|
) error {
|
||||||
ctx := tr.Context
|
ctx := tr.Context
|
||||||
_, connectSpan := tr.Tracer().Start(ctx, "stream-connect")
|
_, connectSpan := tr.Tracer().Start(ctx, "stream-connect")
|
||||||
|
@ -330,7 +292,7 @@ func (p *Proxy) proxyStream(
|
||||||
connectLatency.Observe(float64(time.Since(start).Milliseconds()))
|
connectLatency.Observe(float64(time.Since(start).Milliseconds()))
|
||||||
logger.Debug().Msg("proxy stream acknowledged")
|
logger.Debug().Msg("proxy stream acknowledged")
|
||||||
|
|
||||||
originConn.Stream(ctx, rwa, p.log)
|
originConn.Stream(ctx, rwa, logger)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -364,14 +326,6 @@ func (p *Proxy) appendTagHeaders(r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type logFields struct {
|
|
||||||
cfRay string
|
|
||||||
lbProbe bool
|
|
||||||
rule int
|
|
||||||
flowID string
|
|
||||||
connIndex uint8
|
|
||||||
}
|
|
||||||
|
|
||||||
func copyTrailers(w connection.ResponseWriter, response *http.Response) {
|
func copyTrailers(w connection.ResponseWriter, response *http.Response) {
|
||||||
for trailerHeader, trailerValues := range response.Trailer {
|
for trailerHeader, trailerValues := range response.Trailer {
|
||||||
for _, trailerValue := range trailerValues {
|
for _, trailerValue := range trailerValues {
|
||||||
|
@ -380,64 +334,6 @@ func copyTrailers(w connection.ResponseWriter, response *http.Response) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Proxy) logRequest(r *http.Request, fields logFields) {
|
|
||||||
log := p.log.With().Int(management.EventTypeKey, int(management.HTTP)).Logger()
|
|
||||||
event := log.Debug()
|
|
||||||
if fields.cfRay != "" {
|
|
||||||
event = event.Str(LogFieldCFRay, fields.cfRay)
|
|
||||||
}
|
|
||||||
if fields.lbProbe {
|
|
||||||
event = event.Bool(LogFieldLBProbe, fields.lbProbe)
|
|
||||||
}
|
|
||||||
if fields.cfRay == "" && !fields.lbProbe {
|
|
||||||
log.Debug().Msgf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", r.Method, r.URL, r.Proto)
|
|
||||||
}
|
|
||||||
event.
|
|
||||||
Uint8(LogFieldConnIndex, fields.connIndex).
|
|
||||||
Str("host", r.Host).
|
|
||||||
Str("path", r.URL.Path).
|
|
||||||
Interface(LogFieldRule, fields.rule).
|
|
||||||
Interface("headers", r.Header).
|
|
||||||
Int64("content-length", r.ContentLength).
|
|
||||||
Msgf("%s %s %s", r.Method, r.URL, r.Proto)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Proxy) logOriginResponse(resp *http.Response, fields logFields) {
|
|
||||||
responseByCode.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()
|
|
||||||
event := p.log.Debug()
|
|
||||||
if fields.cfRay != "" {
|
|
||||||
event = event.Str(LogFieldCFRay, fields.cfRay)
|
|
||||||
}
|
|
||||||
if fields.lbProbe {
|
|
||||||
event = event.Bool(LogFieldLBProbe, fields.lbProbe)
|
|
||||||
}
|
|
||||||
event.
|
|
||||||
Int(management.EventTypeKey, int(management.HTTP)).
|
|
||||||
Uint8(LogFieldConnIndex, fields.connIndex).
|
|
||||||
Int64("content-length", resp.ContentLength).
|
|
||||||
Msgf("%s", resp.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Proxy) logRequestError(err error, cfRay string, flowID string, rule, service string) {
|
|
||||||
requestErrors.Inc()
|
|
||||||
log := p.log.Error().Err(err)
|
|
||||||
if cfRay != "" {
|
|
||||||
log = log.Str(LogFieldCFRay, cfRay)
|
|
||||||
}
|
|
||||||
if flowID != "" {
|
|
||||||
log = log.Str(LogFieldFlowID, flowID).Int(management.EventTypeKey, int(management.TCP))
|
|
||||||
} else {
|
|
||||||
log = log.Int(management.EventTypeKey, int(management.HTTP))
|
|
||||||
}
|
|
||||||
if rule != "" {
|
|
||||||
log = log.Str(LogFieldRule, rule)
|
|
||||||
}
|
|
||||||
if service != "" {
|
|
||||||
log = log.Str(LogFieldOriginService, service)
|
|
||||||
}
|
|
||||||
log.Send()
|
|
||||||
}
|
|
||||||
|
|
||||||
func getDestFromRule(rule *ingress.Rule, req *http.Request) (string, error) {
|
func getDestFromRule(rule *ingress.Rule, req *http.Request) (string, error) {
|
||||||
switch rule.Service.String() {
|
switch rule.Service.String() {
|
||||||
case ingress.ServiceBastion:
|
case ingress.ServiceBastion:
|
||||||
|
|
Loading…
Reference in New Issue