TUN-6856: Refactor to lay foundation for tracing ICMP

Remove send and return methods from Funnel interface. Users of Funnel can provide their own send and return methods without wrapper to comply with the interface.
Move packet router to ingress package to avoid circular dependency
This commit is contained in:
cthuang 2022-10-13 11:01:25 +01:00
parent 225c344ceb
commit 495f9fb8bd
15 changed files with 323 additions and 337 deletions

View File

@ -22,7 +22,6 @@ import (
"github.com/cloudflare/cloudflared/cmd/cloudflared/cliutil"
"github.com/cloudflare/cloudflared/edgediscovery/allregions"
"github.com/cloudflare/cloudflared/packet"
"github.com/cloudflare/cloudflared/config"
"github.com/cloudflare/cloudflared/connection"
@ -463,7 +462,7 @@ func parseConfigIPVersion(version string) (v allregions.ConfigIPVersion, err err
return
}
func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRouterConfig, error) {
func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRouterConfig, error) {
ipv4Src, err := determineICMPv4Src(c.String("icmpv4-src"), logger)
if err != nil {
return nil, errors.Wrap(err, "failed to determine IPv4 source address for ICMP proxy")
@ -484,7 +483,7 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRout
if err != nil {
return nil, err
}
return &packet.GlobalRouterConfig{
return &ingress.GlobalRouterConfig{
ICMPRouter: icmpRouter,
IPv4Src: ipv4Src,
IPv6Src: ipv6Src,

View File

@ -57,7 +57,7 @@ type QUICConnection struct {
sessionManager datagramsession.Manager
// datagramMuxer mux/demux datagrams from quic connection
datagramMuxer *quicpogs.DatagramMuxerV2
packetRouter *packet.Router
packetRouter *ingress.PacketRouter
controlStreamHandler ControlStreamHandler
connOptions *tunnelpogs.ConnectionOptions
}
@ -72,7 +72,7 @@ func NewQUICConnection(
connOptions *tunnelpogs.ConnectionOptions,
controlStreamHandler ControlStreamHandler,
logger *zerolog.Logger,
packetRouterConfig *packet.GlobalRouterConfig,
packetRouterConfig *ingress.GlobalRouterConfig,
) (*QUICConnection, error) {
udpConn, err := createUDPConnForConnIndex(connIndex, logger)
if err != nil {
@ -93,8 +93,7 @@ func NewQUICConnection(
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
muxer := muxerWrapper{muxer: datagramMuxer}
packetRouter := packet.NewRouter(packetRouterConfig, &muxer, &muxer, logger, orchestrator.WarpRoutingEnabled)
packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger, orchestrator.WarpRoutingEnabled)
return &QUICConnection{
session: session,

View File

@ -11,7 +11,6 @@ import (
"context"
"fmt"
"math"
"net"
"net/netip"
"strconv"
"sync"
@ -129,10 +128,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle
}, nil
}
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
if pk == nil {
return errPacketNil
}
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
originalEcho, err := getICMPEcho(pk.Message)
if err != nil {
return err
@ -152,13 +148,11 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er
if err != nil {
return nil, err
}
originSender := originSender{
conn: ip.conn,
echoIDTracker: ip.echoIDTracker,
echoIDTrackerKey: echoIDTrackerKey,
assignedEchoID: assignedEchoID,
closeCallback := func() error {
ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID)
return nil
}
icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
return icmpFlow, nil
}
funnelID := echoFunnelID(assignedEchoID)
@ -250,23 +244,3 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error {
}
return icmpFlow.returnToSrc(reply)
}
// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface
type originSender struct {
conn *icmp.PacketConn
echoIDTracker *echoIDTracker
echoIDTrackerKey flow3Tuple
assignedEchoID uint16
}
func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
_, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{
IP: dst.AsSlice(),
})
return err
}
func (os *originSender) Close() error {
os.echoIDTracker.release(os.echoIDTrackerKey, os.assignedEchoID)
return nil
}

View File

@ -18,7 +18,7 @@ var errICMPProxyNotImplemented = fmt.Errorf("ICMP proxy is not implemented on %s
type icmpProxy struct{}
func (ip icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
return errICMPProxyNotImplemented
}

View File

@ -97,10 +97,7 @@ func checkInPingGroup() error {
return fmt.Errorf("did not find group range in %s", pingGroupPath)
}
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
if pk == nil {
return errPacketNil
}
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
originalEcho, err := getICMPEcho(pk.Message)
if err != nil {
return err
@ -113,13 +110,15 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er
}
ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr())
newConnChan <- conn
closeCallback := func() error {
return conn.Close()
}
localUDPAddr, ok := conn.LocalAddr().(*net.UDPAddr)
if !ok {
return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr())
}
originSender := originSender{conn: conn}
echoID := localUDPAddr.Port
icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, echoID, originalEcho.ID, packet.NewEncoder())
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder())
return icmpFlow, nil
}
funnelID := flow3Tuple{
@ -187,22 +186,6 @@ func (ip *icmpProxy) listenResponse(flow *icmpEchoFlow, conn *icmp.PacketConn) e
}
}
// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface
type originSender struct {
conn *icmp.PacketConn
}
func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
_, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{
IP: dst.AsSlice(),
})
return err
}
func (os *originSender) Close() error {
return os.conn.Close()
}
// Only linux uses flow3Tuple as FunnelID
func (ft flow3Tuple) Type() string {
return "srcIP_dstIP_echoID"

View File

@ -43,24 +43,54 @@ type flow3Tuple struct {
// icmpEchoFlow implements the packet.Funnel interface.
type icmpEchoFlow struct {
*packet.RawPacketFunnel
*packet.ActivityTracker
closeCallback func() error
src netip.Addr
originConn *icmp.PacketConn
responder *packetResponder
assignedEchoID int
originalEchoID int
// it's up to the user to ensure respEncoder is not used concurrently
respEncoder *packet.Encoder
}
func newICMPEchoFlow(src netip.Addr, sendPipe, returnPipe packet.FunnelUniPipe, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow {
func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder *packetResponder, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow {
return &icmpEchoFlow{
RawPacketFunnel: packet.NewRawPacketFunnel(src, sendPipe, returnPipe),
ActivityTracker: packet.NewActivityTracker(),
closeCallback: closeCallback,
src: src,
originConn: originConn,
responder: responder,
assignedEchoID: assignedEchoID,
originalEchoID: originalEchoID,
respEncoder: respEncoder,
}
}
func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool {
otherICMPFlow, ok := other.(*icmpEchoFlow)
if !ok {
return false
}
if otherICMPFlow.src != ief.src {
return false
}
if otherICMPFlow.originalEchoID != ief.originalEchoID {
return false
}
if otherICMPFlow.assignedEchoID != ief.assignedEchoID {
return false
}
return true
}
func (ief *icmpEchoFlow) Close() error {
return ief.closeCallback()
}
// sendToDst rewrites the echo ID to the one assigned to this flow
func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error {
ief.UpdateLastActive()
originalEcho, err := getICMPEcho(msg)
if err != nil {
return err
@ -80,17 +110,21 @@ func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error {
if err != nil {
return err
}
return ief.SendToDst(dst, packet.RawPacket{Data: serializedPacket})
_, err = ief.originConn.WriteTo(serializedPacket, &net.UDPAddr{
IP: dst.AsSlice(),
})
return err
}
// returnToSrc rewrites the echo ID to the original echo ID from the eyeball
func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error {
ief.UpdateLastActive()
reply.echo.ID = ief.originalEchoID
reply.msg.Body = reply.echo
pk := packet.ICMP{
IP: &packet.IP{
Src: reply.from,
Dst: ief.Src,
Dst: ief.src,
Protocol: layers.IPProtocol(reply.msg.Type.Protocol()),
TTL: packet.DefaultTTL,
},
@ -100,7 +134,7 @@ func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error {
if err != nil {
return err
}
return ief.ReturnToSrc(serializedPacket)
return ief.responder.returnPacket(serializedPacket)
}
type echoReply struct {

View File

@ -52,24 +52,26 @@ func TestFunnelIdleTimeout(t *testing.T) {
},
},
}
responder := echoFlowResponder{
decoder: packet.NewICMPDecoder(),
respChan: make(chan []byte),
muxer := newMockMuxer(0)
responder := packetResponder{
datagramMuxer: muxer,
}
require.NoError(t, proxy.Request(&pk, &responder))
responder.validate(t, &pk)
require.NoError(t, proxy.Request(ctx, &pk, &responder))
validateEchoFlow(t, muxer, &pk)
// Send second request, should reuse the funnel
require.NoError(t, proxy.Request(&pk, nil))
responder.validate(t, &pk)
require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{
datagramMuxer: nil,
}))
validateEchoFlow(t, muxer, &pk)
time.Sleep(idleTimeout * 2)
newResponder := echoFlowResponder{
decoder: packet.NewICMPDecoder(),
respChan: make(chan []byte),
newMuxer := newMockMuxer(0)
newResponder := packetResponder{
datagramMuxer: newMuxer,
}
require.NoError(t, proxy.Request(&pk, &newResponder))
newResponder.validate(t, &pk)
require.NoError(t, proxy.Request(ctx, &pk, &newResponder))
validateEchoFlow(t, newMuxer, &pk)
cancel()
<-proxyDone

View File

@ -265,7 +265,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
// Request sends an ICMP echo request and wait for a reply or timeout.
// The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version.
// It's possible that a slow request will block other requests, so we set the timeout to only 1s.
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
if pk == nil {
return errPacketNil
}
@ -292,7 +292,7 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er
return nil
}
func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder packet.FunnelUniPipe) error {
func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder *packetResponder) error {
var replyType icmp.Type
if request.Dst.Is4() {
replyType = ipv4.ICMPTypeEchoReply
@ -331,7 +331,7 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d
if err != nil {
return err
}
return responder.SendPacket(request.Src, serializedPacket)
return responder.returnPacket(serializedPacket)
}
func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, error) {

View File

@ -37,7 +37,9 @@ func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerol
ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout)
ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout)
if ipv4Err != nil && ipv6Err != nil {
return nil, fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err)
err := fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err)
logger.Debug().Err(err).Msg("ICMP proxy feature is disabled")
return nil, err
}
if ipv4Err != nil {
logger.Debug().Err(ipv4Err).Msg("failed to create ICMPv4 proxy, only ICMPv6 proxy is created")
@ -73,15 +75,18 @@ func (ir *icmpRouter) Serve(ctx context.Context) error {
return fmt.Errorf("ICMPv4 proxy and ICMPv6 proxy are both nil")
}
func (ir *icmpRouter) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
func (ir *icmpRouter) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
if pk == nil {
return errPacketNil
}
if pk.Dst.Is4() {
if ir.ipv4Proxy != nil {
return ir.ipv4Proxy.Request(pk, responder)
return ir.ipv4Proxy.Request(ctx, pk, responder)
}
return fmt.Errorf("ICMPv4 proxy was not instantiated")
}
if ir.ipv6Proxy != nil {
return ir.ipv6Proxy.Request(pk, responder)
return ir.ipv6Proxy.Request(ctx, pk, responder)
}
return fmt.Errorf("ICMPv6 proxy was not instantiated")
}

View File

@ -52,9 +52,9 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
close(proxyDone)
}()
responder := echoFlowResponder{
decoder: packet.NewICMPDecoder(),
respChan: make(chan []byte, 1),
muxer := newMockMuxer(1)
responder := packetResponder{
datagramMuxer: muxer,
}
protocol := layers.IPProtocolICMPv6
@ -90,8 +90,8 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
},
},
}
require.NoError(t, router.Request(&pk, &responder))
responder.validate(t, &pk)
require.NoError(t, router.Request(ctx, &pk, &responder))
validateEchoFlow(t, muxer, &pk)
}
}
cancel()
@ -123,9 +123,10 @@ func TestConcurrentRequestsToSameDst(t *testing.T) {
echoID := 38451 + i
go func() {
defer wg.Done()
responder := echoFlowResponder{
decoder: packet.NewICMPDecoder(),
respChan: make(chan []byte, 1),
muxer := newMockMuxer(1)
responder := packetResponder{
datagramMuxer: muxer,
}
for seq := 0; seq < endSeq; seq++ {
pk := &packet.ICMP{
@ -145,15 +146,15 @@ func TestConcurrentRequestsToSameDst(t *testing.T) {
},
},
}
require.NoError(t, router.Request(pk, &responder))
responder.validate(t, pk)
require.NoError(t, router.Request(ctx, pk, &responder))
validateEchoFlow(t, muxer, pk)
}
}()
go func() {
defer wg.Done()
responder := echoFlowResponder{
decoder: packet.NewICMPDecoder(),
respChan: make(chan []byte, 1),
muxer := newMockMuxer(1)
responder := packetResponder{
datagramMuxer: muxer,
}
for seq := 0; seq < endSeq; seq++ {
pk := &packet.ICMP{
@ -173,8 +174,8 @@ func TestConcurrentRequestsToSameDst(t *testing.T) {
},
},
}
require.NoError(t, router.Request(pk, &responder))
responder.validate(t, pk)
require.NoError(t, router.Request(ctx, pk, &responder))
validateEchoFlow(t, muxer, pk)
}
}()
}
@ -241,9 +242,9 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp.
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
require.NoError(t, err)
responder := echoFlowResponder{
decoder: packet.NewICMPDecoder(),
respChan: make(chan []byte),
muxer := newMockMuxer(1)
responder := packetResponder{
datagramMuxer: muxer,
}
protocol := layers.IPProtocolICMPv4
if srcDstIP.Is6() {
@ -259,7 +260,7 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp.
},
Message: &m,
}
require.Error(t, router.Request(&pk, &responder))
require.Error(t, router.Request(context.Background(), &pk, &responder))
}
}
@ -285,9 +286,10 @@ func (efr *echoFlowResponder) Close() error {
return nil
}
func (efr *echoFlowResponder) validate(t *testing.T, echoReq *packet.ICMP) {
pk := <-efr.respChan
decoded, err := efr.decoder.Decode(packet.RawPacket{Data: pk})
func validateEchoFlow(t *testing.T, muxer *mockMuxer, echoReq *packet.ICMP) {
pk := <-muxer.cfdToEdge
decoder := packet.NewICMPDecoder()
decoded, err := decoder.Decode(packet.RawPacket{Data: pk.Payload()})
require.NoError(t, err)
require.Equal(t, decoded.Src, echoReq.Dst)
require.Equal(t, decoded.Dst, echoReq.Src)

134
ingress/packet_router.go Normal file
View File

@ -0,0 +1,134 @@
package ingress
import (
"context"
"fmt"
"net/netip"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/packet"
quicpogs "github.com/cloudflare/cloudflared/quic"
)
// Upstream of raw packets
type muxer interface {
SendPacket(pk quicpogs.Packet) error
// ReceivePacket waits for the next raw packet from upstream
ReceivePacket(ctx context.Context) (quicpogs.Packet, error)
}
// PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets
type PacketRouter struct {
globalConfig *GlobalRouterConfig
muxer muxer
logger *zerolog.Logger
checkRouterEnabledFunc func() bool
icmpDecoder *packet.ICMPDecoder
encoder *packet.Encoder
}
// GlobalRouterConfig is the configuration shared by all instance of Router.
type GlobalRouterConfig struct {
ICMPRouter *icmpRouter
IPv4Src netip.Addr
IPv6Src netip.Addr
Zone string
}
// NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil.
func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *PacketRouter {
return &PacketRouter{
globalConfig: globalConfig,
muxer: muxer,
logger: logger,
checkRouterEnabledFunc: checkRouterEnabledFunc,
icmpDecoder: packet.NewICMPDecoder(),
encoder: packet.NewEncoder(),
}
}
func (r *PacketRouter) Serve(ctx context.Context) error {
for {
rawPacket, responder, err := r.nextPacket(ctx)
if err != nil {
return err
}
r.handlePacket(ctx, rawPacket, responder)
}
}
func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packetResponder, error) {
pk, err := r.muxer.ReceivePacket(ctx)
if err != nil {
return packet.RawPacket{}, nil, err
}
responder := &packetResponder{
datagramMuxer: r.muxer,
}
switch pk.Type() {
case quicpogs.DatagramTypeIP:
return packet.RawPacket{Data: pk.Payload()}, responder, nil
case quicpogs.DatagramTypeIPWithTrace:
return packet.RawPacket{}, responder, fmt.Errorf("TODO: TUN-6604 Handle IP packet with trace")
default:
return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type())
}
}
func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPacket, responder *packetResponder) {
// ICMP Proxy feature is disabled, drop packets
if r.globalConfig == nil {
return
}
if enabled := r.checkRouterEnabledFunc(); !enabled {
return
}
icmpPacket, err := r.icmpDecoder.Decode(rawPacket)
if err != nil {
r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram")
return
}
if icmpPacket.TTL <= 1 {
if err := r.sendTTLExceedMsg(ctx, icmpPacket, rawPacket, r.encoder); err != nil {
r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error")
}
return
}
icmpPacket.TTL--
if err := r.globalConfig.ICMPRouter.Request(ctx, icmpPacket, responder); err != nil {
r.logger.Err(err).
Str("src", icmpPacket.Src.String()).
Str("dst", icmpPacket.Dst.String()).
Interface("type", icmpPacket.Type).
Msg("Failed to send ICMP packet")
}
}
func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, rawPacket packet.RawPacket, encoder *packet.Encoder) error {
var srcIP netip.Addr
if pk.Dst.Is4() {
srcIP = r.globalConfig.IPv4Src
} else {
srcIP = r.globalConfig.IPv6Src
}
ttlExceedPacket := packet.NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP)
encodedTTLExceed, err := encoder.Encode(ttlExceedPacket)
if err != nil {
return err
}
return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed))
}
type packetResponder struct {
datagramMuxer muxer
}
func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error {
return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket))
}

View File

@ -1,4 +1,4 @@
package packet
package ingress
import (
"bytes"
@ -10,32 +10,28 @@ import (
"time"
"github.com/google/gopacket/layers"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"github.com/cloudflare/cloudflared/packet"
quicpogs "github.com/cloudflare/cloudflared/quic"
)
var (
noopLogger = zerolog.Nop()
packetConfig = &GlobalRouterConfig{
ICMPRouter: &mockICMPRouter{},
ICMPRouter: nil,
IPv4Src: netip.MustParseAddr("172.16.0.1"),
IPv6Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"),
}
)
func TestRouterReturnTTLExceed(t *testing.T) {
upstream := &mockUpstream{
source: make(chan RawPacket),
}
returnPipe := &mockFunnelUniPipe{
uniPipe: make(chan RawPacket),
}
muxer := newMockMuxer(0)
routerEnabled := &routerEnabledChecker{}
routerEnabled.set(true)
router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled)
router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled)
ctx, cancel := context.WithCancel(context.Background())
routerStopped := make(chan struct{})
go func() {
@ -43,8 +39,8 @@ func TestRouterReturnTTLExceed(t *testing.T) {
close(routerStopped)
}()
pk := ICMP{
IP: &IP{
pk := packet.ICMP{
IP: &packet.IP{
Src: netip.MustParseAddr("192.168.1.1"),
Dst: netip.MustParseAddr("10.0.0.1"),
Protocol: layers.IPProtocolICMPv4,
@ -60,9 +56,9 @@ func TestRouterReturnTTLExceed(t *testing.T) {
},
},
}
assertTTLExceed(t, &pk, router.globalConfig.IPv4Src, upstream, returnPipe)
pk = ICMP{
IP: &IP{
assertTTLExceed(t, &pk, router.globalConfig.IPv4Src, muxer)
pk = packet.ICMP{
IP: &packet.IP{
Src: netip.MustParseAddr("fd51:2391:523:f4ee::1"),
Dst: netip.MustParseAddr("fd51:2391:697:f4ee::2"),
Protocol: layers.IPProtocolICMPv6,
@ -78,21 +74,16 @@ func TestRouterReturnTTLExceed(t *testing.T) {
},
},
}
assertTTLExceed(t, &pk, router.globalConfig.IPv6Src, upstream, returnPipe)
assertTTLExceed(t, &pk, router.globalConfig.IPv6Src, muxer)
cancel()
<-routerStopped
}
func TestRouterCheckEnabled(t *testing.T) {
upstream := &mockUpstream{
source: make(chan RawPacket),
}
returnPipe := &mockFunnelUniPipe{
uniPipe: make(chan RawPacket),
}
muxer := newMockMuxer(0)
routerEnabled := &routerEnabledChecker{}
router := NewRouter(packetConfig, upstream, returnPipe, &noopLogger, routerEnabled.isEnabled)
router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled)
ctx, cancel := context.WithCancel(context.Background())
routerStopped := make(chan struct{})
go func() {
@ -100,8 +91,8 @@ func TestRouterCheckEnabled(t *testing.T) {
close(routerStopped)
}()
pk := ICMP{
IP: &IP{
pk := packet.ICMP{
IP: &packet.IP{
Src: netip.MustParseAddr("192.168.1.1"),
Dst: netip.MustParseAddr("10.0.0.1"),
Protocol: layers.IPProtocolICMPv4,
@ -119,23 +110,28 @@ func TestRouterCheckEnabled(t *testing.T) {
}
// router is disabled
require.NoError(t, upstream.send(&pk))
encoder := packet.NewEncoder()
encodedPacket, err := encoder.Encode(&pk)
require.NoError(t, err)
sendPacket := quicpogs.RawPacket(encodedPacket)
muxer.edgeToCfd <- sendPacket
select {
case <-time.After(time.Millisecond * 10):
case <-returnPipe.uniPipe:
case <-muxer.cfdToEdge:
t.Error("Unexpected reply when router is disabled")
}
routerEnabled.set(true)
// router is enabled, expects reply
require.NoError(t, upstream.send(&pk))
<-returnPipe.uniPipe
muxer.edgeToCfd <- sendPacket
<-muxer.cfdToEdge
routerEnabled.set(false)
// router is disabled
require.NoError(t, upstream.send(&pk))
muxer.edgeToCfd <- sendPacket
select {
case <-time.After(time.Millisecond * 10):
case <-returnPipe.uniPipe:
case <-muxer.cfdToEdge:
t.Error("Unexpected reply when router is disabled")
}
@ -143,66 +139,83 @@ func TestRouterCheckEnabled(t *testing.T) {
<-routerStopped
}
func assertTTLExceed(t *testing.T, originalPacket *ICMP, expectedSrc netip.Addr, upstream *mockUpstream, returnPipe *mockFunnelUniPipe) {
encoder := NewEncoder()
func assertTTLExceed(t *testing.T, originalPacket *packet.ICMP, expectedSrc netip.Addr, muxer *mockMuxer) {
encoder := packet.NewEncoder()
rawPacket, err := encoder.Encode(originalPacket)
require.NoError(t, err)
upstream.source <- rawPacket
muxer.edgeToCfd <- quicpogs.RawPacket(rawPacket)
resp := <-returnPipe.uniPipe
decoder := NewICMPDecoder()
decoded, err := decoder.Decode(resp)
resp := <-muxer.cfdToEdge
decoder := packet.NewICMPDecoder()
decoded, err := decoder.Decode(packet.RawPacket(resp.(quicpogs.RawPacket)))
require.NoError(t, err)
require.Equal(t, expectedSrc, decoded.Src)
require.Equal(t, originalPacket.Src, decoded.Dst)
require.Equal(t, originalPacket.Protocol, decoded.Protocol)
require.Equal(t, DefaultTTL, decoded.TTL)
require.Equal(t, packet.DefaultTTL, decoded.TTL)
if originalPacket.Dst.Is4() {
require.Equal(t, ipv4.ICMPTypeTimeExceeded, decoded.Type)
} else {
require.Equal(t, ipv6.ICMPTypeTimeExceeded, decoded.Type)
}
require.Equal(t, 0, decoded.Code)
assertICMPChecksum(t, decoded)
timeExceed, ok := decoded.Body.(*icmp.TimeExceeded)
require.True(t, ok)
require.True(t, bytes.Equal(rawPacket.Data, timeExceed.Data))
}
type mockUpstream struct {
source chan RawPacket
type mockMuxer struct {
cfdToEdge chan quicpogs.Packet
edgeToCfd chan quicpogs.Packet
}
func (ms *mockUpstream) send(pk Packet) error {
encoder := NewEncoder()
rawPacket, err := encoder.Encode(pk)
if err != nil {
return err
func newMockMuxer(capacity int) *mockMuxer {
return &mockMuxer{
cfdToEdge: make(chan quicpogs.Packet, capacity),
edgeToCfd: make(chan quicpogs.Packet, capacity),
}
ms.source <- rawPacket
}
// Copy packet, because icmpProxy expects the encoder buffer to be reusable after the packet is sent
func (mm *mockMuxer) SendPacket(pk quicpogs.Packet) error {
payload := pk.Payload()
copiedPayload := make([]byte, len(payload))
copy(copiedPayload, payload)
metadata := pk.Metadata()
copiedMetadata := make([]byte, len(metadata))
copy(copiedMetadata, metadata)
var copiedPacket quicpogs.Packet
switch pk.Type() {
case quicpogs.DatagramTypeIP:
copiedPacket = quicpogs.RawPacket(packet.RawPacket{
Data: copiedPayload,
})
case quicpogs.DatagramTypeIPWithTrace:
copiedPacket = &quicpogs.TracedPacket{
Packet: packet.RawPacket{
Data: copiedPayload,
},
TracingIdentity: copiedMetadata,
}
default:
return fmt.Errorf("unexpected metadata type %d", pk.Type())
}
mm.cfdToEdge <- copiedPacket
return nil
}
func (ms *mockUpstream) ReceivePacket(ctx context.Context) (RawPacket, error) {
func (mm *mockMuxer) ReceivePacket(ctx context.Context) (quicpogs.Packet, error) {
select {
case <-ctx.Done():
return RawPacket{}, ctx.Err()
case pk := <-ms.source:
return nil, ctx.Err()
case pk := <-mm.edgeToCfd:
return pk, nil
}
}
type mockICMPRouter struct{}
func (mir mockICMPRouter) Serve(ctx context.Context) error {
return fmt.Errorf("Serve not implemented by mockICMPRouter")
}
func (mir mockICMPRouter) Request(pk *ICMP, responder FunnelUniPipe) error {
return fmt.Errorf("Request not implemented by mockICMPRouter")
}
type routerEnabledChecker struct {
enabled uint32
}

View File

@ -16,10 +16,6 @@ var (
// Funnel is an abstraction to pipe from 1 src to 1 or more destinations
type Funnel interface {
// SendToDst sends a raw packet to a destination
SendToDst(dst netip.Addr, pk RawPacket) error
// ReturnToSrc returns a raw packet to the source
ReturnToSrc(pk RawPacket) error
// LastActive returns the last time SendToDst or ReturnToSrc is called
LastActive() time.Time
// Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error
@ -36,73 +32,26 @@ type FunnelUniPipe interface {
Close() error
}
// RawPacketFunnel is an implementation of Funnel that sends raw packets. It can be embedded in other structs to
// satisfy the Funnel interface.
type RawPacketFunnel struct {
Src netip.Addr
type ActivityTracker struct {
// last active unix time. Unit is seconds
lastActive int64
sendPipe FunnelUniPipe
returnPipe FunnelUniPipe
}
func NewRawPacketFunnel(src netip.Addr, sendPipe, returnPipe FunnelUniPipe) *RawPacketFunnel {
return &RawPacketFunnel{
Src: src,
func NewActivityTracker() *ActivityTracker {
return &ActivityTracker{
lastActive: time.Now().Unix(),
sendPipe: sendPipe,
returnPipe: returnPipe,
}
}
func (rpf *RawPacketFunnel) SendToDst(dst netip.Addr, pk RawPacket) error {
rpf.updateLastActive()
return rpf.sendPipe.SendPacket(dst, pk)
func (at *ActivityTracker) UpdateLastActive() {
atomic.StoreInt64(&at.lastActive, time.Now().Unix())
}
func (rpf *RawPacketFunnel) ReturnToSrc(pk RawPacket) error {
rpf.updateLastActive()
return rpf.returnPipe.SendPacket(rpf.Src, pk)
}
func (rpf *RawPacketFunnel) updateLastActive() {
atomic.StoreInt64(&rpf.lastActive, time.Now().Unix())
}
func (rpf *RawPacketFunnel) LastActive() time.Time {
lastActive := atomic.LoadInt64(&rpf.lastActive)
func (at *ActivityTracker) LastActive() time.Time {
lastActive := atomic.LoadInt64(&at.lastActive)
return time.Unix(lastActive, 0)
}
func (rpf *RawPacketFunnel) Close() error {
sendPipeErr := rpf.sendPipe.Close()
returnPipeErr := rpf.returnPipe.Close()
if sendPipeErr != nil {
return sendPipeErr
}
if returnPipeErr != nil {
return returnPipeErr
}
return nil
}
func (rpf *RawPacketFunnel) Equal(other Funnel) bool {
otherRawFunnel, ok := other.(*RawPacketFunnel)
if !ok {
return false
}
if rpf.Src != otherRawFunnel.Src {
return false
}
if rpf.sendPipe != otherRawFunnel.sendPipe {
return false
}
if rpf.returnPipe != otherRawFunnel.returnPipe {
return false
}
return true
}
// FunnelID represents a key type that can be used by FunnelTracker
type FunnelID interface {
// Type returns the name of the type that implements the FunnelID

View File

@ -1,108 +0,0 @@
package packet
import (
"context"
"net/netip"
"github.com/rs/zerolog"
)
// ICMPRouter sends ICMP messages and listens for their responses
type ICMPRouter interface {
// Serve starts listening for responses to the requests until context is done
Serve(ctx context.Context) error
// Request sends an ICMP message. Implementations should not modify pk after the function returns.
Request(pk *ICMP, responder FunnelUniPipe) error
}
// Upstream of raw packets
type Upstream interface {
// ReceivePacket waits for the next raw packet from upstream
ReceivePacket(ctx context.Context) (RawPacket, error)
}
// Router routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets
type Router struct {
upstream Upstream
returnPipe FunnelUniPipe
globalConfig *GlobalRouterConfig
logger *zerolog.Logger
checkRouterEnabledFunc func() bool
}
// GlobalRouterConfig is the configuration shared by all instance of Router.
type GlobalRouterConfig struct {
ICMPRouter ICMPRouter
IPv4Src netip.Addr
IPv6Src netip.Addr
Zone string
}
func NewRouter(globalConfig *GlobalRouterConfig, upstream Upstream, returnPipe FunnelUniPipe, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *Router {
return &Router{
upstream: upstream,
returnPipe: returnPipe,
globalConfig: globalConfig,
logger: logger,
checkRouterEnabledFunc: checkRouterEnabledFunc,
}
}
func (r *Router) Serve(ctx context.Context) error {
icmpDecoder := NewICMPDecoder()
encoder := NewEncoder()
for {
rawPacket, err := r.upstream.ReceivePacket(ctx)
if err != nil {
return err
}
// Drop packets if ICMPRouter wasn't created
if r.globalConfig == nil {
continue
}
if enabled := r.checkRouterEnabledFunc(); !enabled {
continue
}
icmpPacket, err := icmpDecoder.Decode(rawPacket)
if err != nil {
r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram")
continue
}
if icmpPacket.TTL <= 1 {
if err := r.sendTTLExceedMsg(icmpPacket, rawPacket, encoder); err != nil {
r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error")
}
continue
}
icmpPacket.TTL--
if err := r.globalConfig.ICMPRouter.Request(icmpPacket, r.returnPipe); err != nil {
r.logger.Err(err).
Str("src", icmpPacket.Src.String()).
Str("dst", icmpPacket.Dst.String()).
Interface("type", icmpPacket.Type).
Msg("Failed to send ICMP packet")
continue
}
}
}
func (r *Router) sendTTLExceedMsg(pk *ICMP, rawPacket RawPacket, encoder *Encoder) error {
var srcIP netip.Addr
if pk.Dst.Is4() {
srcIP = r.globalConfig.IPv4Src
} else {
srcIP = r.globalConfig.IPv6Src
}
ttlExceedPacket := NewICMPTTLExceedPacket(pk.IP, rawPacket, srcIP)
encodedTTLExceed, err := encoder.Encode(ttlExceedPacket)
if err != nil {
return err
}
return r.returnPipe.SendPacket(pk.Src, encodedTTLExceed)
}

View File

@ -20,8 +20,8 @@ import (
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/edgediscovery/allregions"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/orchestration"
"github.com/cloudflare/cloudflared/packet"
quicpogs "github.com/cloudflare/cloudflared/quic"
"github.com/cloudflare/cloudflared/retry"
"github.com/cloudflare/cloudflared/signal"
@ -70,7 +70,7 @@ type TunnelConfig struct {
MuxerConfig *connection.MuxerConfig
ProtocolSelector connection.ProtocolSelector
EdgeTLSConfigs map[connection.Protocol]*tls.Config
PacketConfig *packet.GlobalRouterConfig
PacketConfig *ingress.GlobalRouterConfig
}
func (c *TunnelConfig) registrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions {