diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index 20509b2f..2b1c9cdb 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -27,7 +27,6 @@ type icmpProxy struct { echoIDTracker *echoIDTracker conn *icmp.PacketConn logger *zerolog.Logger - encoder *packet.Encoder } // echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end, @@ -112,13 +111,8 @@ func (snf echoFlowID) String() string { return strconv.FormatUint(uint64(snf), 10) } -func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) { - network := "udp6" - if listenIP.To4() != nil { - network = "udp4" - } - // Opens a non-privileged ICMP socket - conn, err := icmp.ListenPacket(network, listenIP.String()) +func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) { + conn, err := newICMPConn(listenIP) if err != nil { return nil, err } @@ -127,11 +121,13 @@ func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) { echoIDTracker: newEchoIDTracker(), conn: conn, logger: logger, - encoder: packet.NewEncoder(), }, nil } func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error { + if pk == nil { + return errPacketNil + } switch body := pk.Message.Body.(type) { case *icmp.Echo: return ip.sendICMPEchoRequest(pk, body, responder) @@ -140,12 +136,14 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) er } } -func (ip *icmpProxy) ListenResponse(ctx context.Context) error { +// Serve listens for responses to the requests until context is done +func (ip *icmpProxy) Serve(ctx context.Context) error { go func() { <-ctx.Done() ip.conn.Close() }() - buf := make([]byte, 1500) + buf := make([]byte, mtu) + encoder := packet.NewEncoder() for { n, src, err := ip.conn.ReadFrom(buf) if err != nil { @@ -159,7 +157,7 @@ func (ip *icmpProxy) ListenResponse(ctx context.Context) error { } switch body := msg.Body.(type) { case 
*icmp.Echo: - if err := ip.handleEchoResponse(msg, body); err != nil { + if err := ip.handleEchoResponse(encoder, msg, body); err != nil { ip.logger.Error().Err(err). Str("src", src.String()). Str("flowID", echoFlowID(body.ID).String()). @@ -206,7 +204,7 @@ func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, respo return err } -func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) error { +func (ip *icmpProxy) handleEchoResponse(encoder *packet.Encoder, msg *icmp.Message, echo *icmp.Echo) error { flowID := echoFlowID(echo.ID) flow, ok := ip.srcFlowTracker.Get(flowID) if !ok { @@ -220,7 +218,7 @@ func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) erro }, Message: msg, } - serializedPacket, err := ip.encoder.Encode(&icmpPacket) + serializedPacket, err := encoder.Encode(&icmpPacket) if err != nil { return errors.Wrap(err, "Failed to encode ICMP message") } diff --git a/ingress/icmp_generic.go b/ingress/icmp_generic.go index dafa96e7..e2c9aae7 100644 --- a/ingress/icmp_generic.go +++ b/ingress/icmp_generic.go @@ -1,15 +1,15 @@ -//go:build !darwin +//go:build !darwin && !linux package ingress import ( "fmt" - "net" + "net/netip" "runtime" "github.com/rs/zerolog" ) -func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) { +func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) { return nil, fmt.Errorf("ICMP proxy is not implemented on %s", runtime.GOOS) } diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go new file mode 100644 index 00000000..9803bb37 --- /dev/null +++ b/ingress/icmp_linux.go @@ -0,0 +1,267 @@ +//go:build linux + +package ingress + +import ( + "context" + "fmt" + "net" + "net/netip" + "sync" + "sync/atomic" + "time" + + "github.com/google/gopacket/layers" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "golang.org/x/net/icmp" + + "github.com/cloudflare/cloudflared/packet" +) + +// The request echo ID is rewritten to the 
port of the socket. The kernel uses the reply echo ID to demultiplex replies. +// We can open a socket for each source so multiple sources requesting the same destination don't collide +type icmpProxy struct { + srcToFlowTracker *srcToFlowTracker + listenIP netip.Addr + logger *zerolog.Logger + shutdownC chan struct{} +} + +func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) { + if err := testPermission(listenIP); err != nil { + return nil, err + } + return &icmpProxy{ + srcToFlowTracker: newSrcToConnTracker(), + listenIP: listenIP, + logger: logger, + shutdownC: make(chan struct{}), + }, nil +} + +func testPermission(listenIP netip.Addr) error { + // Opens a non-privileged ICMP socket. On Linux the group ID of the process needs to be in ping_group_range. + // For more information, see https://man7.org/linux/man-pages/man7/icmp.7.html and https://lwn.net/Articles/422330/ + conn, err := newICMPConn(listenIP) + if err != nil { + // TODO: TUN-6715 check if cloudflared is in ping_group_range if the check failed.
If not, log instructions to + // change the group ID + return err + } + // This conn is only to test if cloudflared has permission to open this type of socket + conn.Close() + return nil +} + +func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error { + if pk == nil { + return errPacketNil + } + switch body := pk.Message.Body.(type) { + case *icmp.Echo: + return ip.sendICMPEchoRequest(pk, body, responder) + default: + return fmt.Errorf("sending ICMP %s is not implemented", pk.Type) + } +} + +func (ip *icmpProxy) Serve(ctx context.Context) error { + <-ctx.Done() + close(ip.shutdownC) + return ctx.Err() +} + +func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error { + icmpFlow, ok := ip.srcToFlowTracker.get(pk.Src) + if ok { + return icmpFlow.send(pk) + } + + conn, err := newICMPConn(ip.listenIP) + if err != nil { + return err + } + flow := packet.Flow{ + Src: pk.Src, + Dst: pk.Dst, + Responder: responder, + } + icmpFlow = newICMPFlow(conn, &flow, uint16(echo.ID), ip.logger) + go func() { + defer ip.srcToFlowTracker.delete(pk.Src) + + if err := icmpFlow.serve(ip.shutdownC, defaultCloseAfterIdle); err != nil { + ip.logger.Debug().Err(err).Uint16("flowID", icmpFlow.echoID).Msg("flow terminated") + } + }() + ip.srcToFlowTracker.set(pk.Src, icmpFlow) + return icmpFlow.send(pk) +} + +type srcIPFlowID netip.Addr + +func (sifd srcIPFlowID) Type() string { + return "srcIP" +} + +func (sifd srcIPFlowID) String() string { + return netip.Addr(sifd).String() +} + +type srcToFlowTracker struct { + lock sync.RWMutex + // srcToFlow maps a source IP to its ICMP flow + srcToFlow map[netip.Addr]*icmpFlow +} + +func newSrcToConnTracker() *srcToFlowTracker { + return &srcToFlowTracker{ + srcToFlow: make(map[netip.Addr]*icmpFlow), + } +} + +func (sft *srcToFlowTracker) get(srcIP netip.Addr) (*icmpFlow, bool) { + sft.lock.RLock() + defer sft.lock.RUnlock() + + flow, ok := sft.srcToFlow[srcIP] + return
flow, ok +} + +func (sft *srcToFlowTracker) set(srcIP netip.Addr, flow *icmpFlow) { + sft.lock.Lock() + defer sft.lock.Unlock() + + sft.srcToFlow[srcIP] = flow +} + +func (sft *srcToFlowTracker) delete(srcIP netip.Addr) { + sft.lock.Lock() + defer sft.lock.Unlock() + + delete(sft.srcToFlow, srcIP) +} + +type icmpFlow struct { + conn *icmp.PacketConn + flow *packet.Flow + echoID uint16 + // last active unix time. Unit is seconds + lastActive int64 + logger *zerolog.Logger +} + +func newICMPFlow(conn *icmp.PacketConn, flow *packet.Flow, echoID uint16, logger *zerolog.Logger) *icmpFlow { + return &icmpFlow{ + conn: conn, + flow: flow, + echoID: echoID, + lastActive: time.Now().Unix(), + logger: logger, + } +} + +func (f *icmpFlow) serve(shutdownC chan struct{}, closeAfterIdle time.Duration) error { + errC := make(chan error) + go func() { + errC <- f.listenResponse() + }() + + checkIdleTicker := time.NewTicker(closeAfterIdle) + defer f.conn.Close() + defer checkIdleTicker.Stop() + for { + select { + case err := <-errC: + return err + case <-shutdownC: + return nil + case <-checkIdleTicker.C: + now := time.Now().Unix() + lastActive := atomic.LoadInt64(&f.lastActive) + if now > lastActive+int64(closeAfterIdle.Seconds()) { + return errFlowInactive + } + } + } +} + +func (f *icmpFlow) send(pk *packet.ICMP) error { + f.updateLastActive() + + // For IPv4, the pseudoHeader is not used because the checksum is always calculated + var pseudoHeader []byte = nil + serializedMsg, err := pk.Marshal(pseudoHeader) + if err != nil { + return errors.Wrap(err, "Failed to encode ICMP message") + } + // The address needs to be of type UDPAddr when conn is created without privilege + _, err = f.conn.WriteTo(serializedMsg, &net.UDPAddr{ + IP: pk.Dst.AsSlice(), + }) + return err +} + +func (f *icmpFlow) listenResponse() error { + buf := make([]byte, mtu) + encoder := packet.NewEncoder() + for { + n, src, err := f.conn.ReadFrom(buf) + if err != nil { + return err + } + f.updateLastActive() +
+ if err := f.handleResponse(encoder, src, buf[:n]); err != nil { + f.logger.Err(err).Str("dst", src.String()).Msg("Failed to handle ICMP response") + continue + } + } +} + +func (f *icmpFlow) handleResponse(encoder *packet.Encoder, from net.Addr, rawPacket []byte) error { + // TODO: TUN-6654 Check for IPv6 + msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), rawPacket) + if err != nil { + return err + } + + echo, ok := msg.Body.(*icmp.Echo) + if !ok { + return fmt.Errorf("received unexpected icmp type %s from non-privileged ICMP socket", msg.Type) + } + + addrPort, err := netip.ParseAddrPort(from.String()) + if err != nil { + return err + } + icmpPacket := packet.ICMP{ + IP: &packet.IP{ + Src: addrPort.Addr(), + Dst: f.flow.Src, + Protocol: layers.IPProtocol(msg.Type.Protocol()), + }, + Message: &icmp.Message{ + Type: msg.Type, + Code: msg.Code, + Body: &icmp.Echo{ + ID: int(f.echoID), + Seq: echo.Seq, + Data: echo.Data, + }, + }, + } + serializedPacket, err := encoder.Encode(&icmpPacket) + if err != nil { + return errors.Wrap(err, "Failed to encode ICMP message") + } + if err := f.flow.Responder.SendPacket(serializedPacket); err != nil { + return errors.Wrap(err, "Failed to send packet to the edge") + } + return nil +} + +func (f *icmpFlow) updateLastActive() { + atomic.StoreInt64(&f.lastActive, time.Now().Unix()) +} diff --git a/ingress/icmp_linux_test.go b/ingress/icmp_linux_test.go new file mode 100644 index 00000000..40aee7a3 --- /dev/null +++ b/ingress/icmp_linux_test.go @@ -0,0 +1,52 @@ +//go:build linux + +package ingress + +import ( + "errors" + "net" + "net/netip" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/cloudflare/cloudflared/packet" +) + +func TestCloseIdleFlow(t *testing.T) { + const ( + echoID = 19234 + idleTimeout = time.Millisecond * 100 + ) + conn, err := newICMPConn(localhostIP) + require.NoError(t, err) + flow := packet.Flow{ + Src: netip.MustParseAddr("172.16.0.1"), + } + icmpFlow := 
newICMPFlow(conn, &flow, echoID, &noopLogger) + shutdownC := make(chan struct{}) + flowErr := make(chan error) + go func() { + flowErr <- icmpFlow.serve(shutdownC, idleTimeout) + }() + + require.Equal(t, errFlowInactive, <-flowErr) +} + +func TestCloseConnStopFlow(t *testing.T) { + const ( + echoID = 19234 + ) + conn, err := newICMPConn(localhostIP) + require.NoError(t, err) + flow := packet.Flow{ + Src: netip.MustParseAddr("172.16.0.1"), + } + icmpFlow := newICMPFlow(conn, &flow, echoID, &noopLogger) + shutdownC := make(chan struct{}) + conn.Close() + + err = icmpFlow.serve(shutdownC, defaultCloseAfterIdle) + require.True(t, errors.Is(err, net.ErrClosed)) +} diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index aabaa51f..0ac851be 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -2,21 +2,43 @@ package ingress import ( "context" - "net" + "fmt" + "net/netip" + "time" "github.com/rs/zerolog" + "golang.org/x/net/icmp" "github.com/cloudflare/cloudflared/packet" ) +const ( + defaultCloseAfterIdle = time.Second * 15 + mtu = 1500 +) + +var ( + errFlowInactive = fmt.Errorf("flow is inactive") + errPacketNil = fmt.Errorf("packet is nil") +) + // ICMPProxy sends ICMP messages and listens for their responses type ICMPProxy interface { + // Serve starts listening for responses to the requests until context is done + Serve(ctx context.Context) error // Request sends an ICMP message Request(pk *packet.ICMP, responder packet.FlowResponder) error - // ListenResponse listens for responses to the requests until context is done - ListenResponse(ctx context.Context) error } -func NewICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) { +func NewICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) { return newICMPProxy(listenIP, logger) } + +// Opens a non-privileged ICMP socket on Linux and Darwin +func newICMPConn(listenIP netip.Addr) (*icmp.PacketConn, error) { + network := "udp6" + if 
listenIP.Is4() { + network = "udp4" + } + return icmp.ListenPacket(network, listenIP.String()) +} diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index eb03a008..b38f2cd9 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -24,19 +24,19 @@ var ( // TestICMPProxyEcho makes sure we can send ICMP echo via the Request method and receives response via the // ListenResponse method func TestICMPProxyEcho(t *testing.T) { - skipNonDarwin(t) + onlyDarwinOrLinux(t) const ( echoID = 36571 endSeq = 100 ) - proxy, err := NewICMPProxy(localhostIP.AsSlice(), &noopLogger) + proxy, err := NewICMPProxy(localhostIP, &noopLogger) require.NoError(t, err) proxyDone := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) go func() { - proxy.ListenResponse(ctx) + proxy.Serve(ctx) close(proxyDone) }() @@ -72,7 +72,7 @@ func TestICMPProxyEcho(t *testing.T) { // TestICMPProxyRejectNotEcho makes sure it rejects messages other than echo func TestICMPProxyRejectNotEcho(t *testing.T) { - skipNonDarwin(t) + onlyDarwinOrLinux(t) msgs := []icmp.Message{ { Type: ipv4.ICMPTypeDestinationUnreachable, @@ -97,7 +97,7 @@ func TestICMPProxyRejectNotEcho(t *testing.T) { }, }, } - proxy, err := NewICMPProxy(localhostIP.AsSlice(), &noopLogger) + proxy, err := NewICMPProxy(localhostIP, &noopLogger) require.NoError(t, err) responder := echoFlowResponder{ @@ -117,8 +117,8 @@ func TestICMPProxyRejectNotEcho(t *testing.T) { } } -func skipNonDarwin(t *testing.T) { - if runtime.GOOS != "darwin" { +func onlyDarwinOrLinux(t *testing.T) { + if runtime.GOOS != "darwin" && runtime.GOOS != "linux" { t.Skip("Cannot create non-privileged datagram-oriented ICMP endpoint on Windows") } } @@ -146,6 +146,5 @@ func (efr *echoFlowResponder) validate(t *testing.T, echoReq *packet.ICMP) { require.Equal(t, ipv4.ICMPTypeEchoReply, decoded.Type) require.Equal(t, 0, decoded.Code) require.NotZero(t, decoded.Checksum) - // TODO: TUN-6586: 
Enable this validation when ICMP echo ID matches on Linux require.Equal(t, echoReq.Body, decoded.Body) } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index bc076f0c..52ce46a4 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "net" + "net/netip" "strings" "time" @@ -117,9 +117,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato connAwareLogger: log, } if useDatagramV2(config) { - // For non-privileged datagram-oriented ICMP endpoints, network must be "udp4" or "udp6" // TODO: TUN-6654 listen for IPv6 and decide if it should listen on specific IP - icmpProxy, err := ingress.NewICMPProxy(net.IPv4zero, config.Log) + listenIP, err := netip.ParseAddr("0.0.0.0") + if err != nil { + return nil, err + } + icmpProxy, err := ingress.NewICMPProxy(listenIP, config.Log) if err != nil { log.Logger().Warn().Err(err).Msg("Failed to create icmp proxy, will continue to use datagram v1") } else { @@ -156,7 +159,7 @@ func (s *Supervisor) Run( ) error { if s.edgeTunnelServer.icmpProxy != nil { go func() { - if err := s.edgeTunnelServer.icmpProxy.ListenResponse(ctx); err != nil { + if err := s.edgeTunnelServer.icmpProxy.Serve(ctx); err != nil { s.log.Logger().Err(err).Msg("icmp proxy terminated") } }()