diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go new file mode 100644 index 00000000..20509b2f --- /dev/null +++ b/ingress/icmp_darwin.go @@ -0,0 +1,231 @@ +//go:build darwin + +package ingress + +import ( + "context" + "fmt" + "math" + "net" + "net/netip" + "strconv" + "sync" + + "github.com/google/gopacket/layers" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "golang.org/x/net/icmp" + + "github.com/cloudflare/cloudflared/packet" +) + +// TODO: TUN-6654 Extend support to IPv6 +// On Darwin, a non-privileged ICMP socket can read messages from all echo IDs, so we use it for all sources. +type icmpProxy struct { + // TODO: TUN-6588 clean up flows + srcFlowTracker *packet.FlowTracker + echoIDTracker *echoIDTracker + conn *icmp.PacketConn + logger *zerolog.Logger + encoder *packet.Encoder +} + +// echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end, +// then from the beginning to lastAssignment. +// ICMP echo are short lived. By the time an ID is revisited, it should have been released. +type echoIDTracker struct { + lock sync.RWMutex + // maps the source IP to an echo ID obtained from assignment + srcIPMapping map[netip.Addr]uint16 + // assignment tracks if an ID is assigned using index as the ID + // The size of the array is math.MaxUint16 because echo ID is 2 bytes + assignment [math.MaxUint16]bool + // nextAssignment is the next number to check for assigment + nextAssignment uint16 +} + +func newEchoIDTracker() *echoIDTracker { + return &echoIDTracker{ + srcIPMapping: make(map[netip.Addr]uint16), + } +} + +func (eit *echoIDTracker) get(srcIP netip.Addr) (uint16, bool) { + eit.lock.RLock() + defer eit.lock.RUnlock() + id, ok := eit.srcIPMapping[srcIP] + return id, ok +} + +func (eit *echoIDTracker) assign(srcIP netip.Addr) (uint16, bool) { + eit.lock.Lock() + defer eit.lock.Unlock() + + if eit.nextAssignment == math.MaxUint16 { + eit.nextAssignment = 0 + } + + for i, assigned := range eit.assignment[eit.nextAssignment:] { + if !assigned { + echoID := uint16(i) + eit.nextAssignment + eit.set(srcIP, echoID) + return echoID, true + } + } + for i, assigned := range eit.assignment[0:eit.nextAssignment] { + if !assigned { + echoID := uint16(i) + eit.set(srcIP, echoID) + return echoID, true + } + } + return 0, false +} + +// Caller should hold the lock +func (eit *echoIDTracker) set(srcIP netip.Addr, echoID uint16) { + eit.assignment[echoID] = true + eit.srcIPMapping[srcIP] = echoID + eit.nextAssignment = echoID + 1 +} + +func (eit *echoIDTracker) release(srcIP netip.Addr, id uint16) bool { + eit.lock.Lock() + defer eit.lock.Unlock() + + currentID, ok := eit.srcIPMapping[srcIP] + if ok && id == currentID { + delete(eit.srcIPMapping, srcIP) + eit.assignment[id] = false + return true + } + return false +} + +type echoFlowID uint16 + +func (snf echoFlowID) Type() string { + return "echoID" +} + +func (snf echoFlowID) String() string { + return strconv.FormatUint(uint64(snf), 10) +} + +func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) { + network := "udp6" + if listenIP.To4() != nil { + network = "udp4" + } + // Opens a non-privileged ICMP socket + conn, err := icmp.ListenPacket(network, listenIP.String()) + if err != nil { + return nil, err + } + return &icmpProxy{ + srcFlowTracker: packet.NewFlowTracker(), + echoIDTracker: newEchoIDTracker(), + conn: conn, + logger: logger, + encoder: packet.NewEncoder(), + }, nil +} + +func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error { + switch body := pk.Message.Body.(type) { + case *icmp.Echo: + return ip.sendICMPEchoRequest(pk, body, responder) + default: + return fmt.Errorf("sending ICMP %s is not implemented", pk.Type) + } +} + +func (ip *icmpProxy) ListenResponse(ctx context.Context) error { + go func() { + <-ctx.Done() + ip.conn.Close() + }() + buf := make([]byte, 1500) + for { + n, src, err := ip.conn.ReadFrom(buf) + if err != nil { + return err + } + // TODO: TUN-6654 Check for IPv6 + msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), buf[:n]) + if err != nil { + ip.logger.Error().Err(err).Str("src", src.String()).Msg("Failed to parse ICMP message") + continue + } + switch body := msg.Body.(type) { + case *icmp.Echo: + if err := ip.handleEchoResponse(msg, body); err != nil { + ip.logger.Error().Err(err). + Str("src", src.String()). + Str("flowID", echoFlowID(body.ID).String()). + Msg("Failed to handle ICMP response") + continue + } + default: + ip.logger.Warn(). + Str("icmpType", fmt.Sprintf("%s", msg.Type)). + Msgf("Responding to this type of ICMP is not implemented") + continue + } + } +} + +func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error { + echoID, ok := ip.echoIDTracker.get(pk.Src) + if !ok { + echoID, ok = ip.echoIDTracker.assign(pk.Src) + if !ok { + return fmt.Errorf("failed to assign unique echo ID") + } + flowID := echoFlowID(echoID) + flow := packet.Flow{ + Src: pk.Src, + Dst: pk.Dst, + Responder: responder, + } + if replaced := ip.srcFlowTracker.Register(flowID, &flow, true); replaced { + ip.logger.Info().Str("src", flow.Src.String()).Str("dst", flow.Dst.String()).Msg("Replaced flow") + } + } + + echo.ID = int(echoID) + var pseudoHeader []byte = nil + serializedMsg, err := pk.Marshal(pseudoHeader) + if err != nil { + return errors.Wrap(err, "Failed to encode ICMP message") + } + // The address needs to be of type UDPAddr when conn is created without priviledge + _, err = ip.conn.WriteTo(serializedMsg, &net.UDPAddr{ + IP: pk.Dst.AsSlice(), + }) + return err +} + +func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) error { + flowID := echoFlowID(echo.ID) + flow, ok := ip.srcFlowTracker.Get(flowID) + if !ok { + return fmt.Errorf("flow not found") + } + icmpPacket := packet.ICMP{ + IP: &packet.IP{ + Src: flow.Dst, + Dst: flow.Src, + Protocol: layers.IPProtocol(msg.Type.Protocol()), + }, + Message: msg, + } + serializedPacket, err := ip.encoder.Encode(&icmpPacket) + if err != nil { + return errors.Wrap(err, "Failed to encode ICMP message") + } + if err := flow.Responder.SendPacket(serializedPacket); err != nil { + return errors.Wrap(err, "Failed to send packet to the edge") + } + return nil +} diff --git a/ingress/icmp_darwin_test.go b/ingress/icmp_darwin_test.go new file mode 100644 index 00000000..8baccb58 --- /dev/null +++ b/ingress/icmp_darwin_test.go @@ -0,0 +1,91 @@ +//go:build darwin + +package ingress + +import ( + "math" + "net/netip" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSingleEchoIDTracker(t *testing.T) { + tracker := newEchoIDTracker() + srcIP := netip.MustParseAddr("127.0.0.1") + echoID, ok := tracker.get(srcIP) + require.False(t, ok) + require.Equal(t, uint16(0), echoID) + + // not assigned yet, so nothing to release + require.False(t, tracker.release(srcIP, echoID)) + + echoID, ok = tracker.assign(srcIP) + require.True(t, ok) + require.Equal(t, uint16(0), echoID) + + echoID, ok = tracker.get(srcIP) + require.True(t, ok) + require.Equal(t, uint16(0), echoID) + + // releasing a different ID returns false + require.False(t, tracker.release(srcIP, 1999)) + require.True(t, tracker.release(srcIP, echoID)) + // releasing the second time returns false + require.False(t, tracker.release(srcIP, echoID)) + + echoID, ok = tracker.get(srcIP) + require.False(t, ok) + require.Equal(t, uint16(0), echoID) + + // Move to the next IP + echoID, ok = tracker.assign(srcIP) + require.True(t, ok) + require.Equal(t, uint16(1), echoID) +} + +func TestFullEchoIDTracker(t *testing.T) { + tracker := newEchoIDTracker() + firstIP := netip.MustParseAddr("172.16.0.1") + srcIP := firstIP + + for i := uint16(0); i < math.MaxUint16; i++ { + echoID, ok := tracker.assign(srcIP) + require.True(t, ok) + require.Equal(t, i, echoID) + + echoID, ok = tracker.get(srcIP) + require.True(t, ok) + require.Equal(t, i, echoID) + srcIP = srcIP.Next() + } + + // All echo IDs are assigned + echoID, ok := tracker.assign(srcIP.Next()) + require.False(t, ok) + require.Equal(t, uint16(0), echoID) + + srcIP = firstIP + for i := uint16(0); i < math.MaxUint16; i++ { + ok := tracker.release(srcIP, i) + require.True(t, ok) + + echoID, ok = tracker.get(srcIP) + require.False(t, ok) + require.Equal(t, uint16(0), echoID) + srcIP = srcIP.Next() + } + + // The IDs are assignable again + srcIP = firstIP + for i := uint16(0); i < math.MaxUint16; i++ { + echoID, ok := tracker.assign(srcIP) + require.True(t, ok) + require.Equal(t, i, echoID) + + echoID, ok = tracker.get(srcIP) + require.True(t, ok) + require.Equal(t, i, echoID) + srcIP = srcIP.Next() + } +} diff --git a/ingress/icmp_generic.go b/ingress/icmp_generic.go new file mode 100644 index 00000000..dafa96e7 --- /dev/null +++ b/ingress/icmp_generic.go @@ -0,0 +1,15 @@ +//go:build !darwin + +package ingress + +import ( + "fmt" + "net" + "runtime" + + "github.com/rs/zerolog" +) + +func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) { + return nil, fmt.Errorf("ICMP proxy is not implemented on %s", runtime.GOOS) +} diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index 672e833e..aabaa51f 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -2,14 +2,9 @@ package ingress import ( "context" - "fmt" "net" - "strconv" - "github.com/google/gopacket/layers" - "github.com/pkg/errors" "github.com/rs/zerolog" - "golang.org/x/net/icmp" "github.com/cloudflare/cloudflared/packet" ) @@ -22,118 +17,6 @@ type ICMPProxy interface { ListenResponse(ctx context.Context) error } -// TODO: TUN-6654 Extend support to IPv6 -type icmpProxy struct { - srcFlowTracker *packet.FlowTracker - conn *icmp.PacketConn - logger *zerolog.Logger - encoder *packet.Encoder -} - -// TODO: TUN-6586: Use echo ID as FlowID -type seqNumFlowID int - -func (snf seqNumFlowID) ID() string { - return strconv.FormatInt(int64(snf), 10) -} - -func NewICMPProxy(network string, listenIP net.IP, logger *zerolog.Logger) (*icmpProxy, error) { - conn, err := icmp.ListenPacket(network, listenIP.String()) - if err != nil { - return nil, err - } - return &icmpProxy{ - srcFlowTracker: packet.NewFlowTracker(), - conn: conn, - logger: logger, - encoder: packet.NewEncoder(), - }, nil -} - -func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error { - switch body := pk.Message.Body.(type) { - case *icmp.Echo: - return ip.sendICMPEchoRequest(pk, body, responder) - default: - return fmt.Errorf("sending ICMP %s is not implemented", pk.Type) - } -} - -func (ip *icmpProxy) ListenResponse(ctx context.Context) error { - go func() { - <-ctx.Done() - ip.conn.Close() - }() - buf := make([]byte, 1500) - for { - n, src, err := ip.conn.ReadFrom(buf) - if err != nil { - return err - } - // TODO: TUN-6654 Check for IPv6 - msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), buf[:n]) - if err != nil { - ip.logger.Error().Err(err).Str("src", src.String()).Msg("Failed to parse ICMP message") - continue - } - switch body := msg.Body.(type) { - case *icmp.Echo: - if err := ip.handleEchoResponse(msg, body); err != nil { - ip.logger.Error().Err(err).Str("src", src.String()).Msg("Failed to handle ICMP response") - continue - } - default: - ip.logger.Warn(). - Str("icmpType", fmt.Sprintf("%s", msg.Type)). - Msgf("Responding to this type of ICMP is not implemented") - continue - } - } -} - -func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error { - flow := packet.Flow{ - Src: pk.Src, - Dst: pk.Dst, - Responder: responder, - } - // TODO: TUN-6586 rewrite ICMP echo request identifier and use it to track flows - flowID := seqNumFlowID(echo.Seq) - // TODO: TUN-6588 clean up flows - if replaced := ip.srcFlowTracker.Register(flowID, &flow, true); replaced { - ip.logger.Info().Str("src", flow.Src.String()).Str("dst", flow.Dst.String()).Msg("Replaced flow") - } - var pseudoHeader []byte = nil - serializedMsg, err := pk.Marshal(pseudoHeader) - if err != nil { - return errors.Wrap(err, "Failed to encode ICMP message") - } - // The address needs to be of type UDPAddr when conn is created without priviledge - _, err = ip.conn.WriteTo(serializedMsg, &net.UDPAddr{ - IP: pk.Dst.AsSlice(), - }) - return err -} - -func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) error { - flow, ok := ip.srcFlowTracker.Get(seqNumFlowID(echo.Seq)) - if !ok { - return fmt.Errorf("flow not found") - } - icmpPacket := packet.ICMP{ - IP: &packet.IP{ - Src: flow.Dst, - Dst: flow.Src, - Protocol: layers.IPProtocol(msg.Type.Protocol()), - }, - Message: msg, - } - serializedPacket, err := ip.encoder.Encode(&icmpPacket) - if err != nil { - return errors.Wrap(err, "Failed to encode ICMP message") - } - if err := flow.Responder.SendPacket(serializedPacket); err != nil { - return errors.Wrap(err, "Failed to send packet to the edge") - } - return nil +func NewICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) { + return newICMPProxy(listenIP, logger) } diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index 9ca8ebee..eb03a008 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -24,12 +24,13 @@ var ( // TestICMPProxyEcho makes sure we can send ICMP echo via the Request method and receives response via the // ListenResponse method func TestICMPProxyEcho(t *testing.T) { - skipWindows(t) + skipNonDarwin(t) const ( echoID = 36571 endSeq = 100 ) - proxy, err := NewICMPProxy("udp4", localhostIP.AsSlice(), &noopLogger) + + proxy, err := NewICMPProxy(localhostIP.AsSlice(), &noopLogger) require.NoError(t, err) proxyDone := make(chan struct{}) @@ -71,7 +72,7 @@ func TestICMPProxyEcho(t *testing.T) { // TestICMPProxyRejectNotEcho makes sure it rejects messages other than echo func TestICMPProxyRejectNotEcho(t *testing.T) { - skipWindows(t) + skipNonDarwin(t) msgs := []icmp.Message{ { Type: ipv4.ICMPTypeDestinationUnreachable, @@ -96,7 +97,7 @@ func TestICMPProxyRejectNotEcho(t *testing.T) { }, }, } - proxy, err := NewICMPProxy("udp4", localhostIP.AsSlice(), &noopLogger) + proxy, err := NewICMPProxy(localhostIP.AsSlice(), &noopLogger) require.NoError(t, err) responder := echoFlowResponder{ @@ -116,8 +117,8 @@ func TestICMPProxyRejectNotEcho(t *testing.T) { } } -func skipWindows(t *testing.T) { - if runtime.GOOS == "windows" { +func skipNonDarwin(t *testing.T) { + if runtime.GOOS != "darwin" { t.Skip("Cannot create non-privileged datagram-oriented ICMP endpoint on Windows") } } @@ -146,5 +147,5 @@ func (efr *echoFlowResponder) validate(t *testing.T, echoReq *packet.ICMP) { require.Equal(t, 0, decoded.Code) require.NotZero(t, decoded.Checksum) // TODO: TUN-6586: Enable this validation when ICMP echo ID matches on Linux - //require.Equal(t, echoReq.Body, decoded.Body) + require.Equal(t, echoReq.Body, decoded.Body) } diff --git a/packet/flow.go b/packet/flow.go index 2b82e562..188f3063 100644 --- a/packet/flow.go +++ b/packet/flow.go @@ -2,6 +2,7 @@ package packet import ( "errors" + "fmt" "net/netip" "sync" ) @@ -12,7 +13,9 @@ var ( // FlowID represents a key type that can be used by FlowTracker type FlowID interface { - ID() string + // Type returns the name of the type that implements the FlowID + Type() string + fmt.Stringer } type Flow struct { diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 135c59a7..c6bca29e 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -119,11 +119,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato if useDatagramV2(config) { // For non-privileged datagram-oriented ICMP endpoints, network must be "udp4" or "udp6" // TODO: TUN-6654 listen for IPv6 and decide if it should listen on specific IP - icmpProxy, err := ingress.NewICMPProxy("udp4", net.IPv4zero, config.Log) + icmpProxy, err := ingress.NewICMPProxy(net.IPv4zero, config.Log) if err != nil { - return nil, err + log.Logger().Warn().Err(err).Msg("Failed to create icmp proxy, will continue to use datagram v1") + } else { + edgeTunnelServer.icmpProxy = icmpProxy } - edgeTunnelServer.icmpProxy = icmpProxy } useReconnectToken := false