From b01cfc90ad8be8fe272b5303a2f0fc54ccfefa73 Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Tue, 19 Nov 2024 12:44:24 -0800 Subject: [PATCH] TUN-8640: Refactor ICMPRouter to accept new ICMPResponders --- cmd/cloudflared/tunnel/cmd.go | 2 +- cmd/cloudflared/tunnel/configuration.go | 20 ++--- connection/quic_connection.go | 27 ------- connection/quic_connection_test.go | 2 +- connection/quic_datagram_v2.go | 5 +- ingress/icmp_darwin.go | 16 ++-- ingress/icmp_generic.go | 4 +- ingress/icmp_linux.go | 24 +++--- ingress/icmp_posix.go | 25 +++---- ingress/icmp_posix_test.go | 31 +++----- ingress/icmp_windows.go | 14 ++-- ingress/icmp_windows_test.go | 2 +- ingress/origin_icmp_proxy.go | 50 ++++++++++++- ingress/origin_icmp_proxy_test.go | 47 +++++------- ingress/packet_router.go | 98 +++++++++++++------------ ingress/packet_router_test.go | 15 ++-- packet/encoder.go | 6 +- supervisor/supervisor.go | 4 +- supervisor/tunnel.go | 5 +- 19 files changed, 194 insertions(+), 203 deletions(-) diff --git a/cmd/cloudflared/tunnel/cmd.go b/cmd/cloudflared/tunnel/cmd.go index f7482154..ebde041d 100644 --- a/cmd/cloudflared/tunnel/cmd.go +++ b/cmd/cloudflared/tunnel/cmd.go @@ -420,7 +420,7 @@ func StartServer( // Disable ICMP packet routing for quick tunnels if quickTunnelURL != "" { - tunnelConfig.PacketConfig = nil + tunnelConfig.ICMPRouterServer = nil } internalRules := []ingress.Rule{} diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index 727f4f90..4e06fc0b 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -252,11 +252,11 @@ func prepareTunnelConfig( QUICConnectionLevelFlowControlLimit: c.Uint64(quicConnLevelFlowControlLimit), QUICStreamLevelFlowControlLimit: c.Uint64(quicStreamLevelFlowControlLimit), } - packetConfig, err := newPacketConfig(c, log) + icmpRouter, err := newICMPRouter(c, log) if err != nil { log.Warn().Err(err).Msg("ICMP proxy feature is disabled") } else { - tunnelConfig.PacketConfig = packetConfig + tunnelConfig.ICMPRouterServer = icmpRouter } orchestratorConfig := &orchestration.Config{ Ingress: &ingressRules, @@ -351,7 +351,7 @@ func adjustIPVersionByBindAddress(ipVersion allregions.ConfigIPVersion, ip net.I } } -func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRouterConfig, error) { +func newICMPRouter(c *cli.Context, logger *zerolog.Logger) (ingress.ICMPRouterServer, error) { ipv4Src, err := determineICMPv4Src(c.String("icmpv4-src"), logger) if err != nil { return nil, errors.Wrap(err, "failed to determine IPv4 source address for ICMP proxy") @@ -368,16 +368,11 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRou logger.Info().Msgf("ICMP proxy will use %s as source for IPv6", ipv6Src) } - icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, zone, logger, icmpFunnelTimeout) + icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, logger, icmpFunnelTimeout) if err != nil { return nil, err } - return &ingress.GlobalRouterConfig{ - ICMPRouter: icmpRouter, - IPv4Src: ipv4Src, - IPv6Src: ipv6Src, - Zone: zone, - }, nil + return icmpRouter, nil } func determineICMPv4Src(userDefinedSrc string, logger *zerolog.Logger) (netip.Addr, error) { @@ -407,13 +402,12 @@ type interfaceIP struct { func determineICMPv6Src(userDefinedSrc string, logger *zerolog.Logger, ipv4Src netip.Addr) (addr netip.Addr, zone string, err error) { if userDefinedSrc != "" { - userDefinedIP, zone, _ := strings.Cut(userDefinedSrc, "%") - addr, err := netip.ParseAddr(userDefinedIP) + addr, err := netip.ParseAddr(userDefinedSrc) if err != nil { return netip.Addr{}, "", err } if addr.Is6() { - return addr, zone, nil + return addr, addr.Zone(), nil } return netip.Addr{}, "", fmt.Errorf("expect IPv6, but %s is IPv4", userDefinedSrc) } diff --git a/connection/quic_connection.go b/connection/quic_connection.go index d0baab5e..7a20e15a 100644 --- a/connection/quic_connection.go +++ b/connection/quic_connection.go @@ -7,7 +7,6 @@ import ( "io" "net" "net/http" - "net/netip" "strconv" "strings" "sync/atomic" @@ -18,7 +17,6 @@ import ( "github.com/rs/zerolog" "golang.org/x/sync/errgroup" - "github.com/cloudflare/cloudflared/packet" cfdquic "github.com/cloudflare/cloudflared/quic" "github.com/cloudflare/cloudflared/tracing" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" @@ -417,28 +415,3 @@ func (np *nopCloserReadWriter) Close() error { return nil } - -// muxerWrapper wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface -type muxerWrapper struct { - muxer *cfdquic.DatagramMuxerV2 -} - -func (rp *muxerWrapper) SendPacket(dst netip.Addr, pk packet.RawPacket) error { - return rp.muxer.SendPacket(cfdquic.RawPacket(pk)) -} - -func (rp *muxerWrapper) ReceivePacket(ctx context.Context) (packet.RawPacket, error) { - pk, err := rp.muxer.ReceivePacket(ctx) - if err != nil { - return packet.RawPacket{}, err - } - rawPacket, ok := pk.(cfdquic.RawPacket) - if ok { - return packet.RawPacket(rawPacket), nil - } - return packet.RawPacket{}, fmt.Errorf("unexpected packet type %+v", pk) -} - -func (rp *muxerWrapper) Close() error { - return nil -} diff --git a/connection/quic_connection_test.go b/connection/quic_connection_test.go index ba052437..49c14445 100644 --- a/connection/quic_connection_test.go +++ b/connection/quic_connection_test.go @@ -752,7 +752,7 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8) sessionDemuxChan := make(chan *packet.Session, 4) datagramMuxer := cfdquic.NewDatagramMuxerV2(conn, &log, sessionDemuxChan) sessionManager := datagramsession.NewManager(&log, datagramMuxer.SendToSession, sessionDemuxChan) - packetRouter := ingress.NewPacketRouter(nil, datagramMuxer, &log) + packetRouter := ingress.NewPacketRouter(nil, datagramMuxer, 0, &log) datagramConn := &datagramV2Connection{ conn, diff --git a/connection/quic_datagram_v2.go b/connection/quic_datagram_v2.go index 8ffcea60..c6b8bc03 100644 --- a/connection/quic_datagram_v2.go +++ b/connection/quic_datagram_v2.go @@ -54,7 +54,8 @@ type datagramV2Connection struct { func NewDatagramV2Connection(ctx context.Context, conn quic.Connection, - packetConfig *ingress.GlobalRouterConfig, + icmpRouter ingress.ICMPRouter, + index uint8, rpcTimeout time.Duration, streamWriteTimeout time.Duration, logger *zerolog.Logger, @@ -62,7 +63,7 @@ func NewDatagramV2Connection(ctx context.Context, sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity) datagramMuxer := cfdquic.NewDatagramMuxerV2(conn, logger, sessionDemuxChan) sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan) - packetRouter := ingress.NewPacketRouter(packetConfig, datagramMuxer, logger) + packetRouter := ingress.NewPacketRouter(icmpRouter, datagramMuxer, index, logger) return &datagramV2Connection{ conn, diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index 4e315f15..cb78fb8e 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -114,8 +114,8 @@ func (snf echoFunnelID) String() string { return strconv.FormatUint(uint64(snf), 10) } -func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { - conn, err := newICMPConn(listenIP, zone) +func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { + conn, err := newICMPConn(listenIP) if err != nil { return nil, err } @@ -130,9 +130,9 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle }, nil } -func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { - _, span := responder.requestSpan(ctx, pk) - defer responder.exportSpan() +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error { + _, span := responder.RequestSpan(ctx, pk) + defer responder.ExportSpan() originalEcho, err := getICMPEcho(pk.Message) if err != nil { @@ -154,7 +154,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa } span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID))) - shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID) + shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder, pk, originalEcho.ID) newFunnelFunc := func() (packet.Funnel, error) { originalEcho, err := getICMPEcho(pk.Message) if err != nil { @@ -265,8 +265,8 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error { return err } - _, span := icmpFlow.responder.replySpan(ctx, ip.logger) - defer icmpFlow.responder.exportSpan() + _, span := icmpFlow.responder.ReplySpan(ctx, ip.logger) + defer icmpFlow.responder.ExportSpan() if err := icmpFlow.returnToSrc(reply); err != nil { tracing.EndWithErrorStatus(span, err) diff --git a/ingress/icmp_generic.go b/ingress/icmp_generic.go index 88e2581d..4964244a 100644 --- a/ingress/icmp_generic.go +++ b/ingress/icmp_generic.go @@ -18,7 +18,7 @@ var errICMPProxyNotImplemented = fmt.Errorf("ICMP proxy is not implemented on %s type icmpProxy struct{} -func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { +func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error { return errICMPProxyNotImplemented } @@ -26,6 +26,6 @@ func (ip *icmpProxy) Serve(ctx context.Context) error { return errICMPProxyNotImplemented } -func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { +func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { return nil, errICMPProxyNotImplemented } diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go index 829a000a..1ecda7ea 100644 --- a/ingress/icmp_linux.go +++ b/ingress/icmp_linux.go @@ -37,25 +37,23 @@ var ( type icmpProxy struct { srcFunnelTracker *packet.FunnelTracker listenIP netip.Addr - ipv6Zone string logger *zerolog.Logger idleTimeout time.Duration } -func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { - if err := testPermission(listenIP, zone, logger); err != nil { +func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { + if err := testPermission(listenIP, logger); err != nil { return nil, err } return &icmpProxy{ srcFunnelTracker: packet.NewFunnelTracker(), listenIP: listenIP, - ipv6Zone: zone, logger: logger, idleTimeout: idleTimeout, }, nil } -func testPermission(listenIP netip.Addr, zone string, logger *zerolog.Logger) error { +func testPermission(listenIP netip.Addr, logger *zerolog.Logger) error { // Opens a non-privileged ICMP socket. On Linux the group ID of the process needs to be in ping_group_range // Only check ping_group_range once for IPv4 if listenIP.Is4() { @@ -64,7 +62,7 @@ func testPermission(listenIP netip.Addr, zone string, logger *zerolog.Logger) er return err } } - conn, err := newICMPConn(listenIP, zone) + conn, err := newICMPConn(listenIP) if err != nil { return err } @@ -98,9 +96,9 @@ func checkInPingGroup() error { return fmt.Errorf("did not find group range in %s", pingGroupPath) } -func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { - ctx, span := responder.requestSpan(ctx, pk) - defer responder.exportSpan() +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error { + ctx, span := responder.RequestSpan(ctx, pk) + defer responder.ExportSpan() originalEcho, err := getICMPEcho(pk.Message) if err != nil { @@ -109,9 +107,9 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa } observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq) - shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID) + shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder, pk, originalEcho.ID) newFunnelFunc := func() (packet.Funnel, error) { - conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone) + conn, err := newICMPConn(ip.listenIP) if err != nil { tracing.EndWithErrorStatus(span, err) return nil, errors.Wrap(err, "failed to open ICMP socket") @@ -181,8 +179,8 @@ func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) { // Listens for ICMP response and handles error logging func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (done bool) { - _, span := flow.responder.replySpan(ctx, ip.logger) - defer flow.responder.exportSpan() + _, span := flow.responder.ReplySpan(ctx, ip.logger) + defer flow.responder.ExportSpan() span.SetAttributes( attribute.Int("originalEchoID", flow.originalEchoID), diff --git a/ingress/icmp_posix.go b/ingress/icmp_posix.go index b03be49e..9c170077 100644 --- a/ingress/icmp_posix.go +++ b/ingress/icmp_posix.go @@ -18,15 +18,11 @@ import ( ) // Opens a non-privileged ICMP socket on Linux and Darwin -func newICMPConn(listenIP netip.Addr, zone string) (*icmp.PacketConn, error) { +func newICMPConn(listenIP netip.Addr) (*icmp.PacketConn, error) { if listenIP.Is4() { return icmp.ListenPacket("udp4", listenIP.String()) } - listenAddr := listenIP.String() - if zone != "" { - listenAddr = listenAddr + "%" + zone - } - return icmp.ListenPacket("udp6", listenAddr) + return icmp.ListenPacket("udp6", listenIP.String()) } func netipAddr(addr net.Addr) (netip.Addr, bool) { @@ -34,7 +30,8 @@ func netipAddr(addr net.Addr) (netip.Addr, bool) { if !ok { return netip.Addr{}, false } - return netip.AddrFromSlice(udpAddr.IP) + + return udpAddr.AddrPort().Addr(), true } type flow3Tuple struct { @@ -50,14 +47,14 @@ type icmpEchoFlow struct { closed *atomic.Bool src netip.Addr originConn *icmp.PacketConn - responder *packetResponder + responder ICMPResponder assignedEchoID int originalEchoID int // it's up to the user to ensure respEncoder is not used concurrently respEncoder *packet.Encoder } -func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder *packetResponder, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow { +func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder ICMPResponder, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow { return &icmpEchoFlow{ ActivityTracker: packet.NewActivityTracker(), closeCallback: closeCallback, @@ -139,11 +136,7 @@ func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error { }, Message: reply.msg, } - serializedPacket, err := ief.respEncoder.Encode(&pk) - if err != nil { - return err - } - return ief.responder.returnPacket(serializedPacket) + return ief.responder.ReturnPacket(&pk) } type echoReply struct { @@ -184,7 +177,7 @@ func toICMPEchoFlow(funnel packet.Funnel) (*icmpEchoFlow, error) { return icmpFlow, nil } -func createShouldReplaceFunnelFunc(logger *zerolog.Logger, muxer muxer, pk *packet.ICMP, originalEchoID int) func(packet.Funnel) bool { +func createShouldReplaceFunnelFunc(logger *zerolog.Logger, responder ICMPResponder, pk *packet.ICMP, originalEchoID int) func(packet.Funnel) bool { return func(existing packet.Funnel) bool { existingFlow, err := toICMPEchoFlow(existing) if err != nil { @@ -199,7 +192,7 @@ func createShouldReplaceFunnelFunc(logger *zerolog.Logger, muxer muxer, pk *pack // If the existing flow has a different muxer, there's a new quic connection where return packets should be // routed. Otherwise, return packets will be send to the first observed incoming connection, rather than the // most recently observed connection. - if existingFlow.responder.datagramMuxer != muxer { + if existingFlow.responder.ConnectionID() != responder.ConnectionID() { logger.Debug(). Str("src", pk.Src.String()). Str("dst", pk.Dst.String()). diff --git a/ingress/icmp_posix_test.go b/ingress/icmp_posix_test.go index ea8dc7e9..6231e1b9 100644 --- a/ingress/icmp_posix_test.go +++ b/ingress/icmp_posix_test.go @@ -27,7 +27,7 @@ func TestFunnelIdleTimeout(t *testing.T) { startSeq = 8129 ) logger := zerolog.New(os.Stderr) - proxy, err := newICMPProxy(localhostIP, "", &logger, idleTimeout) + proxy, err := newICMPProxy(localhostIP, &logger, idleTimeout) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -56,24 +56,19 @@ func TestFunnelIdleTimeout(t *testing.T) { }, } muxer := newMockMuxer(0) - responder := packetResponder{ - datagramMuxer: muxer, - } - require.NoError(t, proxy.Request(ctx, &pk, &responder)) + responder := newPacketResponder(muxer, 0, packet.NewEncoder()) + require.NoError(t, proxy.Request(ctx, &pk, responder)) validateEchoFlow(t, <-muxer.cfdToEdge, &pk) // Send second request, should reuse the funnel - require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{ - datagramMuxer: muxer, - })) + require.NoError(t, proxy.Request(ctx, &pk, responder)) validateEchoFlow(t, <-muxer.cfdToEdge, &pk) + // New muxer on a different connection should use a new flow time.Sleep(idleTimeout * 2) newMuxer := newMockMuxer(0) - newResponder := packetResponder{ - datagramMuxer: newMuxer, - } - require.NoError(t, proxy.Request(ctx, &pk, &newResponder)) + newResponder := newPacketResponder(newMuxer, 1, packet.NewEncoder()) + require.NoError(t, proxy.Request(ctx, &pk, newResponder)) validateEchoFlow(t, <-newMuxer.cfdToEdge, &pk) time.Sleep(idleTimeout * 2) @@ -90,7 +85,7 @@ func TestReuseFunnel(t *testing.T) { startSeq = 8129 ) logger := zerolog.New(os.Stderr) - proxy, err := newICMPProxy(localhostIP, "", &logger, idleTimeout) + proxy, err := newICMPProxy(localhostIP, &logger, idleTimeout) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -124,18 +119,14 @@ func TestReuseFunnel(t *testing.T) { originalEchoID: echoID, } muxer := newMockMuxer(0) - responder := packetResponder{ - datagramMuxer: muxer, - } - require.NoError(t, proxy.Request(ctx, &pk, &responder)) + responder := newPacketResponder(muxer, 0, packet.NewEncoder()) + require.NoError(t, proxy.Request(ctx, &pk, responder)) validateEchoFlow(t, <-muxer.cfdToEdge, &pk) funnel1, found := getFunnel(t, proxy, tuple) require.True(t, found) // Send second request, should reuse the funnel - require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{ - datagramMuxer: muxer, - })) + require.NoError(t, proxy.Request(ctx, &pk, responder)) validateEchoFlow(t, <-muxer.cfdToEdge, &pk) funnel2, found := getFunnel(t, proxy, tuple) require.True(t, found) diff --git a/ingress/icmp_windows.go b/ingress/icmp_windows.go index 19604ee4..b7801212 100644 --- a/ingress/icmp_windows.go +++ b/ingress/icmp_windows.go @@ -226,7 +226,7 @@ type icmpProxy struct { encoderPool sync.Pool } -func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { +func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) { var ( srcSocketAddr *sockAddrIn6 handle uintptr @@ -267,15 +267,15 @@ func (ip *icmpProxy) Serve(ctx context.Context) error { // Request sends an ICMP echo request and wait for a reply or timeout. // The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version. // It's possible that a slow request will block other requests, so we set the timeout to only 1s. -func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error { defer func() { if r := recover(); r != nil { ip.logger.Error().Interface("error", r).Msgf("Recover panic from sending icmp request/response, error %s", debug.Stack()) } }() - _, requestSpan := responder.requestSpan(ctx, pk) - defer responder.exportSpan() + _, requestSpan := responder.RequestSpan(ctx, pk) + defer responder.ExportSpan() echo, err := getICMPEcho(pk.Message) if err != nil { @@ -290,9 +290,9 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa return err } tracing.End(requestSpan) - responder.exportSpan() + responder.ExportSpan() - _, replySpan := responder.replySpan(ctx, ip.logger) + _, replySpan := responder.ReplySpan(ctx, ip.logger) err = ip.handleEchoReply(pk, echo, resp, responder) if err != nil { ip.logger.Err(err).Msg("Failed to send ICMP reply") @@ -308,7 +308,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa return nil } -func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, resp echoResp, responder *packetResponder) error { +func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, resp echoResp, responder ICMPResponder) error { var replyType icmp.Type if request.Dst.Is4() { replyType = ipv4.ICMPTypeEchoReply diff --git a/ingress/icmp_windows_test.go b/ingress/icmp_windows_test.go index 8d7ad6b3..fa102442 100644 --- a/ingress/icmp_windows_test.go +++ b/ingress/icmp_windows_test.go @@ -132,7 +132,7 @@ func TestSendEchoErrors(t *testing.T) { } func testSendEchoErrors(t *testing.T, listenIP netip.Addr) { - proxy, err := newICMPProxy(listenIP, "", &noopLogger, time.Second) + proxy, err := newICMPProxy(listenIP, &noopLogger, time.Second) require.NoError(t, err) echo := icmp.Echo{ diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index 3eb0a6b3..d25fdc99 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -14,6 +14,7 @@ import ( "golang.org/x/net/ipv6" "github.com/cloudflare/cloudflared/packet" + "github.com/cloudflare/cloudflared/tracing" ) const ( @@ -26,17 +27,46 @@ var ( errPacketNil = fmt.Errorf("packet is nil") ) +// ICMPRouterServer is a parent interface over-top of ICMPRouter that allows for the operation of the proxy origin listeners. +type ICMPRouterServer interface { + ICMPRouter + // Serve runs the ICMPRouter proxy origin listeners for any of the IPv4 or IPv6 interfaces configured. + Serve(ctx context.Context) error +} + +// ICMPRouter manages out-going ICMP requests towards the origin. +type ICMPRouter interface { + // Request will send an ICMP packet towards the origin with an ICMPResponder to attach to the ICMP flow for the + // response to utilize. + Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error + // ConvertToTTLExceeded will take an ICMP packet and create a ICMP TTL Exceeded response origininating from the + // ICMPRouter's IP interface. + ConvertToTTLExceeded(pk *packet.ICMP, rawPacket packet.RawPacket) *packet.ICMP +} + +// ICMPResponder manages how to handle incoming ICMP messages coming from the origin to the edge. +type ICMPResponder interface { + ConnectionID() uint8 + ReturnPacket(pk *packet.ICMP) error + AddTraceContext(tracedCtx *tracing.TracedContext, serializedIdentity []byte) + RequestSpan(ctx context.Context, pk *packet.ICMP) (context.Context, trace.Span) + ReplySpan(ctx context.Context, logger *zerolog.Logger) (context.Context, trace.Span) + ExportSpan() +} + type icmpRouter struct { ipv4Proxy *icmpProxy + ipv4Src netip.Addr ipv6Proxy *icmpProxy + ipv6Src netip.Addr } // NewICMPRouter doesn't return an error if either ipv4 proxy or ipv6 proxy can be created. The machine might only // support one of them. // funnelIdleTimeout controls how long to wait to close a funnel without send/return -func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerolog.Logger, funnelIdleTimeout time.Duration) (*icmpRouter, error) { - ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout) - ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout) +func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, logger *zerolog.Logger, funnelIdleTimeout time.Duration) (ICMPRouterServer, error) { + ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, logger, funnelIdleTimeout) + ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, logger, funnelIdleTimeout) if ipv4Err != nil && ipv6Err != nil { err := fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err) logger.Debug().Err(err).Msg("ICMP proxy feature is disabled") @@ -52,7 +82,9 @@ func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerol } return &icmpRouter{ ipv4Proxy: ipv4Proxy, + ipv4Src: ipv4Addr, ipv6Proxy: ipv6Proxy, + ipv6Src: ipv6Addr, }, nil } @@ -76,7 +108,7 @@ func (ir *icmpRouter) Serve(ctx context.Context) error { return fmt.Errorf("ICMPv4 proxy and ICMPv6 proxy are both nil") } -func (ir *icmpRouter) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { +func (ir *icmpRouter) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error { if pk == nil { return errPacketNil } @@ -92,6 +124,16 @@ func (ir *icmpRouter) Request(ctx context.Context, pk *packet.ICMP, responder *p return fmt.Errorf("ICMPv6 proxy was not instantiated") } +func (ir *icmpRouter) ConvertToTTLExceeded(pk *packet.ICMP, rawPacket packet.RawPacket) *packet.ICMP { + var srcIP netip.Addr + if pk.Dst.Is4() { + srcIP = ir.ipv4Src + } else { + srcIP = ir.ipv6Src + } + return packet.NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP) +} + func getICMPEcho(msg *icmp.Message) (*icmp.Echo, error) { echo, ok := msg.Body.(*icmp.Echo) if !ok { diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index 6c7b2f8c..b85013be 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -50,7 +50,7 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { endSeq = 20 ) - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) + router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) proxyDone := make(chan struct{}) @@ -61,9 +61,7 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { }() muxer := newMockMuxer(1) - responder := packetResponder{ - datagramMuxer: muxer, - } + responder := newPacketResponder(muxer, 0, packet.NewEncoder()) protocol := layers.IPProtocolICMPv6 if sendIPv4 { @@ -98,7 +96,7 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { }, }, } - require.NoError(t, router.Request(ctx, &pk, &responder)) + require.NoError(t, router.Request(ctx, &pk, responder)) validateEchoFlow(t, <-muxer.cfdToEdge, &pk) } } @@ -114,7 +112,7 @@ func TestTraceICMPRouterEcho(t *testing.T) { tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1" - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) + router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) proxyDone := make(chan struct{}) @@ -131,11 +129,8 @@ func TestTraceICMPRouterEcho(t *testing.T) { serializedIdentity, err := tracingIdentity.MarshalBinary() require.NoError(t, err) - responder := packetResponder{ - datagramMuxer: muxer, - tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &noopLogger), - serializedIdentity: serializedIdentity, - } + responder := newPacketResponder(muxer, 0, packet.NewEncoder()) + responder.AddTraceContext(tracing.NewTracedContext(ctx, tracingIdentity.String(), &noopLogger), serializedIdentity) echo := &icmp.Echo{ ID: 12910, @@ -156,7 +151,7 @@ func TestTraceICMPRouterEcho(t *testing.T) { }, } - require.NoError(t, router.Request(ctx, &pk, &responder)) + require.NoError(t, router.Request(ctx, &pk, responder)) firstPK := <-muxer.cfdToEdge var requestSpan *quicpogs.TracingSpanPacket // The order of receiving reply or request span is not deterministic @@ -194,10 +189,8 @@ func TestTraceICMPRouterEcho(t *testing.T) { echo.Seq++ pk.Body = echo // Only first request for a flow is traced. The edge will not send tracing context for the second request - newResponder := packetResponder{ - datagramMuxer: muxer, - } - require.NoError(t, router.Request(ctx, &pk, &newResponder)) + newResponder := newPacketResponder(muxer, 0, packet.NewEncoder()) + require.NoError(t, router.Request(ctx, &pk, newResponder)) validateEchoFlow(t, <-muxer.cfdToEdge, &pk) select { @@ -221,7 +214,7 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { endSeq = 5 ) - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) + router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) proxyDone := make(chan struct{}) @@ -240,9 +233,7 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { defer wg.Done() muxer := newMockMuxer(1) - responder := packetResponder{ - datagramMuxer: muxer, - } + responder := newPacketResponder(muxer, 0, packet.NewEncoder()) for seq := 0; seq < endSeq; seq++ { pk := &packet.ICMP{ IP: &packet.IP{ @@ -261,16 +252,14 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, }, } - require.NoError(t, router.Request(ctx, pk, &responder)) + require.NoError(t, router.Request(ctx, pk, responder)) validateEchoFlow(t, <-muxer.cfdToEdge, pk) } }() go func() { defer wg.Done() muxer := newMockMuxer(1) - responder := packetResponder{ - datagramMuxer: muxer, - } + responder := newPacketResponder(muxer, 0, packet.NewEncoder()) for seq := 0; seq < endSeq; seq++ { pk := &packet.ICMP{ IP: &packet.IP{ @@ -289,7 +278,7 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, }, } - require.NoError(t, router.Request(ctx, pk, &responder)) + require.NoError(t, router.Request(ctx, pk, responder)) validateEchoFlow(t, <-muxer.cfdToEdge, pk) } }() @@ -358,13 +347,11 @@ func TestICMPRouterRejectNotEcho(t *testing.T) { } func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp.Message) { - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) + router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) muxer := newMockMuxer(1) - responder := packetResponder{ - datagramMuxer: muxer, - } + responder := newPacketResponder(muxer, 0, packet.NewEncoder()) protocol := layers.IPProtocolICMPv4 if srcDstIP.Is6() { protocol = layers.IPProtocolICMPv6 @@ -379,7 +366,7 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp. }, Message: &m, } - require.Error(t, router.Request(context.Background(), &pk, &responder)) + require.Error(t, router.Request(context.Background(), &pk, responder)) } } diff --git a/ingress/packet_router.go b/ingress/packet_router.go index 1e15163a..6cadeaef 100644 --- a/ingress/packet_router.go +++ b/ingress/packet_router.go @@ -3,7 +3,6 @@ package ingress import ( "context" "fmt" - "net/netip" "github.com/rs/zerolog" "go.opentelemetry.io/otel/attribute" @@ -23,29 +22,23 @@ type muxer interface { // PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets type PacketRouter struct { - globalConfig *GlobalRouterConfig - muxer muxer - logger *zerolog.Logger - icmpDecoder *packet.ICMPDecoder - encoder *packet.Encoder -} - -// GlobalRouterConfig is the configuration shared by all instance of Router. -type GlobalRouterConfig struct { - ICMPRouter *icmpRouter - IPv4Src netip.Addr - IPv6Src netip.Addr - Zone string + icmpRouter ICMPRouter + muxer muxer + connID uint8 + logger *zerolog.Logger + encoder *packet.Encoder + decoder *packet.ICMPDecoder } // NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil. -func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger) *PacketRouter { +func NewPacketRouter(icmpRouter ICMPRouter, muxer muxer, connIndex uint8, logger *zerolog.Logger) *PacketRouter { return &PacketRouter{ - globalConfig: globalConfig, - muxer: muxer, - logger: logger, - icmpDecoder: packet.NewICMPDecoder(), - encoder: packet.NewEncoder(), + icmpRouter: icmpRouter, + muxer: muxer, + connID: connIndex, + logger: logger, + encoder: packet.NewEncoder(), + decoder: packet.NewICMPDecoder(), } } @@ -59,14 +52,13 @@ func (r *PacketRouter) Serve(ctx context.Context) error { } } -func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packetResponder, error) { +func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, ICMPResponder, error) { pk, err := r.muxer.ReceivePacket(ctx) if err != nil { return packet.RawPacket{}, nil, err } - responder := &packetResponder{ - datagramMuxer: r.muxer, - } + responder := newPacketResponder(r.muxer, r.connID, packet.NewEncoder()) + switch pk.Type() { case quicpogs.DatagramTypeIP: return packet.RawPacket{Data: pk.Payload()}, responder, nil @@ -75,8 +67,8 @@ func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packe if err := identity.UnmarshalBinary(pk.Metadata()); err != nil { r.logger.Err(err).Bytes("tracingIdentity", pk.Metadata()).Msg("Failed to unmarshal tracing identity") } else { - responder.tracedCtx = tracing.NewTracedContext(ctx, identity.String(), r.logger) - responder.serializedIdentity = pk.Metadata() + tracedCtx := tracing.NewTracedContext(ctx, identity.String(), r.logger) + responder.AddTraceContext(tracedCtx, pk.Metadata()) } return packet.RawPacket{Data: pk.Payload()}, responder, nil default: @@ -84,27 +76,27 @@ func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packe } } -func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPacket, responder *packetResponder) { +func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPacket, responder ICMPResponder) { // ICMP Proxy feature is disabled, drop packets - if r.globalConfig == nil { + if r.icmpRouter == nil { return } - icmpPacket, err := r.icmpDecoder.Decode(rawPacket) + icmpPacket, err := r.decoder.Decode(rawPacket) if err != nil { r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram") return } if icmpPacket.TTL <= 1 { - if err := r.sendTTLExceedMsg(ctx, icmpPacket, rawPacket, r.encoder); err != nil { + if err := r.sendTTLExceedMsg(icmpPacket, rawPacket); err != nil { r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error") } return } icmpPacket.TTL-- - if err := r.globalConfig.ICMPRouter.Request(ctx, icmpPacket, responder); err != nil { + if err := r.icmpRouter.Request(ctx, icmpPacket, responder); err != nil { r.logger.Err(err). Str("src", icmpPacket.Src.String()). Str("dst", icmpPacket.Dst.String()). @@ -113,16 +105,9 @@ func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPac } } -func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, rawPacket packet.RawPacket, encoder *packet.Encoder) error { - var srcIP netip.Addr - if pk.Dst.Is4() { - srcIP = r.globalConfig.IPv4Src - } else { - srcIP = r.globalConfig.IPv6Src - } - ttlExceedPacket := packet.NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP) - - encodedTTLExceed, err := encoder.Encode(ttlExceedPacket) +func (r *PacketRouter) sendTTLExceedMsg(pk *packet.ICMP, rawPacket packet.RawPacket) error { + icmpTTLPacket := r.icmpRouter.ConvertToTTLExceeded(pk, rawPacket) + encodedTTLExceed, err := r.encoder.Encode(icmpTTLPacket) if err != nil { return err } @@ -132,22 +117,45 @@ func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, ra // packetResponder should not be used concurrently. This assumption is upheld because reply packets are ready one-by-one type packetResponder struct { datagramMuxer muxer + connID uint8 + encoder *packet.Encoder tracedCtx *tracing.TracedContext serializedIdentity []byte // hadReply tracks if there has been any reply for this flow hadReply bool } +func newPacketResponder(datagramMuxer muxer, connID uint8, encoder *packet.Encoder) ICMPResponder { + return &packetResponder{ + datagramMuxer: datagramMuxer, + connID: connID, + encoder: encoder, + } +} + func (pr *packetResponder) tracingEnabled() bool { return pr.tracedCtx != nil } -func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error { +func (pr *packetResponder) ConnectionID() uint8 { + return pr.connID +} + +func (pr *packetResponder) ReturnPacket(pk *packet.ICMP) error { + rawPacket, err := pr.encoder.Encode(pk) + if err != nil { + return err + } pr.hadReply = true return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket)) } -func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (context.Context, trace.Span) { +func (pr *packetResponder) AddTraceContext(tracedCtx *tracing.TracedContext, serializedIdentity []byte) { + pr.tracedCtx = tracedCtx + pr.serializedIdentity = serializedIdentity +} + +func (pr *packetResponder) RequestSpan(ctx context.Context, pk *packet.ICMP) (context.Context, trace.Span) { if !pr.tracingEnabled() { return ctx, tracing.NewNoopSpan() } @@ -157,14 +165,14 @@ func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (co )) } -func (pr *packetResponder) replySpan(ctx context.Context, logger *zerolog.Logger) (context.Context, trace.Span) { +func (pr *packetResponder) ReplySpan(ctx context.Context, logger *zerolog.Logger) (context.Context, trace.Span) { if !pr.tracingEnabled() || pr.hadReply { return ctx, tracing.NewNoopSpan() } return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-reply") } -func (pr *packetResponder) exportSpan() { +func (pr *packetResponder) ExportSpan() { if !pr.tracingEnabled() { return } diff --git a/ingress/packet_router_test.go b/ingress/packet_router_test.go index 403a2274..77ab5e89 100644 --- a/ingress/packet_router_test.go +++ b/ingress/packet_router_test.go @@ -19,16 +19,17 @@ import ( ) var ( - packetConfig = &GlobalRouterConfig{ - ICMPRouter: nil, - IPv4Src: netip.MustParseAddr("172.16.0.1"), - IPv6Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"), + defaultRouter = &icmpRouter{ + ipv4Proxy: nil, + ipv4Src: netip.MustParseAddr("172.16.0.1"), + ipv6Proxy: nil, + ipv6Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"), } ) func TestRouterReturnTTLExceed(t *testing.T) { muxer := newMockMuxer(0) - router := NewPacketRouter(packetConfig, muxer, &noopLogger) + router := NewPacketRouter(defaultRouter, muxer, 0, &noopLogger) ctx, cancel := context.WithCancel(context.Background()) routerStopped := make(chan struct{}) go func() { @@ -53,7 +54,7 @@ func TestRouterReturnTTLExceed(t *testing.T) { }, }, } - assertTTLExceed(t, &pk, router.globalConfig.IPv4Src, muxer) + assertTTLExceed(t, &pk, defaultRouter.ipv4Src, muxer) pk = packet.ICMP{ IP: &packet.IP{ Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"), @@ -71,7 +72,7 @@ func TestRouterReturnTTLExceed(t *testing.T) { }, }, } - assertTTLExceed(t, &pk, router.globalConfig.IPv6Src, muxer) + assertTTLExceed(t, &pk, defaultRouter.ipv6Src, muxer) cancel() <-routerStopped diff --git a/packet/encoder.go b/packet/encoder.go index 906d2b6d..09859f48 100644 --- a/packet/encoder.go +++ b/packet/encoder.go @@ -1,6 +1,8 @@ package packet -import "github.com/google/gopacket" +import ( + "github.com/google/gopacket" +) var ( serializeOpts = gopacket.SerializeOptions{ @@ -25,7 +27,7 @@ func NewEncoder() *Encoder { } } -func (e Encoder) Encode(packet Packet) (RawPacket, error) { +func (e *Encoder) Encode(packet Packet) (RawPacket, error) { encodedLayers, err := packet.EncodeLayers() if err != nil { return RawPacket{}, err diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index d4a75e77..8736963c 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -119,9 +119,9 @@ func (s *Supervisor) Run( ctx context.Context, connectedSignal *signal.Signal, ) error { - if s.config.PacketConfig != nil { + if s.config.ICMPRouterServer != nil { go func() { - if err := s.config.PacketConfig.ICMPRouter.Serve(ctx); err != nil { + if err := s.config.ICMPRouterServer.Serve(ctx); err != nil { if errors.Is(err, net.ErrClosed) { s.log.Logger().Info().Err(err).Msg("icmp router terminated") } else { diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 6a456ca6..c5ec9978 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -63,7 +63,7 @@ type TunnelConfig struct { NamedTunnel *connection.TunnelProperties ProtocolSelector connection.ProtocolSelector EdgeTLSConfigs map[connection.Protocol]*tls.Config - PacketConfig *ingress.GlobalRouterConfig + ICMPRouterServer ingress.ICMPRouterServer RPCTimeout time.Duration WriteStreamTimeout time.Duration @@ -615,7 +615,8 @@ func (e *EdgeTunnelServer) serveQUIC( datagramSessionManager = connection.NewDatagramV2Connection( ctx, conn, - e.config.PacketConfig, + e.config.ICMPRouterServer, + connIndex, e.config.RPCTimeout, e.config.WriteStreamTimeout, connLogger.Logger(),