diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index 9f031641..ce628168 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -22,7 +22,6 @@ import ( "github.com/cloudflare/cloudflared/cmd/cloudflared/cliutil" "github.com/cloudflare/cloudflared/edgediscovery/allregions" - "github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/config" "github.com/cloudflare/cloudflared/connection" @@ -463,7 +462,7 @@ func parseConfigIPVersion(version string) (v allregions.ConfigIPVersion, err err return } -func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRouterConfig, error) { +func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRouterConfig, error) { ipv4Src, err := determineICMPv4Src(c.String("icmpv4-src"), logger) if err != nil { return nil, errors.Wrap(err, "failed to determine IPv4 source address for ICMP proxy") @@ -484,7 +483,7 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRout if err != nil { return nil, err } - return &packet.GlobalRouterConfig{ + return &ingress.GlobalRouterConfig{ ICMPRouter: icmpRouter, IPv4Src: ipv4Src, IPv6Src: ipv6Src, diff --git a/connection/quic.go b/connection/quic.go index 4eb405c7..645daf31 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -57,7 +57,7 @@ type QUICConnection struct { sessionManager datagramsession.Manager // datagramMuxer mux/demux datagrams from quic connection datagramMuxer *quicpogs.DatagramMuxerV2 - packetRouter *packet.Router + packetRouter *ingress.PacketRouter controlStreamHandler ControlStreamHandler connOptions *tunnelpogs.ConnectionOptions } @@ -72,7 +72,7 @@ func NewQUICConnection( connOptions *tunnelpogs.ConnectionOptions, controlStreamHandler ControlStreamHandler, logger *zerolog.Logger, - packetRouterConfig *packet.GlobalRouterConfig, + packetRouterConfig *ingress.GlobalRouterConfig, ) (*QUICConnection, error) { udpConn, err := createUDPConnForConnIndex(connIndex, logger) if err != nil { @@ -93,8 +93,7 @@ func NewQUICConnection( sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity) datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan) sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan) - muxer := muxerWrapper{muxer: datagramMuxer} - packetRouter := packet.NewRouter(packetRouterConfig, &muxer, &muxer, logger, orchestrator.WarpRoutingEnabled) + packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger, orchestrator.WarpRoutingEnabled) return &QUICConnection{ session: session, diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index 2a57b415..caa67bb4 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -11,7 +11,6 @@ import ( "context" "fmt" "math" - "net" "net/netip" "strconv" "sync" @@ -129,10 +128,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle }, nil } -func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { - if pk == nil { - return errPacketNil - } +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { originalEcho, err := getICMPEcho(pk.Message) if err != nil { return err @@ -152,13 +148,11 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er if err != nil { return nil, err } - originSender := originSender{ - conn: ip.conn, - echoIDTracker: ip.echoIDTracker, - echoIDTrackerKey: echoIDTrackerKey, - assignedEchoID: assignedEchoID, + closeCallback := func() error { + ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID) + return nil } - icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, int(assignedEchoID), originalEcho.ID, ip.encoder) + icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID, ip.encoder) return icmpFlow, nil } funnelID := echoFunnelID(assignedEchoID) @@ -250,23 +244,3 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error { } return icmpFlow.returnToSrc(reply) } - -// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface -type originSender struct { - conn *icmp.PacketConn - echoIDTracker *echoIDTracker - echoIDTrackerKey flow3Tuple - assignedEchoID uint16 -} - -func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error { - _, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{ - IP: dst.AsSlice(), - }) - return err -} - -func (os *originSender) Close() error { - os.echoIDTracker.release(os.echoIDTrackerKey, os.assignedEchoID) - return nil -} diff --git a/ingress/icmp_generic.go b/ingress/icmp_generic.go index e1c66e81..88e2581d 100644 --- a/ingress/icmp_generic.go +++ b/ingress/icmp_generic.go @@ -18,7 +18,7 @@ var errICMPProxyNotImplemented = fmt.Errorf("ICMP proxy is not implemented on %s type icmpProxy struct{} -func (ip icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { +func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { return errICMPProxyNotImplemented } diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go index 10ef3e76..a1936ad3 100644 --- a/ingress/icmp_linux.go +++ b/ingress/icmp_linux.go @@ -97,10 +97,7 @@ func checkInPingGroup() error { return fmt.Errorf("did not find group range in %s", pingGroupPath) } -func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { - if pk == nil { - return errPacketNil - } +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { originalEcho, err := getICMPEcho(pk.Message) if err != nil { return err @@ -113,13 +110,15 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er } ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr()) newConnChan <- conn + closeCallback := func() error { + return conn.Close() + } localUDPAddr, ok := conn.LocalAddr().(*net.UDPAddr) if !ok { return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr()) } - originSender := originSender{conn: conn} echoID := localUDPAddr.Port - icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, echoID, originalEcho.ID, packet.NewEncoder()) + icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder()) return icmpFlow, nil } funnelID := flow3Tuple{ @@ -187,22 +186,6 @@ func (ip *icmpProxy) listenResponse(flow *icmpEchoFlow, conn *icmp.PacketConn) e } } -// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface -type originSender struct { - conn *icmp.PacketConn -} - -func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error { - _, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{ - IP: dst.AsSlice(), - }) - return err -} - -func (os *originSender) Close() error { - return os.conn.Close() -} - // Only linux uses flow3Tuple as FunnelID func (ft flow3Tuple) Type() string { return "srcIP_dstIP_echoID" diff --git a/ingress/icmp_posix.go b/ingress/icmp_posix.go index 0badb685..95962667 100644 --- a/ingress/icmp_posix.go +++ b/ingress/icmp_posix.go @@ -43,24 +43,54 @@ type flow3Tuple struct { // icmpEchoFlow implements the packet.Funnel interface. type icmpEchoFlow struct { - *packet.RawPacketFunnel + *packet.ActivityTracker + closeCallback func() error + src netip.Addr + originConn *icmp.PacketConn + responder *packetResponder assignedEchoID int originalEchoID int // it's up to the user to ensure respEncoder is not used concurrently respEncoder *packet.Encoder } -func newICMPEchoFlow(src netip.Addr, sendPipe, returnPipe packet.FunnelUniPipe, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow { +func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder *packetResponder, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow { return &icmpEchoFlow{ - RawPacketFunnel: packet.NewRawPacketFunnel(src, sendPipe, returnPipe), + ActivityTracker: packet.NewActivityTracker(), + closeCallback: closeCallback, + src: src, + originConn: originConn, + responder: responder, assignedEchoID: assignedEchoID, originalEchoID: originalEchoID, respEncoder: respEncoder, } } +func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool { + otherICMPFlow, ok := other.(*icmpEchoFlow) + if !ok { + return false + } + if otherICMPFlow.src != ief.src { + return false + } + if otherICMPFlow.originalEchoID != ief.originalEchoID { + return false + } + if otherICMPFlow.assignedEchoID != ief.assignedEchoID { + return false + } + return true +} + +func (ief *icmpEchoFlow) Close() error { + return ief.closeCallback() +} + // sendToDst rewrites the echo ID to the one assigned to this flow func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error { + ief.UpdateLastActive() originalEcho, err := getICMPEcho(msg) if err != nil { return err @@ -80,17 +110,21 @@ func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error { if err != nil { return err } - return ief.SendToDst(dst, packet.RawPacket{Data: serializedPacket}) + _, err = ief.originConn.WriteTo(serializedPacket, &net.UDPAddr{ + IP: dst.AsSlice(), + }) + return err } // returnToSrc rewrites the echo ID to the original echo ID from the eyeball func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error { + ief.UpdateLastActive() reply.echo.ID = ief.originalEchoID reply.msg.Body = reply.echo pk := packet.ICMP{ IP: &packet.IP{ Src: reply.from, - Dst: ief.Src, + Dst: ief.src, Protocol: layers.IPProtocol(reply.msg.Type.Protocol()), TTL: packet.DefaultTTL, }, @@ -100,7 +134,7 @@ func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error { if err != nil { return err } - return ief.ReturnToSrc(serializedPacket) + return ief.responder.returnPacket(serializedPacket) } type echoReply struct { diff --git a/ingress/icmp_posix_test.go b/ingress/icmp_posix_test.go index bbadd196..d4b49a1e 100644 --- a/ingress/icmp_posix_test.go +++ b/ingress/icmp_posix_test.go @@ -52,24 +52,26 @@ func TestFunnelIdleTimeout(t *testing.T) { }, }, } - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte), + muxer := newMockMuxer(0) + responder := packetResponder{ + datagramMuxer: muxer, } - require.NoError(t, proxy.Request(&pk, &responder)) - responder.validate(t, &pk) + require.NoError(t, proxy.Request(ctx, &pk, &responder)) + validateEchoFlow(t, muxer, &pk) // Send second request, should reuse the funnel - require.NoError(t, proxy.Request(&pk, nil)) - responder.validate(t, &pk) + require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{ + datagramMuxer: nil, + })) + validateEchoFlow(t, muxer, &pk) time.Sleep(idleTimeout * 2) - newResponder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte), + newMuxer := newMockMuxer(0) + newResponder := packetResponder{ + datagramMuxer: newMuxer, } - require.NoError(t, proxy.Request(&pk, &newResponder)) - newResponder.validate(t, &pk) + require.NoError(t, proxy.Request(ctx, &pk, &newResponder)) + validateEchoFlow(t, newMuxer, &pk) cancel() <-proxyDone diff --git a/ingress/icmp_windows.go b/ingress/icmp_windows.go index 7d9a46e0..c32b6f1d 100644 --- a/ingress/icmp_windows.go +++ b/ingress/icmp_windows.go @@ -265,7 +265,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error { // Request sends an ICMP echo request and wait for a reply or timeout. // The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version. // It's possible that a slow request will block other requests, so we set the timeout to only 1s. -func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { +func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { if pk == nil { return errPacketNil } @@ -292,7 +292,7 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er return nil } -func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder packet.FunnelUniPipe) error { +func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder *packetResponder) error { var replyType icmp.Type if request.Dst.Is4() { replyType = ipv4.ICMPTypeEchoReply @@ -331,7 +331,7 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d if err != nil { return err } - return responder.SendPacket(request.Src, serializedPacket) + return responder.returnPacket(serializedPacket) } func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, error) { diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index 4b6826b4..466608b3 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -37,7 +37,9 @@ func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerol ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout) ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout) if ipv4Err != nil && ipv6Err != nil { - return nil, fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err) + err := fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err) + logger.Debug().Err(err).Msg("ICMP proxy feature is disabled") + return nil, err } if ipv4Err != nil { logger.Debug().Err(ipv4Err).Msg("failed to create ICMPv4 proxy, only ICMPv6 proxy is created") @@ -73,15 +75,18 @@ func (ir *icmpRouter) Serve(ctx context.Context) error { return fmt.Errorf("ICMPv4 proxy and ICMPv6 proxy are both nil") } -func (ir *icmpRouter) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error { +func (ir *icmpRouter) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { + if pk == nil { + return errPacketNil + } if pk.Dst.Is4() { if ir.ipv4Proxy != nil { - return ir.ipv4Proxy.Request(pk, responder) + return ir.ipv4Proxy.Request(ctx, pk, responder) } return fmt.Errorf("ICMPv4 proxy was not instantiated") } if ir.ipv6Proxy != nil { - return ir.ipv6Proxy.Request(pk, responder) + return ir.ipv6Proxy.Request(ctx, pk, responder) } return fmt.Errorf("ICMPv6 proxy was not instantiated") } diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index d080f46a..a6370e7c 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -52,9 +52,9 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { close(proxyDone) }() - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte, 1), + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } protocol := layers.IPProtocolICMPv6 @@ -90,8 +90,8 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { }, }, } - require.NoError(t, router.Request(&pk, &responder)) - responder.validate(t, &pk) + require.NoError(t, router.Request(ctx, &pk, &responder)) + validateEchoFlow(t, muxer, &pk) } } cancel() @@ -123,9 +123,10 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { echoID := 38451 + i go func() { defer wg.Done() - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte, 1), + + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } for seq := 0; seq < endSeq; seq++ { pk := &packet.ICMP{ @@ -145,15 +146,15 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, }, } - require.NoError(t, router.Request(pk, &responder)) - responder.validate(t, pk) + require.NoError(t, router.Request(ctx, pk, &responder)) + validateEchoFlow(t, muxer, pk) } }() go func() { defer wg.Done() - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte, 1), + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } for seq := 0; seq < endSeq; seq++ { pk := &packet.ICMP{ @@ -173,8 +174,8 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, }, } - require.NoError(t, router.Request(pk, &responder)) - responder.validate(t, pk) + require.NoError(t, router.Request(ctx, pk, &responder)) + validateEchoFlow(t, muxer, pk) } }() } @@ -241,9 +242,9 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp. router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) require.NoError(t, err) - responder := echoFlowResponder{ - decoder: packet.NewICMPDecoder(), - respChan: make(chan []byte), + muxer := newMockMuxer(1) + responder := packetResponder{ + datagramMuxer: muxer, } protocol := layers.IPProtocolICMPv4 if srcDstIP.Is6() { @@ -259,7 +260,7 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp. }, Message: &m, } - require.Error(t, router.Request(&pk, &responder)) + require.Error(t, router.Request(context.Background(), &pk, &responder)) } } @@ -285,9 +286,10 @@ func (efr *echoFlowResponder) Close() error { return nil } -func (efr *echoFlowResponder) validate(t *testing.T, echoReq *packet.ICMP) { - pk := <-efr.respChan - decoded, err := efr.decoder.Decode(packet.RawPacket{Data: pk}) +func validateEchoFlow(t *testing.T, muxer *mockMuxer, echoReq *packet.ICMP) { + pk := <-muxer.cfdToEdge + decoder := packet.NewICMPDecoder() + decoded, err := decoder.Decode(packet.RawPacket{Data: pk.Payload()}) require.NoError(t, err) require.Equal(t, decoded.Src, echoReq.Dst) require.Equal(t, decoded.Dst, echoReq.Src) diff --git a/ingress/packet_router.go b/ingress/packet_router.go new file mode 100644 index 00000000..88777869 --- /dev/null +++ b/ingress/packet_router.go @@ -0,0 +1,134 @@ +package ingress + +import ( + "context" + "fmt" + "net/netip" + + "github.com/rs/zerolog" + + "github.com/cloudflare/cloudflared/packet" + quicpogs "github.com/cloudflare/cloudflared/quic" +) + +// Upstream of raw packets +type muxer interface { + SendPacket(pk quicpogs.Packet) error + // ReceivePacket waits for the next raw packet from upstream + ReceivePacket(ctx context.Context) (quicpogs.Packet, error) +} + +// PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets +type PacketRouter struct { + globalConfig *GlobalRouterConfig + muxer muxer + logger *zerolog.Logger + checkRouterEnabledFunc func() bool + icmpDecoder *packet.ICMPDecoder + encoder *packet.Encoder +} + +// GlobalRouterConfig is the configuration shared by all instance of Router. +type GlobalRouterConfig struct { + ICMPRouter *icmpRouter + IPv4Src netip.Addr + IPv6Src netip.Addr + Zone string +} + +// NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil. +func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *PacketRouter { + return &PacketRouter{ + globalConfig: globalConfig, + muxer: muxer, + logger: logger, + checkRouterEnabledFunc: checkRouterEnabledFunc, + icmpDecoder: packet.NewICMPDecoder(), + encoder: packet.NewEncoder(), + } +} + +func (r *PacketRouter) Serve(ctx context.Context) error { + for { + rawPacket, responder, err := r.nextPacket(ctx) + if err != nil { + return err + } + r.handlePacket(ctx, rawPacket, responder) + } +} + +func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packetResponder, error) { + pk, err := r.muxer.ReceivePacket(ctx) + if err != nil { + return packet.RawPacket{}, nil, err + } + responder := &packetResponder{ + datagramMuxer: r.muxer, + } + switch pk.Type() { + case quicpogs.DatagramTypeIP: + return packet.RawPacket{Data: pk.Payload()}, responder, nil + case quicpogs.DatagramTypeIPWithTrace: + return packet.RawPacket{}, responder, fmt.Errorf("TODO: TUN-6604 Handle IP packet with trace") + default: + return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type()) + } +} + +func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPacket, responder *packetResponder) { + // ICMP Proxy feature is disabled, drop packets + if r.globalConfig == nil { + return + } + + if enabled := r.checkRouterEnabledFunc(); !enabled { + return + } + + icmpPacket, err := r.icmpDecoder.Decode(rawPacket) + if err != nil { + r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram") + return + } + + if icmpPacket.TTL <= 1 { + if err := r.sendTTLExceedMsg(ctx, icmpPacket, rawPacket, r.encoder); err != nil { + r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error") + } + return + } + icmpPacket.TTL-- + + if err := r.globalConfig.ICMPRouter.Request(ctx, icmpPacket, responder); err != nil { + r.logger.Err(err). + Str("src", icmpPacket.Src.String()). + Str("dst", icmpPacket.Dst.String()). + Interface("type", icmpPacket.Type). + Msg("Failed to send ICMP packet") + } +} + +func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, rawPacket packet.RawPacket, encoder *packet.Encoder) error { + var srcIP netip.Addr + if pk.Dst.Is4() { + srcIP = r.globalConfig.IPv4Src + } else { + srcIP = r.globalConfig.IPv6Src + } + ttlExceedPacket := packet.NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP) + + encodedTTLExceed, err := encoder.Encode(ttlExceedPacket) + if err != nil { + return err + } + return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed)) +} + +type packetResponder struct { + datagramMuxer muxer +} + +func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error { + return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket)) +} diff --git a/packet/router_test.go b/ingress/packet_router_test.go similarity index 57% rename from packet/router_test.go rename to ingress/packet_router_test.go index 4998456e..7738d294 100644 --- a/packet/router_test.go +++ b/ingress/packet_router_test.go @@ -1,4 +1,4 @@ -package packet +package ingress import ( "bytes" @@ -10,32 +10,28 @@ import ( "time" "github.com/google/gopacket/layers" - "github.com/rs/zerolog" "github.com/stretchr/testify/require" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" + + "github.com/cloudflare/cloudflared/packet" + quicpogs "github.com/cloudflare/cloudflared/quic" ) var ( - noopLogger = zerolog.Nop() packetConfig = &GlobalRouterConfig{ - ICMPRouter: &mockICMPRouter{}, + ICMPRouter: nil, IPv4Src: netip.MustParseAddr("172.16.0.1"), IPv6Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"), } ) func TestRouterReturnTTLExceed(t *testing.T) { - upstream := &mockUpstream{ - source: make(chan RawPacket), - } - returnPipe := &mockFunnelUniPipe{ - uniPipe: make(chan RawPacket), - } + muxer := newMockMuxer(0) routerEnabled := &routerEnabledChecker{} routerEnabled.set(true) - router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled) + router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled) ctx, cancel := context.WithCancel(context.Background()) routerStopped := make(chan struct{}) go func() { @@ -43,8 +39,8 @@ func TestRouterReturnTTLExceed(t *testing.T) { close(routerStopped) }() - pk := ICMP{ - IP: &IP{ + pk := packet.ICMP{ + IP: &packet.IP{ Src: netip.MustParseAddr("192.168.1.1"), Dst: netip.MustParseAddr("10.0.0.1"), Protocol: layers.IPProtocolICMPv4, @@ -60,9 +56,9 @@ func TestRouterReturnTTLExceed(t *testing.T) { }, }, } - assertTTLExceed(t, &pk, router.globalConfig.IPv4Src, upstream, returnPipe) - pk = ICMP{ - IP: &IP{ + assertTTLExceed(t, &pk, router.globalConfig.IPv4Src, muxer) + pk = packet.ICMP{ + IP: &packet.IP{ Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"), Dst: netip.MustParseAddr("fd51:2391:697:f4ee::2"), Protocol: layers.IPProtocolICMPv6, @@ -78,21 +74,16 @@ func TestRouterReturnTTLExceed(t *testing.T) { }, }, } - assertTTLExceed(t, &pk, router.globalConfig.IPv6Src, upstream, returnPipe) + assertTTLExceed(t, &pk, router.globalConfig.IPv6Src, muxer) cancel() <-routerStopped } func TestRouterCheckEnabled(t *testing.T) { - upstream := &mockUpstream{ - source: make(chan RawPacket), - } - returnPipe := &mockFunnelUniPipe{ - uniPipe: make(chan RawPacket), - } + muxer := newMockMuxer(0) routerEnabled := &routerEnabledChecker{} - router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled) + router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled) ctx, cancel := context.WithCancel(context.Background()) routerStopped := make(chan struct{}) go func() { @@ -100,8 +91,8 @@ func TestRouterCheckEnabled(t *testing.T) { close(routerStopped) }() - pk := ICMP{ - IP: &IP{ + pk := packet.ICMP{ + IP: &packet.IP{ Src: netip.MustParseAddr("192.168.1.1"), Dst: netip.MustParseAddr("10.0.0.1"), Protocol: layers.IPProtocolICMPv4, @@ -119,23 +110,28 @@ func TestRouterCheckEnabled(t *testing.T) { } // router is disabled - require.NoError(t, upstream.send(&pk)) + encoder := packet.NewEncoder() + encodedPacket, err := encoder.Encode(&pk) + require.NoError(t, err) + sendPacket := quicpogs.RawPacket(encodedPacket) + + muxer.edgeToCfd <- sendPacket select { case <-time.After(time.Millisecond * 10): - case <-returnPipe.uniPipe: + case <-muxer.cfdToEdge: t.Error("Unexpected reply when router is disabled") } routerEnabled.set(true) // router is enabled, expects reply - require.NoError(t, upstream.send(&pk)) - <-returnPipe.uniPipe + muxer.edgeToCfd <- sendPacket + <-muxer.cfdToEdge routerEnabled.set(false) // router is disabled - require.NoError(t, upstream.send(&pk)) + muxer.edgeToCfd <- sendPacket select { case <-time.After(time.Millisecond * 10): - case <-returnPipe.uniPipe: + case <-muxer.cfdToEdge: t.Error("Unexpected reply when router is disabled") } @@ -143,66 +139,83 @@ func TestRouterCheckEnabled(t *testing.T) { <-routerStopped } -func assertTTLExceed(t *testing.T, originalPacket *ICMP, expectedSrc netip.Addr, upstream *mockUpstream, returnPipe *mockFunnelUniPipe) { - encoder := NewEncoder() +func assertTTLExceed(t *testing.T, originalPacket *packet.ICMP, expectedSrc netip.Addr, muxer *mockMuxer) { + encoder := packet.NewEncoder() rawPacket, err := encoder.Encode(originalPacket) require.NoError(t, err) - upstream.source <- rawPacket + muxer.edgeToCfd <- quicpogs.RawPacket(rawPacket) - resp := <-returnPipe.uniPipe - decoder := NewICMPDecoder() - decoded, err := decoder.Decode(resp) + resp := <-muxer.cfdToEdge + decoder := packet.NewICMPDecoder() + decoded, err := decoder.Decode(packet.RawPacket(resp.(quicpogs.RawPacket))) require.NoError(t, err) require.Equal(t, expectedSrc, decoded.Src) require.Equal(t, originalPacket.Src, decoded.Dst) require.Equal(t, originalPacket.Protocol, decoded.Protocol) - require.Equal(t, DefaultTTL, decoded.TTL) + require.Equal(t, packet.DefaultTTL, decoded.TTL) if originalPacket.Dst.Is4() { require.Equal(t, ipv4.ICMPTypeTimeExceeded, decoded.Type) } else { require.Equal(t, ipv6.ICMPTypeTimeExceeded, decoded.Type) } require.Equal(t, 0, decoded.Code) - assertICMPChecksum(t, decoded) timeExceed, ok := decoded.Body.(*icmp.TimeExceeded) require.True(t, ok) require.True(t, bytes.Equal(rawPacket.Data, timeExceed.Data)) } -type mockUpstream struct { - source chan RawPacket +type mockMuxer struct { + cfdToEdge chan quicpogs.Packet + edgeToCfd chan quicpogs.Packet } -func (ms *mockUpstream) send(pk Packet) error { - encoder := NewEncoder() - rawPacket, err := encoder.Encode(pk) - if err != nil { - return err +func newMockMuxer(capacity int) *mockMuxer { + return &mockMuxer{ + cfdToEdge: make(chan quicpogs.Packet, capacity), + edgeToCfd: make(chan quicpogs.Packet, capacity), } - ms.source <- rawPacket +} + +// Copy packet, because icmpProxy expects the encoder buffer to be reusable after the packet is sent +func (mm *mockMuxer) SendPacket(pk quicpogs.Packet) error { + payload := pk.Payload() + copiedPayload := make([]byte, len(payload)) + copy(copiedPayload, payload) + + metadata := pk.Metadata() + copiedMetadata := make([]byte, len(metadata)) + copy(copiedMetadata, metadata) + + var copiedPacket quicpogs.Packet + switch pk.Type() { + case quicpogs.DatagramTypeIP: + copiedPacket = quicpogs.RawPacket(packet.RawPacket{ + Data: copiedPayload, + }) + case quicpogs.DatagramTypeIPWithTrace: + copiedPacket = &quicpogs.TracedPacket{ + Packet: packet.RawPacket{ + Data: copiedPayload, + }, + TracingIdentity: copiedMetadata, + } + default: + return fmt.Errorf("unexpected metadata type %d", pk.Type()) + } + mm.cfdToEdge <- copiedPacket return nil } -func (ms *mockUpstream) ReceivePacket(ctx context.Context) (RawPacket, error) { +func (mm *mockMuxer) ReceivePacket(ctx context.Context) (quicpogs.Packet, error) { select { case <-ctx.Done(): - return RawPacket{}, ctx.Err() - case pk := <-ms.source: + return nil, ctx.Err() + case pk := <-mm.edgeToCfd: return pk, nil } } -type mockICMPRouter struct{} - -func (mir mockICMPRouter) Serve(ctx context.Context) error { - return fmt.Errorf("Serve not implemented by mockICMPRouter") -} - -func (mir mockICMPRouter) Request(pk *ICMP, responder FunnelUniPipe) error { - return fmt.Errorf("Request not implemented by mockICMPRouter") -} - type routerEnabledChecker struct { enabled uint32 } diff --git a/packet/funnel.go b/packet/funnel.go index f0124070..547161bc 100644 --- a/packet/funnel.go +++ b/packet/funnel.go @@ -16,10 +16,6 @@ var ( // Funnel is an abstraction to pipe from 1 src to 1 or more destinations type Funnel interface { - // SendToDst sends a raw packet to a destination - SendToDst(dst netip.Addr, pk RawPacket) error - // ReturnToSrc returns a raw packet to the source - ReturnToSrc(pk RawPacket) error // LastActive returns the last time SendToDst or ReturnToSrc is called LastActive() time.Time // Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error @@ -36,73 +32,26 @@ type FunnelUniPipe interface { Close() error } -// RawPacketFunnel is an implementation of Funnel that sends raw packets. It can be embedded in other structs to -// satisfy the Funnel interface. -type RawPacketFunnel struct { - Src netip.Addr +type ActivityTracker struct { // last active unix time. Unit is seconds lastActive int64 - sendPipe FunnelUniPipe - returnPipe FunnelUniPipe } -func NewRawPacketFunnel(src netip.Addr, sendPipe, returnPipe FunnelUniPipe) *RawPacketFunnel { - return &RawPacketFunnel{ - Src: src, +func NewActivityTracker() *ActivityTracker { + return &ActivityTracker{ lastActive: time.Now().Unix(), - sendPipe: sendPipe, - returnPipe: returnPipe, } } -func (rpf *RawPacketFunnel) SendToDst(dst netip.Addr, pk RawPacket) error { - rpf.updateLastActive() - return rpf.sendPipe.SendPacket(dst, pk) +func (at *ActivityTracker) UpdateLastActive() { + atomic.StoreInt64(&at.lastActive, time.Now().Unix()) } -func (rpf *RawPacketFunnel) ReturnToSrc(pk RawPacket) error { - rpf.updateLastActive() - return rpf.returnPipe.SendPacket(rpf.Src, pk) -} - -func (rpf *RawPacketFunnel) updateLastActive() { - atomic.StoreInt64(&rpf.lastActive, time.Now().Unix()) -} - -func (rpf *RawPacketFunnel) LastActive() time.Time { - lastActive := atomic.LoadInt64(&rpf.lastActive) +func (at *ActivityTracker) LastActive() time.Time { + lastActive := atomic.LoadInt64(&at.lastActive) return time.Unix(lastActive, 0) } -func (rpf *RawPacketFunnel) Close() error { - sendPipeErr := rpf.sendPipe.Close() - returnPipeErr := rpf.returnPipe.Close() - if sendPipeErr != nil { - return sendPipeErr - } - if returnPipeErr != nil { - return returnPipeErr - } - return nil -} - -func (rpf *RawPacketFunnel) Equal(other Funnel) bool { - otherRawFunnel, ok := other.(*RawPacketFunnel) - if !ok { - return false - } - if rpf.Src != otherRawFunnel.Src { - return false - } - if rpf.sendPipe != otherRawFunnel.sendPipe { - return false - } - if rpf.returnPipe != otherRawFunnel.returnPipe { - return false - } - return true -} - // FunnelID represents a key type that can be used by FunnelTracker type FunnelID interface { // Type returns the name of the type that implements the FunnelID diff --git a/packet/router.go b/packet/router.go deleted file mode 100644 index 29a5d839..00000000 --- a/packet/router.go +++ /dev/null @@ -1,108 +0,0 @@ -package packet - -import ( - "context" - "net/netip" - - "github.com/rs/zerolog" -) - -// ICMPRouter sends ICMP messages and listens for their responses -type ICMPRouter interface { - // Serve starts listening for responses to the requests until context is done - Serve(ctx context.Context) error - // Request sends an ICMP message. Implementations should not modify pk after the function returns. - Request(pk *ICMP, responder FunnelUniPipe) error -} - -// Upstream of raw packets -type Upstream interface { - // ReceivePacket waits for the next raw packet from upstream - ReceivePacket(ctx context.Context) (RawPacket, error) -} - -// Router routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets -type Router struct { - upstream Upstream - returnPipe FunnelUniPipe - globalConfig *GlobalRouterConfig - logger *zerolog.Logger - checkRouterEnabledFunc func() bool -} - -// GlobalRouterConfig is the configuration shared by all instance of Router. -type GlobalRouterConfig struct { - ICMPRouter ICMPRouter - IPv4Src netip.Addr - IPv6Src netip.Addr - Zone string -} - -func NewRouter(globalConfig *GlobalRouterConfig, upstream Upstream, returnPipe FunnelUniPipe, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *Router { - return &Router{ - upstream: upstream, - returnPipe: returnPipe, - globalConfig: globalConfig, - logger: logger, - checkRouterEnabledFunc: checkRouterEnabledFunc, - } -} - -func (r *Router) Serve(ctx context.Context) error { - icmpDecoder := NewICMPDecoder() - encoder := NewEncoder() - for { - rawPacket, err := r.upstream.ReceivePacket(ctx) - if err != nil { - return err - } - - // Drop packets if ICMPRouter wasn't created - if r.globalConfig == nil { - continue - } - - if enabled := r.checkRouterEnabledFunc(); !enabled { - continue - } - - icmpPacket, err := icmpDecoder.Decode(rawPacket) - if err != nil { - r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram") - continue - } - - if icmpPacket.TTL <= 1 { - if err := r.sendTTLExceedMsg(icmpPacket, rawPacket, encoder); err != nil { - r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error") - } - continue - } - icmpPacket.TTL-- - - if err := r.globalConfig.ICMPRouter.Request(icmpPacket, r.returnPipe); err != nil { - r.logger.Err(err). - Str("src", icmpPacket.Src.String()). - Str("dst", icmpPacket.Dst.String()). - Interface("type", icmpPacket.Type). - Msg("Failed to send ICMP packet") - continue - } - } -} - -func (r *Router) sendTTLExceedMsg(pk *ICMP, rawPacket RawPacket, encoder *Encoder) error { - var srcIP netip.Addr - if pk.Dst.Is4() { - srcIP = r.globalConfig.IPv4Src - } else { - srcIP = r.globalConfig.IPv6Src - } - ttlExceedPacket := NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP) - - encodedTTLExceed, err := encoder.Encode(ttlExceedPacket) - if err != nil { - return err - } - return r.returnPipe.SendPacket(pk.Src, encodedTTLExceed) -} diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 6c0c4cd7..95271ab4 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -20,8 +20,8 @@ import ( "github.com/cloudflare/cloudflared/edgediscovery" "github.com/cloudflare/cloudflared/edgediscovery/allregions" "github.com/cloudflare/cloudflared/h2mux" + "github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/orchestration" - "github.com/cloudflare/cloudflared/packet" quicpogs "github.com/cloudflare/cloudflared/quic" "github.com/cloudflare/cloudflared/retry" "github.com/cloudflare/cloudflared/signal" @@ -70,7 +70,7 @@ type TunnelConfig struct { MuxerConfig *connection.MuxerConfig ProtocolSelector connection.ProtocolSelector EdgeTLSConfigs map[connection.Protocol]*tls.Config - PacketConfig *packet.GlobalRouterConfig + PacketConfig *ingress.GlobalRouterConfig } func (c *TunnelConfig) registrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {