TUN-6695: Implement ICMP proxy for linux

This commit is contained in:
cthuang 2022-08-25 12:34:19 +01:00 committed by Chung-Ting Huang
parent faa86ffeca
commit fc20a22685
7 changed files with 374 additions and 33 deletions

View File

@ -27,7 +27,6 @@ type icmpProxy struct {
echoIDTracker *echoIDTracker
conn *icmp.PacketConn
logger *zerolog.Logger
encoder *packet.Encoder
}
// echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end,
@ -112,13 +111,8 @@ func (snf echoFlowID) String() string {
return strconv.FormatUint(uint64(snf), 10)
}
func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
network := "udp6"
if listenIP.To4() != nil {
network = "udp4"
}
// Opens a non-privileged ICMP socket
conn, err := icmp.ListenPacket(network, listenIP.String())
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
conn, err := newICMPConn(listenIP)
if err != nil {
return nil, err
}
@ -127,11 +121,13 @@ func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
echoIDTracker: newEchoIDTracker(),
conn: conn,
logger: logger,
encoder: packet.NewEncoder(),
}, nil
}
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error {
if pk == nil {
return errPacketNil
}
switch body := pk.Message.Body.(type) {
case *icmp.Echo:
return ip.sendICMPEchoRequest(pk, body, responder)
@ -140,12 +136,14 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) er
}
}
func (ip *icmpProxy) ListenResponse(ctx context.Context) error {
// Serve listens for responses to the requests until context is done
func (ip *icmpProxy) Serve(ctx context.Context) error {
go func() {
<-ctx.Done()
ip.conn.Close()
}()
buf := make([]byte, 1500)
buf := make([]byte, mtu)
encoder := packet.NewEncoder()
for {
n, src, err := ip.conn.ReadFrom(buf)
if err != nil {
@ -159,7 +157,7 @@ func (ip *icmpProxy) ListenResponse(ctx context.Context) error {
}
switch body := msg.Body.(type) {
case *icmp.Echo:
if err := ip.handleEchoResponse(msg, body); err != nil {
if err := ip.handleEchoResponse(encoder, msg, body); err != nil {
ip.logger.Error().Err(err).
Str("src", src.String()).
Str("flowID", echoFlowID(body.ID).String()).
@ -206,7 +204,7 @@ func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, respo
return err
}
func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) error {
func (ip *icmpProxy) handleEchoResponse(encoder *packet.Encoder, msg *icmp.Message, echo *icmp.Echo) error {
flowID := echoFlowID(echo.ID)
flow, ok := ip.srcFlowTracker.Get(flowID)
if !ok {
@ -220,7 +218,7 @@ func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) erro
},
Message: msg,
}
serializedPacket, err := ip.encoder.Encode(&icmpPacket)
serializedPacket, err := encoder.Encode(&icmpPacket)
if err != nil {
return errors.Wrap(err, "Failed to encode ICMP message")
}

View File

@ -1,15 +1,15 @@
//go:build !darwin
//go:build !darwin && !linux
package ingress
import (
"fmt"
"net"
"net/netip"
"runtime"
"github.com/rs/zerolog"
)
func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
return nil, fmt.Errorf("ICMP proxy is not implemented on %s", runtime.GOOS)
}

267
ingress/icmp_linux.go Normal file
View File

@ -0,0 +1,267 @@
//go:build linux
package ingress
import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"sync/atomic"
"time"
"github.com/google/gopacket/layers"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"golang.org/x/net/icmp"
"github.com/cloudflare/cloudflared/packet"
)
// The request echo ID is rewritten to the port of the socket. The kernel uses the reply echo ID to demultiplex
// We can open a socket for each source so multiple sources requesting the same destination doesn't collide
type icmpProxy struct {
srcToFlowTracker *srcToFlowTracker
listenIP netip.Addr
logger *zerolog.Logger
shutdownC chan struct{}
}
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
if err := testPermission(listenIP); err != nil {
return nil, err
}
return &icmpProxy{
srcToFlowTracker: newSrcToConnTracker(),
listenIP: listenIP,
logger: logger,
shutdownC: make(chan struct{}),
}, nil
}
func testPermission(listenIP netip.Addr) error {
// Opens a non-privileged ICMP socket. On Linux the group ID of the process needs to be in ping_group_range
// For more information, see https://man7.org/linux/man-pages/man7/icmp.7.html and https://lwn.net/Articles/422330/
conn, err := newICMPConn(listenIP)
if err != nil {
// TODO: TUN-6715 check if cloudflared is in ping_group_range if the check failed. If not log instruction to
// change the group ID
return err
}
// This conn is only to test if cloudflared has permission to open this type of socket
conn.Close()
return nil
}
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error {
if pk == nil {
return errPacketNil
}
switch body := pk.Message.Body.(type) {
case *icmp.Echo:
return ip.sendICMPEchoRequest(pk, body, responder)
default:
return fmt.Errorf("sending ICMP %s is not implemented", pk.Type)
}
}
func (ip *icmpProxy) Serve(ctx context.Context) error {
<-ctx.Done()
close(ip.shutdownC)
return ctx.Err()
}
func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error {
icmpFlow, ok := ip.srcToFlowTracker.get(pk.Src)
if ok {
return icmpFlow.send(pk)
}
conn, err := newICMPConn(ip.listenIP)
if err != nil {
return err
}
flow := packet.Flow{
Src: pk.Src,
Dst: pk.Dst,
Responder: responder,
}
icmpFlow = newICMPFlow(conn, &flow, uint16(echo.ID), ip.logger)
go func() {
defer ip.srcToFlowTracker.delete(pk.Src)
if err := icmpFlow.serve(ip.shutdownC, defaultCloseAfterIdle); err != nil {
ip.logger.Debug().Err(err).Uint16("flowID", icmpFlow.echoID).Msg("flow terminated")
}
}()
ip.srcToFlowTracker.set(pk.Src, icmpFlow)
return icmpFlow.send(pk)
}
type srcIPFlowID netip.Addr
func (sifd srcIPFlowID) Type() string {
return "srcIP"
}
func (sifd srcIPFlowID) String() string {
return netip.Addr(sifd).String()
}
type srcToFlowTracker struct {
lock sync.RWMutex
// srcIPToConn tracks source IP to ICMP connection
srcToFlow map[netip.Addr]*icmpFlow
}
func newSrcToConnTracker() *srcToFlowTracker {
return &srcToFlowTracker{
srcToFlow: make(map[netip.Addr]*icmpFlow),
}
}
func (sft *srcToFlowTracker) get(srcIP netip.Addr) (*icmpFlow, bool) {
sft.lock.RLock()
defer sft.lock.RUnlock()
flow, ok := sft.srcToFlow[srcIP]
return flow, ok
}
func (sft *srcToFlowTracker) set(srcIP netip.Addr, flow *icmpFlow) {
sft.lock.Lock()
defer sft.lock.Unlock()
sft.srcToFlow[srcIP] = flow
}
func (sft *srcToFlowTracker) delete(srcIP netip.Addr) {
sft.lock.Lock()
defer sft.lock.Unlock()
delete(sft.srcToFlow, srcIP)
}
type icmpFlow struct {
conn *icmp.PacketConn
flow *packet.Flow
echoID uint16
// last active unix time. Unit is seconds
lastActive int64
logger *zerolog.Logger
}
func newICMPFlow(conn *icmp.PacketConn, flow *packet.Flow, echoID uint16, logger *zerolog.Logger) *icmpFlow {
return &icmpFlow{
conn: conn,
flow: flow,
echoID: echoID,
lastActive: time.Now().Unix(),
logger: logger,
}
}
func (f *icmpFlow) serve(shutdownC chan struct{}, closeAfterIdle time.Duration) error {
errC := make(chan error)
go func() {
errC <- f.listenResponse()
}()
checkIdleTicker := time.NewTicker(closeAfterIdle)
defer f.conn.Close()
defer checkIdleTicker.Stop()
for {
select {
case err := <-errC:
return err
case <-shutdownC:
return nil
case <-checkIdleTicker.C:
now := time.Now().Unix()
lastActive := atomic.LoadInt64(&f.lastActive)
if now > lastActive+int64(closeAfterIdle.Seconds()) {
return errFlowInactive
}
}
}
}
func (f *icmpFlow) send(pk *packet.ICMP) error {
f.updateLastActive()
// For IPv4, the pseudoHeader is not used because the checksum is always calculated
var pseudoHeader []byte = nil
serializedMsg, err := pk.Marshal(pseudoHeader)
if err != nil {
return errors.Wrap(err, "Failed to encode ICMP message")
}
// The address needs to be of type UDPAddr when conn is created without priviledge
_, err = f.conn.WriteTo(serializedMsg, &net.UDPAddr{
IP: pk.Dst.AsSlice(),
})
return err
}
func (f *icmpFlow) listenResponse() error {
buf := make([]byte, mtu)
encoder := packet.NewEncoder()
for {
n, src, err := f.conn.ReadFrom(buf)
if err != nil {
return err
}
f.updateLastActive()
if err := f.handleResponse(encoder, src, buf[:n]); err != nil {
f.logger.Err(err).Str("dst", src.String()).Msg("Failed to handle ICMP response")
continue
}
}
}
func (f *icmpFlow) handleResponse(encoder *packet.Encoder, from net.Addr, rawPacket []byte) error {
// TODO: TUN-6654 Check for IPv6
msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), rawPacket)
if err != nil {
return err
}
echo, ok := msg.Body.(*icmp.Echo)
if !ok {
return fmt.Errorf("received unexpected icmp type %s from non-privileged ICMP socket", msg.Type)
}
addrPort, err := netip.ParseAddrPort(from.String())
if err != nil {
return err
}
icmpPacket := packet.ICMP{
IP: &packet.IP{
Src: addrPort.Addr(),
Dst: f.flow.Src,
Protocol: layers.IPProtocol(msg.Type.Protocol()),
},
Message: &icmp.Message{
Type: msg.Type,
Code: msg.Code,
Body: &icmp.Echo{
ID: int(f.echoID),
Seq: echo.Seq,
Data: echo.Data,
},
},
}
serializedPacket, err := encoder.Encode(&icmpPacket)
if err != nil {
return errors.Wrap(err, "Failed to encode ICMP message")
}
if err := f.flow.Responder.SendPacket(serializedPacket); err != nil {
return errors.Wrap(err, "Failed to send packet to the edge")
}
return nil
}
func (f *icmpFlow) updateLastActive() {
atomic.StoreInt64(&f.lastActive, time.Now().Unix())
}

View File

@ -0,0 +1,52 @@
//go:build linux
package ingress
import (
"errors"
"net"
"net/netip"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/cloudflare/cloudflared/packet"
)
func TestCloseIdleFlow(t *testing.T) {
const (
echoID = 19234
idleTimeout = time.Millisecond * 100
)
conn, err := newICMPConn(localhostIP)
require.NoError(t, err)
flow := packet.Flow{
Src: netip.MustParseAddr("172.16.0.1"),
}
icmpFlow := newICMPFlow(conn, &flow, echoID, &noopLogger)
shutdownC := make(chan struct{})
flowErr := make(chan error)
go func() {
flowErr <- icmpFlow.serve(shutdownC, idleTimeout)
}()
require.Equal(t, errFlowInactive, <-flowErr)
}
func TestCloseConnStopFlow(t *testing.T) {
const (
echoID = 19234
)
conn, err := newICMPConn(localhostIP)
require.NoError(t, err)
flow := packet.Flow{
Src: netip.MustParseAddr("172.16.0.1"),
}
icmpFlow := newICMPFlow(conn, &flow, echoID, &noopLogger)
shutdownC := make(chan struct{})
conn.Close()
err = icmpFlow.serve(shutdownC, defaultCloseAfterIdle)
require.True(t, errors.Is(err, net.ErrClosed))
}

View File

@ -2,21 +2,43 @@ package ingress
import (
"context"
"net"
"fmt"
"net/netip"
"time"
"github.com/rs/zerolog"
"golang.org/x/net/icmp"
"github.com/cloudflare/cloudflared/packet"
)
const (
defaultCloseAfterIdle = time.Second * 15
mtu = 1500
)
var (
errFlowInactive = fmt.Errorf("flow is inactive")
errPacketNil = fmt.Errorf("packet is nil")
)
// ICMPProxy sends ICMP messages and listens for their responses
type ICMPProxy interface {
// Serve starts listening for responses to the requests until context is done
Serve(ctx context.Context) error
// Request sends an ICMP message
Request(pk *packet.ICMP, responder packet.FlowResponder) error
// ListenResponse listens for responses to the requests until context is done
ListenResponse(ctx context.Context) error
}
func NewICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
func NewICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
return newICMPProxy(listenIP, logger)
}
// Opens a non-privileged ICMP socket on Linux and Darwin
func newICMPConn(listenIP netip.Addr) (*icmp.PacketConn, error) {
network := "udp6"
if listenIP.Is4() {
network = "udp4"
}
return icmp.ListenPacket(network, listenIP.String())
}

View File

@ -24,19 +24,19 @@ var (
// TestICMPProxyEcho makes sure we can send ICMP echo via the Request method and receives response via the
// ListenResponse method
func TestICMPProxyEcho(t *testing.T) {
skipNonDarwin(t)
onlyDarwinOrLinux(t)
const (
echoID = 36571
endSeq = 100
)
proxy, err := NewICMPProxy(localhostIP.AsSlice(), &noopLogger)
proxy, err := NewICMPProxy(localhostIP, &noopLogger)
require.NoError(t, err)
proxyDone := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
proxy.ListenResponse(ctx)
proxy.Serve(ctx)
close(proxyDone)
}()
@ -72,7 +72,7 @@ func TestICMPProxyEcho(t *testing.T) {
// TestICMPProxyRejectNotEcho makes sure it rejects messages other than echo
func TestICMPProxyRejectNotEcho(t *testing.T) {
skipNonDarwin(t)
onlyDarwinOrLinux(t)
msgs := []icmp.Message{
{
Type: ipv4.ICMPTypeDestinationUnreachable,
@ -97,7 +97,7 @@ func TestICMPProxyRejectNotEcho(t *testing.T) {
},
},
}
proxy, err := NewICMPProxy(localhostIP.AsSlice(), &noopLogger)
proxy, err := NewICMPProxy(localhostIP, &noopLogger)
require.NoError(t, err)
responder := echoFlowResponder{
@ -117,8 +117,8 @@ func TestICMPProxyRejectNotEcho(t *testing.T) {
}
}
func skipNonDarwin(t *testing.T) {
if runtime.GOOS != "darwin" {
func onlyDarwinOrLinux(t *testing.T) {
if runtime.GOOS != "darwin" && runtime.GOOS != "linux" {
t.Skip("Cannot create non-privileged datagram-oriented ICMP endpoint on Windows")
}
}
@ -146,6 +146,5 @@ func (efr *echoFlowResponder) validate(t *testing.T, echoReq *packet.ICMP) {
require.Equal(t, ipv4.ICMPTypeEchoReply, decoded.Type)
require.Equal(t, 0, decoded.Code)
require.NotZero(t, decoded.Checksum)
// TODO: TUN-6586: Enable this validation when ICMP echo ID matches on Linux
require.Equal(t, echoReq.Body, decoded.Body)
}

View File

@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"strings"
"time"
@ -117,9 +117,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
connAwareLogger: log,
}
if useDatagramV2(config) {
// For non-privileged datagram-oriented ICMP endpoints, network must be "udp4" or "udp6"
// TODO: TUN-6654 listen for IPv6 and decide if it should listen on specific IP
icmpProxy, err := ingress.NewICMPProxy(net.IPv4zero, config.Log)
listenIP, err := netip.ParseAddr("0.0.0.0")
if err != nil {
return nil, err
}
icmpProxy, err := ingress.NewICMPProxy(listenIP, config.Log)
if err != nil {
log.Logger().Warn().Err(err).Msg("Failed to create icmp proxy, will continue to use datagram v1")
} else {
@ -156,7 +159,7 @@ func (s *Supervisor) Run(
) error {
if s.edgeTunnelServer.icmpProxy != nil {
go func() {
if err := s.edgeTunnelServer.icmpProxy.ListenResponse(ctx); err != nil {
if err := s.edgeTunnelServer.icmpProxy.Serve(ctx); err != nil {
s.log.Logger().Err(err).Msg("icmp proxy terminated")
}
}()