TUN-8224: Fix safety of TCP stream logging, separate connect and ack log messages

This commit is contained in:
Igor Postelnik 2024-02-09 09:56:56 -06:00
parent a9aa48d7a1
commit 56aeb6be65
1 changed files with 16 additions and 15 deletions

View File

@ -140,11 +140,12 @@ func (p *Proxy) ProxyHTTP(
return fmt.Errorf("response writer is not a flusher") return fmt.Errorf("response writer is not a flusher")
} }
rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req) rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req)
connectedLogger := p.log.Debug(). logger := p.log.With().
Int(management.EventTypeKey, int(management.HTTP)). Int(management.EventTypeKey, int(management.HTTP)).
Str(LogFieldDestAddr, dest). Str(LogFieldDestAddr, dest).
Uint8(LogFieldConnIndex, tr.ConnIndex) Uint8(LogFieldConnIndex, tr.ConnIndex).
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, connectedLogger); err != nil { Logger()
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, logger); err != nil {
rule, srv := ruleField(p.ingressRules, ruleNum) rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, "", rule, srv) p.logRequestError(err, cfRay, "", rule, srv)
return err return err
@ -178,22 +179,21 @@ func (p *Proxy) ProxyTCP(
tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log) tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log)
debugLogger := func() *zerolog.Event { logger := p.log.With().
return p.log.Debug(). Int(management.EventTypeKey, int(management.TCP)).
Int(management.EventTypeKey, int(management.TCP)). Str(LogFieldFlowID, req.FlowID).
Str(LogFieldFlowID, req.FlowID). Str(LogFieldDestAddr, req.Dest).
Str(LogFieldDestAddr, req.Dest). Uint8(LogFieldConnIndex, req.ConnIndex).
Uint8(LogFieldConnIndex, req.ConnIndex) Logger()
}
debugLogger().Msg("tcp proxy stream started") logger.Debug().Msg("tcp proxy stream started")
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, debugLogger()); err != nil { if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, logger); err != nil {
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting) p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
return err return err
} }
debugLogger().Msg("tcp proxy stream finished successfully") logger.Debug().Msg("tcp proxy stream finished successfully")
return nil return nil
} }
@ -303,7 +303,7 @@ func (p *Proxy) proxyStream(
rwa connection.ReadWriteAcker, rwa connection.ReadWriteAcker,
dest string, dest string,
connectionProxy ingress.StreamBasedOriginProxy, connectionProxy ingress.StreamBasedOriginProxy,
connectedLogger *zerolog.Event, logger zerolog.Logger,
) error { ) error {
ctx := tr.Context ctx := tr.Context
_, connectSpan := tr.Tracer().Start(ctx, "stream-connect") _, connectSpan := tr.Tracer().Start(ctx, "stream-connect")
@ -317,6 +317,7 @@ func (p *Proxy) proxyStream(
} }
connectSpan.End() connectSpan.End()
defer originConn.Close() defer originConn.Close()
logger.Debug().Msg("origin connection established")
encodedSpans := tr.GetSpans() encodedSpans := tr.GetSpans()
@ -326,7 +327,7 @@ func (p *Proxy) proxyStream(
} }
connectLatency.Observe(float64(time.Since(start).Milliseconds())) connectLatency.Observe(float64(time.Since(start).Milliseconds()))
connectedLogger.Msg("proxy stream established") logger.Debug().Msg("proxy stream acknowledged")
originConn.Stream(ctx, rwa, p.log) originConn.Stream(ctx, rwa, p.log)
return nil return nil