From 2d5234e02187ee38a268a4802920e6836d036a03 Mon Sep 17 00:00:00 2001
From: cthuang
Date: Fri, 14 Oct 2022 14:44:17 +0100
Subject: [PATCH] TUN-6858: Trace ICMP reply

---
Review notes (text between "---" and the diffstat is ignored by git am):
- icmp_darwin.go, handleFullPacket: bind the error returned by sendReply.
  "if ip.sendReply(...); err != nil" discarded the call's result and
  re-tested an err declared earlier in the function.
- icmp_darwin.go, sendReply: return the returnToSrc error right after the
  span is ended with error status. Before, the span was ended a second time
  and the error was replaced by nil.
- icmp_linux.go / packet_router.go: doc comments added on the new
  handleResponse and replySpan functions.
- origin_icmp_proxy_test.go: report the unexpected packet with t.Fatalf
  instead of panic.
- The blob IDs on the "index" lines predate these amendments.
- Leading whitespace was re-derived (gofmt tabs) from a whitespace-collapsed
  copy of the patch; hunk line counts were re-checked against the headers.

 ingress/icmp_darwin.go            | 27 ++++++++++---
 ingress/icmp_linux.go             | 67 +++++++++++++++++++++----------
 ingress/origin_icmp_proxy_test.go | 35 +++++++++++------
 ingress/packet_router.go          | 16 +++++++-
 4 files changed, 106 insertions(+), 39 deletions(-)

diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go
index 721aeb4f..640a2bb1 100644
--- a/ingress/icmp_darwin.go
+++ b/ingress/icmp_darwin.go
@@ -219,7 +219,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
 			ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet")
 			// In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after
 			// the second reply
-			if err := ip.handleFullPacket(icmpDecoder, buf[:n]); err != nil {
+			if err := ip.handleFullPacket(ctx, icmpDecoder, buf[:n]); err != nil {
 				ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply as full packet")
 			}
 			continue
@@ -228,14 +228,14 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
 			ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
 			continue
 		}
-		if err := ip.sendReply(reply); err != nil {
+		if err := ip.sendReply(ctx, reply); err != nil {
 			ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
 			continue
 		}
 	}
 }
 
-func (ip *icmpProxy) handleFullPacket(decoder *packet.ICMPDecoder, rawPacket []byte) error {
+func (ip *icmpProxy) handleFullPacket(ctx context.Context, decoder *packet.ICMPDecoder, rawPacket []byte) error {
 	icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
 	if err != nil {
 		return err
@@ -249,13 +249,13 @@ func (ip *icmpProxy) handleFullPacket(decoder *packet.ICMPDecoder, rawPacket []b
 		msg:  icmpPacket.Message,
 		echo: echo,
 	}
-	if ip.sendReply(&reply); err != nil {
+	if err := ip.sendReply(ctx, &reply); err != nil {
 		return err
 	}
 	return nil
 }
 
-func (ip *icmpProxy) sendReply(reply *echoReply) error {
+func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
 	funnelID := echoFunnelID(reply.echo.ID)
 	funnel, ok := ip.srcFunnelTracker.Get(funnelID)
 	if !ok {
@@ -265,5 +265,20 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error {
 	if err != nil {
 		return err
 	}
-	return icmpFlow.returnToSrc(reply)
+
+	_, span := icmpFlow.responder.replySpan(ctx, ip.logger)
+	defer icmpFlow.responder.exportSpan()
+
+	span.SetAttributes(
+		attribute.String("dst", reply.from.String()),
+		attribute.Int("echoID", reply.echo.ID),
+		attribute.Int("seq", reply.echo.Seq),
+		attribute.Int("originalEchoID", icmpFlow.originalEchoID),
+	)
+	if err := icmpFlow.returnToSrc(reply); err != nil {
+		tracing.EndWithErrorStatus(span, err)
+		return err
+	}
+	tracing.End(span)
+	return nil
 }
diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go
index 8e6d546a..b40d88bb 100644
--- a/ingress/icmp_linux.go
+++ b/ingress/icmp_linux.go
@@ -20,7 +20,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"go.opentelemetry.io/otel/attribute"
-	"golang.org/x/net/icmp"
 
 	"github.com/cloudflare/cloudflared/packet"
 	"github.com/cloudflare/cloudflared/tracing"
@@ -113,7 +112,6 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
 		attribute.Int("seq", originalEcho.Seq),
 	)
 
-	newConnChan := make(chan *icmp.PacketConn, 1)
 	newFunnelFunc := func() (packet.Funnel, error) {
 		conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone)
 		if err != nil {
@@ -121,7 +119,6 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
 			return nil, errors.Wrap(err, "failed to open ICMP socket")
 		}
 		ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr())
-		newConnChan <- conn
 		closeCallback := func() error {
 			return conn.Close()
 		}
@@ -157,10 +154,9 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
 			Str("dst", pk.Dst.String()).
 			Int("originalEchoID", originalEcho.ID).
 			Msg("New flow")
-		conn := <-newConnChan
 		go func() {
 			defer ip.srcFunnelTracker.Unregister(funnelID, icmpFlow)
-			if err := ip.listenResponse(icmpFlow, conn); err != nil {
+			if err := ip.listenResponse(ctx, icmpFlow); err != nil {
 				ip.logger.Debug().Err(err).
 					Str("src", pk.Src.String()).
 					Str("dst", pk.Dst.String()).
@@ -182,29 +178,58 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
 	return ctx.Err()
 }
 
-func (ip *icmpProxy) listenResponse(flow *icmpEchoFlow, conn *icmp.PacketConn) error {
+func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) error {
 	buf := make([]byte, mtu)
 	for {
-		n, from, err := conn.ReadFrom(buf)
-		if err != nil {
+		retryable, err := ip.handleResponse(ctx, flow, buf)
+		if err != nil && !retryable {
 			return err
 		}
-		reply, err := parseReply(from, buf[:n])
-		if err != nil {
-			ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
-			continue
-		}
-		if !isEchoReply(reply.msg) {
-			ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
-			continue
-		}
-		if err := flow.returnToSrc(reply); err != nil {
-			ip.logger.Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
-			continue
-		}
 	}
 }
 
+// handleResponse reads one packet from the flow's origin connection and, if it is an ICMP echo reply, returns it
+// to the source. retryableErr tells listenResponse whether it may keep reading after an error: only a failed read
+// from the origin connection is reported as non-retryable.
+func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (retryableErr bool, err error) {
+	_, span := flow.responder.replySpan(ctx, ip.logger)
+	defer flow.responder.exportSpan()
+
+	span.SetAttributes(
+		attribute.Int("originalEchoID", flow.originalEchoID),
+	)
+
+	n, from, err := flow.originConn.ReadFrom(buf)
+	if err != nil {
+		tracing.EndWithErrorStatus(span, err)
+		return false, err
+	}
+	reply, err := parseReply(from, buf[:n])
+	if err != nil {
+		ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
+		tracing.EndWithErrorStatus(span, err)
+		return true, err
+	}
+	if !isEchoReply(reply.msg) {
+		err := fmt.Errorf("Expect ICMP echo reply, got %s", reply.msg.Type)
+		ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
+		tracing.EndWithErrorStatus(span, err)
+		return true, err
+	}
+	span.SetAttributes(
+		attribute.String("dst", reply.from.String()),
+		attribute.Int("echoID", reply.echo.ID),
+		attribute.Int("seq", reply.echo.Seq),
+	)
+	if err := flow.returnToSrc(reply); err != nil {
+		ip.logger.Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
+		tracing.EndWithErrorStatus(span, err)
+		return true, err
+	}
+	tracing.End(span)
+	return true, nil
+}
+
 // Only linux uses flow3Tuple as FunnelID
 func (ft flow3Tuple) Type() string {
 	return "srcIP_dstIP_echoID"
diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go
index c36aeab7..314ec3c4 100644
--- a/ingress/origin_icmp_proxy_test.go
+++ b/ingress/origin_icmp_proxy_test.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"net"
 	"net/netip"
-	"os"
 	"runtime"
 	"strings"
 	"sync"
@@ -109,8 +108,7 @@ func TestTraceICMPRouterEcho(t *testing.T) {
 	}
 
 	tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
-	logger := zerolog.New(os.Stderr)
-	router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &logger)
+	router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
 	require.NoError(t, err)
 
 	proxyDone := make(chan struct{})
@@ -120,8 +118,8 @@ func TestTraceICMPRouterEcho(t *testing.T) {
 		close(proxyDone)
 	}()
 
-	// Buffer 2 packets, reply and request span
-	muxer := newMockMuxer(2)
+	// Buffer 3 packets, request span, reply span and reply
+	muxer := newMockMuxer(3)
 	tracingIdentity, err := tracing.NewIdentity(tracingCtx)
 	require.NoError(t, err)
 	serializedIdentity, err := tracingIdentity.MarshalBinary()
@@ -129,7 +127,7 @@ func TestTraceICMPRouterEcho(t *testing.T) {
 
 	responder := packetResponder{
 		datagramMuxer:      muxer,
-		tracedCtx:          tracing.NewTracedContext(ctx, tracingIdentity.String(), &logger),
+		tracedCtx:          tracing.NewTracedContext(ctx, tracingIdentity.String(), &noopLogger),
 		serializedIdentity: serializedIdentity,
 	}
 
@@ -153,21 +151,36 @@ func TestTraceICMPRouterEcho(t *testing.T) {
 	}
 	require.NoError(t, router.Request(ctx, &pk, &responder))
 	validateEchoFlow(t, muxer, &pk)
+
+	// Request span
 	receivedPacket := <-muxer.cfdToEdge
-	tracingSpanPacket, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
+	requestSpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
 	require.True(t, ok)
-	require.NotEmpty(t, tracingSpanPacket.Spans)
-	require.True(t, bytes.Equal(serializedIdentity, tracingSpanPacket.TracingIdentity))
+	require.NotEmpty(t, requestSpan.Spans)
+	require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
+	// Reply span
+	receivedPacket = <-muxer.cfdToEdge
+	replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
+	require.True(t, ok)
+	require.NotEmpty(t, replySpan.Spans)
+	require.True(t, bytes.Equal(serializedIdentity, replySpan.TracingIdentity))
+	require.False(t, bytes.Equal(requestSpan.Spans, replySpan.Spans))
 
 	echo.Seq++
 	pk.Body = echo
 	// Only first request for a flow is traced. The edge will not send tracing context for the second request
-	responder = packetResponder{
+	newResponder := packetResponder{
 		datagramMuxer: muxer,
 	}
-	require.NoError(t, router.Request(ctx, &pk, &responder))
+	require.NoError(t, router.Request(ctx, &pk, &newResponder))
 	validateEchoFlow(t, muxer, &pk)
 
+	select {
+	case receivedPacket = <-muxer.cfdToEdge:
+		t.Fatalf("Receive unexpected packet %+v", receivedPacket)
+	default:
+	}
+
 	cancel()
 	<-proxyDone
 }
diff --git a/ingress/packet_router.go b/ingress/packet_router.go
index d4919e2c..1b311aa5 100644
--- a/ingress/packet_router.go
+++ b/ingress/packet_router.go
@@ -135,10 +135,13 @@ func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, ra
 	return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed))
 }
 
+// packetResponder should not be used concurrently. This assumption is upheld because reply packets are read one-by-one.
 type packetResponder struct {
 	datagramMuxer      muxer
 	tracedCtx          *tracing.TracedContext
 	serializedIdentity []byte
+	// hadReply tracks if there has been any reply for this flow
+	hadReply bool
 }
 
 func (pr *packetResponder) tracingEnabled() bool {
@@ -146,6 +149,7 @@ func (pr *packetResponder) tracingEnabled() bool {
 }
 
 func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error {
+	pr.hadReply = true
 	return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket))
 }
 
@@ -159,13 +163,23 @@ func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (co
 	))
 }
 
+// replySpan starts an "icmp-echo-reply" span on the traced context. It returns ctx with a noop span when tracing
+// is disabled or returnPacket has already been called on this responder, so only the first reply is traced.
+func (pr *packetResponder) replySpan(ctx context.Context, logger *zerolog.Logger) (context.Context, trace.Span) {
+	if !pr.tracingEnabled() || pr.hadReply {
+		return ctx, tracing.NewNoopSpan()
+	}
+	return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-reply")
+}
+
 func (pr *packetResponder) exportSpan() {
 	if !pr.tracingEnabled() {
 		return
 	}
 	spans := pr.tracedCtx.GetProtoSpans()
-	pr.tracedCtx.ClearSpans()
 	if len(spans) > 0 {
+		// Make sure spans are cleared after they are sent
+		defer pr.tracedCtx.ClearSpans()
 		pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{
 			Spans:           spans,
 			TracingIdentity: pr.serializedIdentity,