TUN-6858: Trace ICMP reply

This commit is contained in:
cthuang 2022-10-14 14:44:17 +01:00
parent b6bd8c1f5e
commit 2d5234e021
4 changed files with 100 additions and 39 deletions

View File

@ -219,7 +219,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet") ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet")
// In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after // In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after
// the second reply // the second reply
if err := ip.handleFullPacket(icmpDecoder, buf[:n]); err != nil { if err := ip.handleFullPacket(ctx, icmpDecoder, buf[:n]); err != nil {
ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply as full packet") ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply as full packet")
} }
continue continue
@ -228,14 +228,14 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type) ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
continue continue
} }
if err := ip.sendReply(reply); err != nil { if err := ip.sendReply(ctx, reply); err != nil {
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
continue continue
} }
} }
} }
func (ip *icmpProxy) handleFullPacket(decoder *packet.ICMPDecoder, rawPacket []byte) error { func (ip *icmpProxy) handleFullPacket(ctx context.Context, decoder *packet.ICMPDecoder, rawPacket []byte) error {
icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket}) icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
if err != nil { if err != nil {
return err return err
@ -249,13 +249,13 @@ func (ip *icmpProxy) handleFullPacket(decoder *packet.ICMPDecoder, rawPacket []b
msg: icmpPacket.Message, msg: icmpPacket.Message,
echo: echo, echo: echo,
} }
if ip.sendReply(&reply); err != nil { if ip.sendReply(ctx, &reply); err != nil {
return err return err
} }
return nil return nil
} }
func (ip *icmpProxy) sendReply(reply *echoReply) error { func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
funnelID := echoFunnelID(reply.echo.ID) funnelID := echoFunnelID(reply.echo.ID)
funnel, ok := ip.srcFunnelTracker.Get(funnelID) funnel, ok := ip.srcFunnelTracker.Get(funnelID)
if !ok { if !ok {
@ -265,5 +265,19 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error {
if err != nil { if err != nil {
return err return err
} }
return icmpFlow.returnToSrc(reply)
_, span := icmpFlow.responder.replySpan(ctx, ip.logger)
defer icmpFlow.responder.exportSpan()
span.SetAttributes(
attribute.String("dst", reply.from.String()),
attribute.Int("echoID", reply.echo.ID),
attribute.Int("seq", reply.echo.Seq),
attribute.Int("originalEchoID", icmpFlow.originalEchoID),
)
if err := icmpFlow.returnToSrc(reply); err != nil {
tracing.EndWithErrorStatus(span, err)
}
tracing.End(span)
return nil
} }

View File

@ -20,7 +20,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"golang.org/x/net/icmp"
"github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/packet"
"github.com/cloudflare/cloudflared/tracing" "github.com/cloudflare/cloudflared/tracing"
@ -113,7 +112,6 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
attribute.Int("seq", originalEcho.Seq), attribute.Int("seq", originalEcho.Seq),
) )
newConnChan := make(chan *icmp.PacketConn, 1)
newFunnelFunc := func() (packet.Funnel, error) { newFunnelFunc := func() (packet.Funnel, error) {
conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone) conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone)
if err != nil { if err != nil {
@ -121,7 +119,6 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
return nil, errors.Wrap(err, "failed to open ICMP socket") return nil, errors.Wrap(err, "failed to open ICMP socket")
} }
ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr()) ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr())
newConnChan <- conn
closeCallback := func() error { closeCallback := func() error {
return conn.Close() return conn.Close()
} }
@ -157,10 +154,9 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
Str("dst", pk.Dst.String()). Str("dst", pk.Dst.String()).
Int("originalEchoID", originalEcho.ID). Int("originalEchoID", originalEcho.ID).
Msg("New flow") Msg("New flow")
conn := <-newConnChan
go func() { go func() {
defer ip.srcFunnelTracker.Unregister(funnelID, icmpFlow) defer ip.srcFunnelTracker.Unregister(funnelID, icmpFlow)
if err := ip.listenResponse(icmpFlow, conn); err != nil { if err := ip.listenResponse(ctx, icmpFlow); err != nil {
ip.logger.Debug().Err(err). ip.logger.Debug().Err(err).
Str("src", pk.Src.String()). Str("src", pk.Src.String()).
Str("dst", pk.Dst.String()). Str("dst", pk.Dst.String()).
@ -182,27 +178,53 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
return ctx.Err() return ctx.Err()
} }
func (ip *icmpProxy) listenResponse(flow *icmpEchoFlow, conn *icmp.PacketConn) error { func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) error {
buf := make([]byte, mtu) buf := make([]byte, mtu)
for { for {
n, from, err := conn.ReadFrom(buf) retryable, err := ip.handleResponse(ctx, flow, buf)
if err != nil { if err != nil && !retryable {
return err return err
} }
}
}
func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (retryableErr bool, err error) {
_, span := flow.responder.replySpan(ctx, ip.logger)
defer flow.responder.exportSpan()
span.SetAttributes(
attribute.Int("originalEchoID", flow.originalEchoID),
)
n, from, err := flow.originConn.ReadFrom(buf)
if err != nil {
tracing.EndWithErrorStatus(span, err)
return false, err
}
reply, err := parseReply(from, buf[:n]) reply, err := parseReply(from, buf[:n])
if err != nil { if err != nil {
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply") ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
continue tracing.EndWithErrorStatus(span, err)
return true, err
} }
if !isEchoReply(reply.msg) { if !isEchoReply(reply.msg) {
err := fmt.Errorf("Expect ICMP echo reply, got %s", reply.msg.Type)
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type) ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
continue tracing.EndWithErrorStatus(span, err)
return true, err
} }
span.SetAttributes(
attribute.String("dst", reply.from.String()),
attribute.Int("echoID", reply.echo.ID),
attribute.Int("seq", reply.echo.Seq),
)
if err := flow.returnToSrc(reply); err != nil { if err := flow.returnToSrc(reply); err != nil {
ip.logger.Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") ip.logger.Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
continue tracing.EndWithErrorStatus(span, err)
} return true, err
} }
tracing.End(span)
return true, nil
} }
// Only linux uses flow3Tuple as FunnelID // Only linux uses flow3Tuple as FunnelID

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"net" "net"
"net/netip" "net/netip"
"os"
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
@ -109,8 +108,7 @@ func TestTraceICMPRouterEcho(t *testing.T) {
} }
tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1" tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
logger := zerolog.New(os.Stderr) router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &logger)
require.NoError(t, err) require.NoError(t, err)
proxyDone := make(chan struct{}) proxyDone := make(chan struct{})
@ -120,8 +118,8 @@ func TestTraceICMPRouterEcho(t *testing.T) {
close(proxyDone) close(proxyDone)
}() }()
// Buffer 2 packets, reply and request span // Buffer 3 packets, request span, reply span and reply
muxer := newMockMuxer(2) muxer := newMockMuxer(3)
tracingIdentity, err := tracing.NewIdentity(tracingCtx) tracingIdentity, err := tracing.NewIdentity(tracingCtx)
require.NoError(t, err) require.NoError(t, err)
serializedIdentity, err := tracingIdentity.MarshalBinary() serializedIdentity, err := tracingIdentity.MarshalBinary()
@ -129,7 +127,7 @@ func TestTraceICMPRouterEcho(t *testing.T) {
responder := packetResponder{ responder := packetResponder{
datagramMuxer: muxer, datagramMuxer: muxer,
tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &logger), tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &noopLogger),
serializedIdentity: serializedIdentity, serializedIdentity: serializedIdentity,
} }
@ -153,21 +151,36 @@ func TestTraceICMPRouterEcho(t *testing.T) {
} }
require.NoError(t, router.Request(ctx, &pk, &responder)) require.NoError(t, router.Request(ctx, &pk, &responder))
validateEchoFlow(t, muxer, &pk) validateEchoFlow(t, muxer, &pk)
// Request span
receivedPacket := <-muxer.cfdToEdge receivedPacket := <-muxer.cfdToEdge
tracingSpanPacket, ok := receivedPacket.(*quicpogs.TracingSpanPacket) requestSpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
require.True(t, ok) require.True(t, ok)
require.NotEmpty(t, tracingSpanPacket.Spans) require.NotEmpty(t, requestSpan.Spans)
require.True(t, bytes.Equal(serializedIdentity, tracingSpanPacket.TracingIdentity)) require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
// Reply span
receivedPacket = <-muxer.cfdToEdge
replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
require.True(t, ok)
require.NotEmpty(t, replySpan.Spans)
require.True(t, bytes.Equal(serializedIdentity, replySpan.TracingIdentity))
require.False(t, bytes.Equal(requestSpan.Spans, replySpan.Spans))
echo.Seq++ echo.Seq++
pk.Body = echo pk.Body = echo
// Only first request for a flow is traced. The edge will not send tracing context for the second request // Only first request for a flow is traced. The edge will not send tracing context for the second request
responder = packetResponder{ newResponder := packetResponder{
datagramMuxer: muxer, datagramMuxer: muxer,
} }
require.NoError(t, router.Request(ctx, &pk, &responder)) require.NoError(t, router.Request(ctx, &pk, &newResponder))
validateEchoFlow(t, muxer, &pk) validateEchoFlow(t, muxer, &pk)
select {
case receivedPacket = <-muxer.cfdToEdge:
panic(fmt.Sprintf("Receive unexpected packet %+v", receivedPacket))
default:
}
cancel() cancel()
<-proxyDone <-proxyDone
} }

View File

@ -135,10 +135,13 @@ func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, ra
return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed)) return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed))
} }
// packetResponder should not be used concurrently. This assumption is upheld because reply packets are ready one-by-one
type packetResponder struct { type packetResponder struct {
datagramMuxer muxer datagramMuxer muxer
tracedCtx *tracing.TracedContext tracedCtx *tracing.TracedContext
serializedIdentity []byte serializedIdentity []byte
// hadReply tracks if there has been any reply for this flow
hadReply bool
} }
func (pr *packetResponder) tracingEnabled() bool { func (pr *packetResponder) tracingEnabled() bool {
@ -146,6 +149,7 @@ func (pr *packetResponder) tracingEnabled() bool {
} }
func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error { func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error {
pr.hadReply = true
return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket)) return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket))
} }
@ -159,13 +163,21 @@ func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (co
)) ))
} }
func (pr *packetResponder) replySpan(ctx context.Context, logger *zerolog.Logger) (context.Context, trace.Span) {
if !pr.tracingEnabled() || pr.hadReply {
return ctx, tracing.NewNoopSpan()
}
return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-reply")
}
func (pr *packetResponder) exportSpan() { func (pr *packetResponder) exportSpan() {
if !pr.tracingEnabled() { if !pr.tracingEnabled() {
return return
} }
spans := pr.tracedCtx.GetProtoSpans() spans := pr.tracedCtx.GetProtoSpans()
pr.tracedCtx.ClearSpans()
if len(spans) > 0 { if len(spans) > 0 {
// Make sure spans are cleared after they are sent
defer pr.tracedCtx.ClearSpans()
pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{ pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{
Spans: spans, Spans: spans,
TracingIdentity: pr.serializedIdentity, TracingIdentity: pr.serializedIdentity,