TUN-6347: Add TCP stream logs with FlowID

This commit is contained in:
Sudarsan Reddy 2022-06-09 13:55:26 +01:00
parent 4f468b8a5d
commit 69b28e358c
3 changed files with 20 additions and 5 deletions

View File

@ -132,6 +132,7 @@ type TCPRequest struct {
Dest string
CFRay string
LBProbe bool
FlowID string
}
// ReadWriteAcker is a readwriter with the ability to Acknowledge to the downstream (edge) that the origin has

View File

@ -31,6 +31,8 @@ const (
HTTPMethodKey = "HttpMethod"
// HTTPHostKey is used to get or set http Method in QUIC ALPN if the underlying proxy connection type is HTTP.
HTTPHostKey = "HttpHost"
QUICMetadataFlowID = "FlowID"
)
// QUICConnection represents the type that facilitates Proxying via QUIC streams.
@ -180,6 +182,7 @@ func (q *QUICConnection) handleDataStream(stream *quicpogs.RequestServerStream)
if err != nil {
return err
}
switch connectRequest.Type {
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
tracedReq, err := buildHTTPRequest(connectRequest, stream)
@ -191,7 +194,9 @@ func (q *QUICConnection) handleDataStream(stream *quicpogs.RequestServerStream)
return originProxy.ProxyHTTP(w, tracedReq, connectRequest.Type == quicpogs.ConnectionTypeWebsocket)
case quicpogs.ConnectionTypeTCP:
rwa := &streamReadWriteAcker{stream}
return originProxy.ProxyTCP(context.Background(), rwa, &TCPRequest{Dest: connectRequest.Dest})
metadata := connectRequest.MetadataMap()
return originProxy.ProxyTCP(context.Background(), rwa, &TCPRequest{Dest: connectRequest.Dest,
FlowID: metadata[QUICMetadataFlowID]})
}
return nil
}

View File

@ -28,6 +28,7 @@ const (
LogFieldCFRay = "cfRay"
LogFieldRule = "ingressRule"
LogFieldOriginService = "originService"
LogFieldFlowID = "flowID"
)
// Proxy represents a means to Proxy between cloudflared and the origin services.
@ -96,7 +97,7 @@ func (p *Proxy) ProxyHTTP(
logFields,
); err != nil {
rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, rule, srv)
p.logRequestError(err, cfRay, "", rule, srv)
return err
}
return nil
@ -109,7 +110,7 @@ func (p *Proxy) ProxyHTTP(
rws := connection.NewHTTPResponseReadWriterAcker(w, req)
if err := p.proxyStream(req.Context(), rws, dest, originProxy, logFields); err != nil {
rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, rule, srv)
p.logRequestError(err, cfRay, "", rule, srv)
return err
}
return nil
@ -140,13 +141,17 @@ func (p *Proxy) ProxyTCP(
cfRay: req.CFRay,
lbProbe: req.LBProbe,
rule: ingress.ServiceWarpRouting,
flowID: req.FlowID,
}
p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream started")
if err := p.proxyStream(serveCtx, rwa, req.Dest, p.warpRouting.Proxy, logFields); err != nil {
p.logRequestError(err, req.CFRay, "", ingress.ServiceWarpRouting)
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
return err
}
p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream finished successfully")
return nil
}
@ -317,6 +322,7 @@ type logFields struct {
cfRay string
lbProbe bool
rule interface{}
flowID string
}
func (p *Proxy) logRequest(r *http.Request, fields logFields) {
@ -360,12 +366,15 @@ func (p *Proxy) logOriginResponse(resp *http.Response, fields logFields) {
}
}
func (p *Proxy) logRequestError(err error, cfRay string, rule, service string) {
func (p *Proxy) logRequestError(err error, cfRay string, flowID string, rule, service string) {
requestErrors.Inc()
log := p.log.Error().Err(err)
if cfRay != "" {
log = log.Str(LogFieldCFRay, cfRay)
}
if flowID != "" {
log = log.Str(LogFieldFlowID, flowID)
}
if rule != "" {
log = log.Str(LogFieldRule, rule)
}