From 495f9fb8bdbaf2a996c0a5e412b44ef6dff6d4d2 Mon Sep 17 00:00:00 2001 From: cthuang Date: Thu, 13 Oct 2022 11:01:25 +0100 Subject: [PATCH] TUN-6856: Refactor to lay foundation for tracing ICMP Remove send and return methods from Funnel interface. Users of Funnel can provide their own send and return methods without wrapper to comply with the interface. Move packet router to ingress package to avoid circular dependency --- cmd/cloudflared/tunnel/configuration.go | 5 +- connection/quic.go | 7 +- ingress/icmp_darwin.go | 36 +---- ingress/icmp_generic.go | 2 +- ingress/icmp_linux.go | 27 +--- ingress/icmp_posix.go | 46 +++++- ingress/icmp_posix_test.go | 26 ++-- ingress/icmp_windows.go | 6 +- ingress/origin_icmp_proxy.go | 13 +- ingress/origin_icmp_proxy_test.go | 46 +++--- ingress/packet_router.go | 134 +++++++++++++++++ .../packet_router_test.go | 135 ++++++++++-------- packet/funnel.go | 65 +-------- packet/router.go | 108 -------------- supervisor/tunnel.go | 4 +- 15 files changed, 323 insertions(+), 337 deletions(-) create mode 100644 ingress/packet_router.go rename packet/router_test.go => ingress/packet_router_test.go (57%) delete mode 100644 packet/router.go diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index 9f031641..ce628168 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -22,7 +22,6 @@ import ( "github.com/cloudflare/cloudflared/cmd/cloudflared/cliutil" "github.com/cloudflare/cloudflared/edgediscovery/allregions" - "github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/config" "github.com/cloudflare/cloudflared/connection" @@ -463,7 +462,7 @@ func parseConfigIPVersion(version string) (v allregions.ConfigIPVersion, err err return } -func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRouterConfig, error) { +func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRouterConfig, error) { ipv4Src, err := determineICMPv4Src(c.String("icmpv4-src"), logger) if err != nil { return nil, errors.Wrap(err, "failed to determine IPv4 source address for ICMP proxy") @@ -484,7 +483,7 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRout if err != nil { return nil, err } - return &packet.GlobalRouterConfig{ + return &ingress.GlobalRouterConfig{ ICMPRouter: icmpRouter, IPv4Src: ipv4Src, IPv6Src: ipv6Src, diff --git a/connection/quic.go b/connection/quic.go index 4eb405c7..645daf31 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -57,7 +57,7 @@ type QUICConnection struct { sessionManager datagramsession.Manager // datagramMuxer mux/demux datagrams from quic connection datagramMuxer *quicpogs.DatagramMuxerV2 - packetRouter *packet.Router + packetRouter *ingress.PacketRouter controlStreamHandler ControlStreamHandler connOptions *tunnelpogs.ConnectionOptions } @@ -72,7 +72,7 @@ func NewQUICConnection( connOptions *tunnelpogs.ConnectionOptions, controlStreamHandler ControlStreamHandler, logger *zerolog.Logger, - packetRouterConfig *packet.GlobalRouterConfig, + packetRouterConfig *ingress.GlobalRouterConfig, ) (*QUICConnection, error) { udpConn, err := createUDPConnForConnIndex(connIndex, logger) if err != nil { @@ -93,8 +93,7 @@ func NewQUICConnection( sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity) datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan) sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan) - muxer := muxerWrapper{muxer: datagramMuxer} - packetRouter := packet.NewRouter(packetRouterConfig, &muxer, &muxer, logger, orchestrator.WarpRoutingEnabled) + packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger, orchestrator.WarpRoutingEnabled) return &QUICConnection{ session: session, diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index 2a57b415..caa67bb4 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -11,7 +11,6 @@ import ( "context" "fmt" "math" - "net" "net/netip" "strconv" "sync" @@ -129,10 +128,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle }, nil } -func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { - if pk == nil { - return errPacketNil - } +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { originalEcho, err := getICMPEcho(pk.Message) if err != nil { return err @@ -152,13 +148,11 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er if err != nil { return nil, err } - originSender := originSender{ - conn: ip.conn, - echoIDTracker: ip.echoIDTracker, - echoIDTrackerKey: echoIDTrackerKey, - assignedEchoID: assignedEchoID, + closeCallback := func() error { + ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID) + return nil } - icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, int(assignedEchoID), originalEcho.ID, ip.encoder) + icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID, ip.encoder) return icmpFlow, nil } funnelID := echoFunnelID(assignedEchoID) @@ -250,23 +244,3 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error { } return icmpFlow.returnToSrc(reply) } - -// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface -type originSender struct { - conn *icmp.PacketConn - echoIDTracker *echoIDTracker - echoIDTrackerKey flow3Tuple - assignedEchoID uint16 -} - -func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error { - _, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{ - IP: dst.AsSlice(), - }) - return err -} - -func (os *originSender) Close() error { - os.echoIDTracker.release(os.echoIDTrackerKey, os.assignedEchoID) - return nil -} diff --git a/ingress/icmp_generic.go b/ingress/icmp_generic.go index e1c66e81..88e2581d 100644 --- a/ingress/icmp_generic.go +++ b/ingress/icmp_generic.go @@ -18,7 +18,7 @@ var errICMPProxyNotImplemented = fmt.Errorf("ICMP proxy is not implemented on %s type icmpProxy struct{} -func (ip icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { +func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { return errICMPProxyNotImplemented } diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go index 10ef3e76..a1936ad3 100644 --- a/ingress/icmp_linux.go +++ b/ingress/icmp_linux.go @@ -97,10 +97,7 @@ func checkInPingGroup() error { return fmt.Errorf("did not find group range in %s", pingGroupPath) } -func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { - if pk == nil { - return errPacketNil - } +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { originalEcho, err := getICMPEcho(pk.Message) if err != nil { return err @@ -113,13 +110,15 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er } ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr()) newConnChan <- conn + closeCallback := func() error { + return conn.Close() + } localUDPAddr, ok := conn.LocalAddr().(*net.UDPAddr) if !ok { return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr()) } - originSender := originSender{conn: conn} echoID := localUDPAddr.Port - icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, echoID, originalEcho.ID, packet.NewEncoder()) + icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder()) return icmpFlow, nil } funnelID := flow3Tuple{ @@ -187,22 +186,6 @@ func (ip *icmpProxy) listenResponse(flow *icmpEchoFlow, conn *icmp.PacketConn) e } } -// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface -type originSender struct { - conn *icmp.PacketConn -} - -func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error { - _, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{ - IP: dst.AsSlice(), - }) - return err -} - -func (os *originSender) Close() error { - return os.conn.Close() -} - // Only linux uses flow3Tuple as FunnelID func (ft flow3Tuple) Type() string { return "srcIP_dstIP_echoID" diff --git a/ingress/icmp_posix.go b/ingress/icmp_posix.go index 0badb685..95962667 100644 --- a/ingress/icmp_posix.go +++ b/ingress/icmp_posix.go @@ -43,24 +43,54 @@ type flow3Tuple struct { // icmpEchoFlow implements the packet.Funnel interface. type icmpEchoFlow struct { - *packet.RawPacketFunnel + *packet.ActivityTracker + closeCallback func() error + src netip.Addr + originConn *icmp.PacketConn + responder *packetResponder assignedEchoID int originalEchoID int // it's up to the user to ensure respEncoder is not used concurrently respEncoder *packet.Encoder } -func newICMPEchoFlow(src netip.Addr, sendPipe, returnPipe packet.FunnelUniPipe, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow { +func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder *packetResponder, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow { return &icmpEchoFlow{ - RawPacketFunnel: packet.NewRawPacketFunnel(src, sendPipe, returnPipe), + ActivityTracker: packet.NewActivityTracker(), + closeCallback: closeCallback, + src: src, + originConn: originConn, + responder: responder, assignedEchoID: assignedEchoID, originalEchoID: originalEchoID, respEncoder: respEncoder, } } +func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool { + otherICMPFlow, ok := other.(*icmpEchoFlow) + if !ok { + return false + } + if otherICMPFlow.src != ief.src { + return false + } + if otherICMPFlow.originalEchoID != ief.originalEchoID { + return false + } + if otherICMPFlow.assignedEchoID != ief.assignedEchoID { + return false + } + return true +} + +func (ief *icmpEchoFlow) Close() error { + return ief.closeCallback() +} + // sendToDst rewrites the echo ID to the one assigned to this flow func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error { + ief.UpdateLastActive() originalEcho, err := getICMPEcho(msg) if err != nil { return err @@ -80,17 +110,21 @@ func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error { if err != nil { return err } - return ief.SendToDst(dst, packet.RawPacket{Data: serializedPacket}) + _, err = ief.originConn.WriteTo(serializedPacket, &net.UDPAddr{ + IP: dst.AsSlice(), + }) + return err } // returnToSrc rewrites the echo ID to the original echo ID from the eyeball func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error { + ief.UpdateLastActive() reply.echo.ID = ief.originalEchoID reply.msg.Body = reply.echo pk := packet.ICMP{ IP: &packet.IP{ Src: reply.from, - Dst: ief.Src, + Dst: ief.src, Protocol: layers.IPProtocol(reply.msg.Type.Protocol()), TTL: packet.DefaultTTL, }, @@ -100,7 +134,7 @@ func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error { if err != nil { return err } - return ief.ReturnToSrc(serializedPacket) + return ief.responder.returnPacket(serializedPacket) } type echoReply struct { diff --git a/ingress/icmp_posix_test.go b/ingress/icmp_posix_test.go index bbadd196..d4b49a1e 100644 --- a/ingress/icmp_posix_test.go +++ b/ingress/icmp_posix_test.go @@ -52,24 +52,26 @@ func TestFunnelIdleTimeout(t *testing.T) { }, }, } - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte), + muxer := newMockMuxer(0) + responder := packetResponder{ + datagramMuxer: muxer, } - require.NoError(t, proxy.Request(&pk, &responder)) - responder.validate(t, &pk) + require.NoError(t, proxy.Request(ctx, &pk, &responder)) + validateEchoFlow(t, muxer, &pk) // Send second request, should reuse the funnel - require.NoError(t, proxy.Request(&pk, nil)) - responder.validate(t, &pk) + require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{ + datagramMuxer: nil, + })) + validateEchoFlow(t, muxer, &pk) time.Sleep(idleTimeout * 2) - newResponder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte), + newMuxer := newMockMuxer(0) + newResponder := packetResponder{ + datagramMuxer: newMuxer, } - require.NoError(t, proxy.Request(&pk, &newResponder)) - newResponder.validate(t, &pk) + require.NoError(t, proxy.Request(ctx, &pk, &newResponder)) + validateEchoFlow(t, newMuxer, &pk) cancel() <-proxyDone diff --git a/ingress/icmp_windows.go b/ingress/icmp_windows.go index 7d9a46e0..c32b6f1d 100644 --- a/ingress/icmp_windows.go +++ b/ingress/icmp_windows.go @@ -265,7 +265,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error { // Request sends an ICMP echo request and wait for a reply or timeout. // The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version. // It's possible that a slow request will block other requests, so we set the timeout to only 1s. -func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { if pk == nil { return errPacketNil } @@ -292,7 +292,7 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er return nil } -func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder packet.FunnelUniPipe) error { +func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder *packetResponder) error { var replyType icmp.Type if request.Dst.Is4() { replyType = ipv4.ICMPTypeEchoReply @@ -331,7 +331,7 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d if err != nil { return err } - return responder.SendPacket(request.Src, serializedPacket) + return responder.returnPacket(serializedPacket) } func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, error) { diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index 4b6826b4..466608b3 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -37,7 +37,9 @@ func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerol ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout) ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout) if ipv4Err != nil && ipv6Err != nil { - return nil, fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err) + err := fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err) + logger.Debug().Err(err).Msg("ICMP proxy feature is disabled") + return nil, err } if ipv4Err != nil { logger.Debug().Err(ipv4Err).Msg("failed to create ICMPv4 proxy, only ICMPv6 proxy is created") @@ -73,15 +75,18 @@ func (ir *icmpRouter) Serve(ctx context.Context) error { return fmt.Errorf("ICMPv4 proxy and ICMPv6 proxy are both nil") } -func (ir *icmpRouter) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { +func (ir *icmpRouter) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { + if pk == nil { + return errPacketNil + } if pk.Dst.Is4() { if ir.ipv4Proxy != nil { - return ir.ipv4Proxy.Request(pk, responder) + return ir.ipv4Proxy.Request(ctx, pk, responder) } return fmt.Errorf("ICMPv4 proxy was not instantiated") } if ir.ipv6Proxy != nil { - return ir.ipv6Proxy.Request(pk, responder) + return ir.ipv6Proxy.Request(ctx, pk, responder) } return fmt.Errorf("ICMPv6 proxy was not instantiated") } diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index d080f46a..a6370e7c 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -52,9 +52,9 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { close(proxyDone) }() - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte, 1), + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } protocol := layers.IPProtocolICMPv6 @@ -90,8 +90,8 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { }, }, } - require.NoError(t, router.Request(&pk, &responder)) - responder.validate(t, &pk) + require.NoError(t, router.Request(ctx, &pk, &responder)) + validateEchoFlow(t, muxer, &pk) } } cancel() @@ -123,9 +123,10 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { echoID := 38451 + i go func() { defer wg.Done() - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte, 1), + + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } for seq := 0; seq < endSeq; seq++ { pk := &packet.ICMP{ @@ -145,15 +146,15 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, }, } - require.NoError(t, router.Request(pk, &responder)) - responder.validate(t, pk) + require.NoError(t, router.Request(ctx, pk, &responder)) + validateEchoFlow(t, muxer, pk) } }() go func() { defer wg.Done() - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte, 1), + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } for seq := 0; seq < endSeq; seq++ { pk := &packet.ICMP{ @@ -173,8 +174,8 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, }, } - require.NoError(t, router.Request(pk, &responder)) - responder.validate(t, pk) + require.NoError(t, router.Request(ctx, pk, &responder)) + validateEchoFlow(t, muxer, pk) } }() } @@ -241,9 +242,9 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp. router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) require.NoError(t, err) - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte), + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } protocol := layers.IPProtocolICMPv4 if srcDstIP.Is6() { @@ -259,7 +260,7 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp. }, Message: &m, } - require.Error(t, router.Request(&pk, &responder)) + require.Error(t, router.Request(context.Background(), &pk, &responder)) } } @@ -285,9 +286,10 @@ func (efr *echoFlowResponder) Close() error { return nil } -func (efr *echoFlowResponder) validate(t *testing.T, echoReq *packet.ICMP) { - pk := <-efr.respChan - decoded, err := efr.decoder.Decode(packet.RawPacket{Data: pk}) +func validateEchoFlow(t *testing.T, muxer *mockMuxer, echoReq *packet.ICMP) { + pk := <-muxer.cfdToEdge + decoder := packet.NewICMPDecoder() + decoded, err := decoder.Decode(packet.RawPacket{Data: pk.Payload()}) require.NoError(t, err) require.Equal(t, decoded.Src, echoReq.Dst) require.Equal(t, decoded.Dst, echoReq.Src) diff --git a/ingress/packet_router.go b/ingress/packet_router.go new file mode 100644 index 00000000..88777869 --- /dev/null +++ b/ingress/packet_router.go @@ -0,0 +1,134 @@ +package ingress + +import ( + "context" + "fmt" + "net/netip" + + "github.com/rs/zerolog" + + "github.com/cloudflare/cloudflared/packet" + quicpogs "github.com/cloudflare/cloudflared/quic" +) + +// Upstream of raw packets +type muxer interface { + SendPacket(pk quicpogs.Packet) error + // ReceivePacket waits for the next raw packet from upstream + ReceivePacket(ctx context.Context) (quicpogs.Packet, error) +} + +// PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets +type PacketRouter struct { + globalConfig *GlobalRouterConfig + muxer muxer + logger *zerolog.Logger + checkRouterEnabledFunc func() bool + icmpDecoder *packet.ICMPDecoder + encoder *packet.Encoder +} + +// GlobalRouterConfig is the configuration shared by all instance of Router. +type GlobalRouterConfig struct { + ICMPRouter *icmpRouter + IPv4Src netip.Addr + IPv6Src netip.Addr + Zone string +} + +// NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil. +func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *PacketRouter { + return &PacketRouter{ + globalConfig: globalConfig, + muxer: muxer, + logger: logger, + checkRouterEnabledFunc: checkRouterEnabledFunc, + icmpDecoder: packet.NewICMPDecoder(), + encoder: packet.NewEncoder(), + } +} + +func (r *PacketRouter) Serve(ctx context.Context) error { + for { + rawPacket, responder, err := r.nextPacket(ctx) + if err != nil { + return err + } + r.handlePacket(ctx, rawPacket, responder) + } +} + +func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packetResponder, error) { + pk, err := r.muxer.ReceivePacket(ctx) + if err != nil { + return packet.RawPacket{}, nil, err + } + responder := &packetResponder{ + datagramMuxer: r.muxer, + } + switch pk.Type() { + case quicpogs.DatagramTypeIP: + return packet.RawPacket{Data: pk.Payload()}, responder, nil + case quicpogs.DatagramTypeIPWithTrace: + return packet.RawPacket{}, responder, fmt.Errorf("TODO: TUN-6604 Handle IP packet with trace") + default: + return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type()) + } +} + +func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPacket, responder *packetResponder) { + // ICMP Proxy feature is disabled, drop packets + if r.globalConfig == nil { + return + } + + if enabled := r.checkRouterEnabledFunc(); !enabled { + return + } + + icmpPacket, err := r.icmpDecoder.Decode(rawPacket) + if err != nil { + r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram") + return + } + + if icmpPacket.TTL <= 1 { + if err := r.sendTTLExceedMsg(ctx, icmpPacket, rawPacket, r.encoder); err != nil { + r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error") + } + return + } + icmpPacket.TTL-- + + if err := r.globalConfig.ICMPRouter.Request(ctx, icmpPacket, responder); err != nil { + r.logger.Err(err). + Str("src", icmpPacket.Src.String()). + Str("dst", icmpPacket.Dst.String()). + Interface("type", icmpPacket.Type). + Msg("Failed to send ICMP packet") + } +} + +func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, rawPacket packet.RawPacket, encoder *packet.Encoder) error { + var srcIP netip.Addr + if pk.Dst.Is4() { + srcIP = r.globalConfig.IPv4Src + } else { + srcIP = r.globalConfig.IPv6Src + } + ttlExceedPacket := packet.NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP) + + encodedTTLExceed, err := encoder.Encode(ttlExceedPacket) + if err != nil { + return err + } + return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed)) +} + +type packetResponder struct { + datagramMuxer muxer +} + +func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error { + return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket)) +} diff --git a/packet/router_test.go b/ingress/packet_router_test.go similarity index 57% rename from packet/router_test.go rename to ingress/packet_router_test.go index 4998456e..7738d294 100644 --- a/packet/router_test.go +++ b/ingress/packet_router_test.go @@ -1,4 +1,4 @@ -package packet +package ingress import ( "bytes" @@ -10,32 +10,28 @@ import ( "time" "github.com/google/gopacket/layers" - "github.com/rs/zerolog" "github.com/stretchr/testify/require" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" + + "github.com/cloudflare/cloudflared/packet" + quicpogs "github.com/cloudflare/cloudflared/quic" ) var ( - noopLogger = zerolog.Nop() packetConfig = &GlobalRouterConfig{ - ICMPRouter: &mockICMPRouter{}, + ICMPRouter: nil, IPv4Src: netip.MustParseAddr("172.16.0.1"), IPv6Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"), } ) func TestRouterReturnTTLExceed(t *testing.T) { - upstream := &mockUpstream{ - source: make(chan RawPacket), - } - returnPipe := &mockFunnelUniPipe{ - uniPipe: make(chan RawPacket), - } + muxer := newMockMuxer(0) routerEnabled := &routerEnabledChecker{} routerEnabled.set(true) - router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled) + router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled) ctx, cancel := context.WithCancel(context.Background()) routerStopped := make(chan struct{}) go func() { @@ -43,8 +39,8 @@ func TestRouterReturnTTLExceed(t *testing.T) { close(routerStopped) }() - pk := ICMP{ - IP: &IP{ + pk := packet.ICMP{ + IP: &packet.IP{ Src: netip.MustParseAddr("192.168.1.1"), Dst: netip.MustParseAddr("10.0.0.1"), Protocol: layers.IPProtocolICMPv4, @@ -60,9 +56,9 @@ func TestRouterReturnTTLExceed(t *testing.T) { }, }, } - assertTTLExceed(t, &pk, router.globalConfig.IPv4Src, upstream, returnPipe) - pk = ICMP{ - IP: &IP{ + assertTTLExceed(t, &pk, router.globalConfig.IPv4Src, muxer) + pk = packet.ICMP{ + IP: &packet.IP{ Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"), Dst: netip.MustParseAddr("fd51:2391:697:f4ee::2"), Protocol: layers.IPProtocolICMPv6, @@ -78,21 +74,16 @@ func TestRouterReturnTTLExceed(t *testing.T) { }, }, } - assertTTLExceed(t, &pk, router.globalConfig.IPv6Src, upstream, returnPipe) + assertTTLExceed(t, &pk, router.globalConfig.IPv6Src, muxer) cancel() <-routerStopped } func TestRouterCheckEnabled(t *testing.T) { - upstream := &mockUpstream{ - source: make(chan RawPacket), - } - returnPipe := &mockFunnelUniPipe{ - uniPipe: make(chan RawPacket), - } + muxer := newMockMuxer(0) routerEnabled := &routerEnabledChecker{} - router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled) + router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled) ctx, cancel := context.WithCancel(context.Background()) routerStopped := make(chan struct{}) go func() { @@ -100,8 +91,8 @@ func TestRouterCheckEnabled(t *testing.T) { close(routerStopped) }() - pk := ICMP{ - IP: &IP{ + pk := packet.ICMP{ + IP: &packet.IP{ Src: netip.MustParseAddr("192.168.1.1"), Dst: netip.MustParseAddr("10.0.0.1"), Protocol: layers.IPProtocolICMPv4, @@ -119,23 +110,28 @@ func TestRouterCheckEnabled(t *testing.T) { } // router is disabled - require.NoError(t, upstream.send(&pk)) + encoder := packet.NewEncoder() + encodedPacket, err := encoder.Encode(&pk) + require.NoError(t, err) + sendPacket := quicpogs.RawPacket(encodedPacket) + + muxer.edgeToCfd <- sendPacket select { case <-time.After(time.Millisecond * 10): - case <-returnPipe.uniPipe: + case <-muxer.cfdToEdge: t.Error("Unexpected reply when router is disabled") } routerEnabled.set(true) // router is enabled, expects reply - require.NoError(t, upstream.send(&pk)) - <-returnPipe.uniPipe + muxer.edgeToCfd <- sendPacket + <-muxer.cfdToEdge routerEnabled.set(false) // router is disabled - require.NoError(t, upstream.send(&pk)) + muxer.edgeToCfd <- sendPacket select { case <-time.After(time.Millisecond * 10): - case <-returnPipe.uniPipe: + case <-muxer.cfdToEdge: t.Error("Unexpected reply when router is disabled") } @@ -143,66 +139,83 @@ func TestRouterCheckEnabled(t *testing.T) { <-routerStopped } -func assertTTLExceed(t *testing.T, originalPacket *ICMP, expectedSrc netip.Addr, upstream *mockUpstream, returnPipe *mockFunnelUniPipe) { - encoder := NewEncoder() +func assertTTLExceed(t *testing.T, originalPacket *packet.ICMP, expectedSrc netip.Addr, muxer *mockMuxer) { + encoder := packet.NewEncoder() rawPacket, err := encoder.Encode(originalPacket) require.NoError(t, err) - upstream.source <- rawPacket + muxer.edgeToCfd <- quicpogs.RawPacket(rawPacket) - resp := <-returnPipe.uniPipe - decoder := NewICMPDecoder() - decoded, err := decoder.Decode(resp) + resp := <-muxer.cfdToEdge + decoder := packet.NewICMPDecoder() + decoded, err := decoder.Decode(packet.RawPacket(resp.(quicpogs.RawPacket))) require.NoError(t, err) require.Equal(t, expectedSrc, decoded.Src) require.Equal(t, originalPacket.Src, decoded.Dst) require.Equal(t, originalPacket.Protocol, decoded.Protocol) - require.Equal(t, DefaultTTL, decoded.TTL) + require.Equal(t, packet.DefaultTTL, decoded.TTL) if originalPacket.Dst.Is4() { require.Equal(t, ipv4.ICMPTypeTimeExceeded, decoded.Type) } else { require.Equal(t, ipv6.ICMPTypeTimeExceeded, decoded.Type) } require.Equal(t, 0, decoded.Code) - assertICMPChecksum(t, decoded) timeExceed, ok := decoded.Body.(*icmp.TimeExceeded) require.True(t, ok) require.True(t, bytes.Equal(rawPacket.Data, timeExceed.Data)) } -type mockUpstream struct { - source chan RawPacket +type mockMuxer struct { + cfdToEdge chan quicpogs.Packet + edgeToCfd chan quicpogs.Packet } -func (ms *mockUpstream) send(pk Packet) error { - encoder := NewEncoder() - rawPacket, err := encoder.Encode(pk) - if err != nil { - return err +func newMockMuxer(capacity int) *mockMuxer { + return &mockMuxer{ + cfdToEdge: make(chan quicpogs.Packet, capacity), + edgeToCfd: make(chan quicpogs.Packet, capacity), } - ms.source <- rawPacket +} + +// Copy packet, because icmpProxy expects the encoder buffer to be reusable after the packet is sent +func (mm *mockMuxer) SendPacket(pk quicpogs.Packet) error { + payload := pk.Payload() + copiedPayload := make([]byte, len(payload)) + copy(copiedPayload, payload) + + metadata := pk.Metadata() + copiedMetadata := make([]byte, len(metadata)) + copy(copiedMetadata, metadata) + + var copiedPacket quicpogs.Packet + switch pk.Type() { + case quicpogs.DatagramTypeIP: + copiedPacket = quicpogs.RawPacket(packet.RawPacket{ + Data: copiedPayload, + }) + case quicpogs.DatagramTypeIPWithTrace: + copiedPacket = &quicpogs.TracedPacket{ + Packet: packet.RawPacket{ + Data: copiedPayload, + }, + TracingIdentity: copiedMetadata, + } + default: + return fmt.Errorf("unexpected metadata type %d", pk.Type()) + } + mm.cfdToEdge <- copiedPacket return nil } -func (ms *mockUpstream) ReceivePacket(ctx context.Context) (RawPacket, error) { +func (mm *mockMuxer) ReceivePacket(ctx context.Context) (quicpogs.Packet, error) { select { case <-ctx.Done(): - return RawPacket{}, ctx.Err() - case pk := <-ms.source: + return nil, ctx.Err() + case pk := <-mm.edgeToCfd: return pk, nil } } -type mockICMPRouter struct{} - -func (mir mockICMPRouter) Serve(ctx context.Context) error { - return fmt.Errorf("Serve not implemented by mockICMPRouter") -} - -func (mir mockICMPRouter) Request(pk *ICMP, responder FunnelUniPipe) error { - return fmt.Errorf("Request not implemented by mockICMPRouter") -} - type routerEnabledChecker struct { enabled uint32 } diff --git a/packet/funnel.go b/packet/funnel.go index f0124070..547161bc 100644 --- a/packet/funnel.go +++ b/packet/funnel.go @@ -16,10 +16,6 @@ var ( // Funnel is an abstraction to pipe from 1 src to 1 or more destinations type Funnel interface { - // SendToDst sends a raw packet to a destination - SendToDst(dst netip.Addr, pk RawPacket) error - // ReturnToSrc returns a raw packet to the source - ReturnToSrc(pk RawPacket) error // LastActive returns the last time SendToDst or ReturnToSrc is called LastActive() time.Time // Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error @@ -36,73 +32,26 @@ type FunnelUniPipe interface { Close() error } -// RawPacketFunnel is an implementation of Funnel that sends raw packets. It can be embedded in other structs to -// satisfy the Funnel interface. -type RawPacketFunnel struct { - Src netip.Addr +type ActivityTracker struct { // last active unix time. Unit is seconds lastActive int64 - sendPipe FunnelUniPipe - returnPipe FunnelUniPipe } -func NewRawPacketFunnel(src netip.Addr, sendPipe, returnPipe FunnelUniPipe) *RawPacketFunnel { - return &RawPacketFunnel{ - Src: src, +func NewActivityTracker() *ActivityTracker { + return &ActivityTracker{ lastActive: time.Now().Unix(), - sendPipe: sendPipe, - returnPipe: returnPipe, } } -func (rpf *RawPacketFunnel) SendToDst(dst netip.Addr, pk RawPacket) error { - rpf.updateLastActive() - return rpf.sendPipe.SendPacket(dst, pk) +func (at *ActivityTracker) UpdateLastActive() { + atomic.StoreInt64(&at.lastActive, time.Now().Unix()) } -func (rpf *RawPacketFunnel) ReturnToSrc(pk RawPacket) error { - rpf.updateLastActive() - return rpf.returnPipe.SendPacket(rpf.Src, pk) -} - -func (rpf *RawPacketFunnel) updateLastActive() { - atomic.StoreInt64(&rpf.lastActive, time.Now().Unix()) -} - -func (rpf *RawPacketFunnel) LastActive() time.Time { - lastActive := atomic.LoadInt64(&rpf.lastActive) +func (at *ActivityTracker) LastActive() time.Time { + lastActive := atomic.LoadInt64(&at.lastActive) return time.Unix(lastActive, 0) } -func (rpf *RawPacketFunnel) Close() error { - sendPipeErr := rpf.sendPipe.Close() - returnPipeErr := rpf.returnPipe.Close() - if sendPipeErr != nil { - return sendPipeErr - } - if returnPipeErr != nil { - return returnPipeErr - } - return nil -} - -func (rpf *RawPacketFunnel) Equal(other Funnel) bool { - otherRawFunnel, ok := other.(*RawPacketFunnel) - if !ok { - return false - } - if rpf.Src != otherRawFunnel.Src { - return false - } - if rpf.sendPipe != otherRawFunnel.sendPipe { - return false - } - if rpf.returnPipe != otherRawFunnel.returnPipe { - return false - } - return true -} - // FunnelID represents a key type that can be used by FunnelTracker type FunnelID interface { // Type returns the name of the type that implements the FunnelID diff --git a/packet/router.go b/packet/router.go deleted file mode 100644 index 29a5d839..00000000 --- a/packet/router.go +++ /dev/null @@ -1,108 +0,0 @@ -package packet - -import ( - "context" - "net/netip" - - "github.com/rs/zerolog" -) - -// ICMPRouter sends ICMP messages and listens for their responses -type ICMPRouter interface { - // Serve starts listening for responses to the requests until context is done - Serve(ctx context.Context) error - // Request sends an ICMP message. Implementations should not modify pk after the function returns. - Request(pk *ICMP, responder FunnelUniPipe) error -} - -// Upstream of raw packets -type Upstream interface { - // ReceivePacket waits for the next raw packet from upstream - ReceivePacket(ctx context.Context) (RawPacket, error) -} - -// Router routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets -type Router struct { - upstream Upstream - returnPipe FunnelUniPipe - globalConfig *GlobalRouterConfig - logger *zerolog.Logger - checkRouterEnabledFunc func() bool -} - -// GlobalRouterConfig is the configuration shared by all instance of Router. -type GlobalRouterConfig struct { - ICMPRouter ICMPRouter - IPv4Src netip.Addr - IPv6Src netip.Addr - Zone string -} - -func NewRouter(globalConfig *GlobalRouterConfig, upstream Upstream, returnPipe FunnelUniPipe, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *Router { - return &Router{ - upstream: upstream, - returnPipe: returnPipe, - globalConfig: globalConfig, - logger: logger, - checkRouterEnabledFunc: checkRouterEnabledFunc, - } -} - -func (r *Router) Serve(ctx context.Context) error { - icmpDecoder := NewICMPDecoder() - encoder := NewEncoder() - for { - rawPacket, err := r.upstream.ReceivePacket(ctx) - if err != nil { - return err - } - - // Drop packets if ICMPRouter wasn't created - if r.globalConfig == nil { - continue - } - - if enabled := r.checkRouterEnabledFunc(); !enabled { - continue - } - - icmpPacket, err := icmpDecoder.Decode(rawPacket) - if err != nil { - r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram") - continue - } - - if icmpPacket.TTL <= 1 { - if err := r.sendTTLExceedMsg(icmpPacket, rawPacket, encoder); err != nil { - r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error") - } - continue - } - icmpPacket.TTL-- - - if err := r.globalConfig.ICMPRouter.Request(icmpPacket, r.returnPipe); err != nil { - r.logger.Err(err). - Str("src", icmpPacket.Src.String()). - Str("dst", icmpPacket.Dst.String()). - Interface("type", icmpPacket.Type). - Msg("Failed to send ICMP packet") - continue - } - } -} - -func (r *Router) sendTTLExceedMsg(pk *ICMP, rawPacket RawPacket, encoder *Encoder) error { - var srcIP netip.Addr - if pk.Dst.Is4() { - srcIP = r.globalConfig.IPv4Src - } else { - srcIP = r.globalConfig.IPv6Src - } - ttlExceedPacket := NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP) - - encodedTTLExceed, err := encoder.Encode(ttlExceedPacket) - if err != nil { - return err - } - return r.returnPipe.SendPacket(pk.Src, encodedTTLExceed) -} diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 6c0c4cd7..95271ab4 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -20,8 +20,8 @@ import ( "github.com/cloudflare/cloudflared/edgediscovery" "github.com/cloudflare/cloudflared/edgediscovery/allregions" "github.com/cloudflare/cloudflared/h2mux" + "github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/orchestration" - "github.com/cloudflare/cloudflared/packet" quicpogs "github.com/cloudflare/cloudflared/quic" "github.com/cloudflare/cloudflared/retry" "github.com/cloudflare/cloudflared/signal" @@ -70,7 +70,7 @@ type TunnelConfig struct { MuxerConfig *connection.MuxerConfig ProtocolSelector connection.ProtocolSelector EdgeTLSConfigs map[connection.Protocol]*tls.Config - PacketConfig *packet.GlobalRouterConfig + PacketConfig *ingress.GlobalRouterConfig } func (c *TunnelConfig) registrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {