From fc0ecf41855bc4a3c0e2088c14736549cdee9870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveirinha?= Date: Fri, 8 Sep 2023 18:05:13 +0100 Subject: [PATCH] TUN-7776: Remove warp-routing flag from cloudflared --- CHANGES.md | 3 ++ config/configuration.go | 1 - config/configuration_test.go | 1 - connection/connection.go | 1 - connection/quic.go | 2 +- ingress/config.go | 6 +-- ingress/packet_router.go | 28 +++++-------- ingress/packet_router_test.go | 64 +----------------------------- orchestration/config_test.go | 2 - orchestration/orchestrator.go | 18 ++------- orchestration/orchestrator_test.go | 16 -------- proxy/proxy.go | 6 +-- proxy/proxy_test.go | 1 - 13 files changed, 23 insertions(+), 126 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 411d480d..2a6d7c0f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,6 @@ +## 2023.9.0 +### Notices +- The `warp-routing` `enabled: boolean` flag is no longer supported in the configuration file. Warp Routing traffic (eg TCP, UDP, ICMP) traffic is proxied to cloudflared if routes to the target tunnel are configured. This change does not affect remotely managed tunnels, but for locally managed tunnels, users that might be relying on this feature flag to block traffic should instead guarantee that tunnel has no Private Routes configured for the tunnel. ## 2023.7.0 ### New Features - You can now enable additional diagnostics over the management.argotunnel.com service for your active cloudflared connectors via a new runtime flag `--management-diagnostics` (or env `TUNNEL_MANAGEMENT_DIAGNOSTICS`). This feature is provided as opt-in and requires the flag to enable. Endpoints such as /metrics provides your prometheus metrics endpoint another mechanism to be reached. Additionally /debug/pprof/(goroutine|heap) are also introduced to allow for remotely retrieving active pprof information from a running cloudflared connector. diff --git a/config/configuration.go b/config/configuration.go index 73d45fbc..0a8228b7 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -257,7 +257,6 @@ type Configuration struct { } type WarpRoutingConfig struct { - Enabled bool `yaml:"enabled" json:"enabled"` ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"` TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"` } diff --git a/config/configuration_test.go b/config/configuration_test.go index b613b8d1..3240a2a0 100644 --- a/config/configuration_test.go +++ b/config/configuration_test.go @@ -23,7 +23,6 @@ func TestConfigFileSettings(t *testing.T) { Service: "https://localhost:8001", } warpRouting = WarpRoutingConfig{ - Enabled: true, ConnectTimeout: &CustomDuration{Duration: 2 * time.Second}, TCPKeepAlive: &CustomDuration{Duration: 10 * time.Second}, } diff --git a/connection/connection.go b/connection/connection.go index eade8ded..b24ef4ea 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -40,7 +40,6 @@ type Orchestrator interface { UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse GetConfigJSON() ([]byte, error) GetOriginProxy() (OriginProxy, error) - WarpRoutingEnabled() (enabled bool) } type NamedTunnelProperties struct { diff --git a/connection/quic.go b/connection/quic.go index ab968594..79c39028 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -103,7 +103,7 @@ func NewQUICConnection( sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity) datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan) sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan) - packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger, orchestrator.WarpRoutingEnabled) + packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger) return &QUICConnection{ session: session, diff --git a/ingress/config.go b/ingress/config.go index 25f7993a..2c0c97c8 100644 --- a/ingress/config.go +++ b/ingress/config.go @@ -44,14 +44,12 @@ const ( ) type WarpRoutingConfig struct { - Enabled bool `yaml:"enabled" json:"enabled"` ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"` TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"` } func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig { cfg := WarpRoutingConfig{ - Enabled: raw.Enabled, ConnectTimeout: defaultWarpRoutingConnectTimeout, TCPKeepAlive: defaultTCPKeepAlive, } @@ -65,9 +63,7 @@ func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig { } func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig { - raw := config.WarpRoutingConfig{ - Enabled: c.Enabled, - } + raw := config.WarpRoutingConfig{} if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration { raw.ConnectTimeout = &c.ConnectTimeout } diff --git a/ingress/packet_router.go b/ingress/packet_router.go index 9d08b4f9..1e15163a 100644 --- a/ingress/packet_router.go +++ b/ingress/packet_router.go @@ -23,12 +23,11 @@ type muxer interface { // PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets type PacketRouter struct { - globalConfig *GlobalRouterConfig - muxer muxer - logger *zerolog.Logger - checkRouterEnabledFunc func() bool - icmpDecoder *packet.ICMPDecoder - encoder *packet.Encoder + globalConfig *GlobalRouterConfig + muxer muxer + logger *zerolog.Logger + icmpDecoder *packet.ICMPDecoder + encoder *packet.Encoder } // GlobalRouterConfig is the configuration shared by all instance of Router. @@ -40,14 +39,13 @@ type GlobalRouterConfig struct { } // NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil. -func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *PacketRouter { +func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger) *PacketRouter { return &PacketRouter{ - globalConfig: globalConfig, - muxer: muxer, - logger: logger, - checkRouterEnabledFunc: checkRouterEnabledFunc, - icmpDecoder: packet.NewICMPDecoder(), - encoder: packet.NewEncoder(), + globalConfig: globalConfig, + muxer: muxer, + logger: logger, + icmpDecoder: packet.NewICMPDecoder(), + encoder: packet.NewEncoder(), } } @@ -92,10 +90,6 @@ func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPac return } - if enabled := r.checkRouterEnabledFunc(); !enabled { - return - } - icmpPacket, err := r.icmpDecoder.Decode(rawPacket) if err != nil { r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram") diff --git a/ingress/packet_router_test.go b/ingress/packet_router_test.go index ca618c50..403a2274 100644 --- a/ingress/packet_router_test.go +++ b/ingress/packet_router_test.go @@ -7,7 +7,6 @@ import ( "net/netip" "sync/atomic" "testing" - "time" "github.com/google/gopacket/layers" "github.com/stretchr/testify/require" @@ -29,9 +28,7 @@ var ( func TestRouterReturnTTLExceed(t *testing.T) { muxer := newMockMuxer(0) - routerEnabled := &routerEnabledChecker{} - routerEnabled.set(true) - router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled) + router := NewPacketRouter(packetConfig, muxer, &noopLogger) ctx, cancel := context.WithCancel(context.Background()) routerStopped := make(chan struct{}) go func() { @@ -80,65 +77,6 @@ func TestRouterReturnTTLExceed(t *testing.T) { <-routerStopped } -func TestRouterCheckEnabled(t *testing.T) { - muxer := newMockMuxer(0) - routerEnabled := &routerEnabledChecker{} - router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled) - ctx, cancel := context.WithCancel(context.Background()) - routerStopped := make(chan struct{}) - go func() { - router.Serve(ctx) - close(routerStopped) - }() - - pk := packet.ICMP{ - IP: &packet.IP{ - Src: netip.MustParseAddr("192.168.1.1"), - Dst: netip.MustParseAddr("10.0.0.1"), - Protocol: layers.IPProtocolICMPv4, - TTL: 1, - }, - Message: &icmp.Message{ - Type: ipv4.ICMPTypeEcho, - Code: 0, - Body: &icmp.Echo{ - ID: 12481, - Seq: 8036, - Data: []byte(t.Name()), - }, - }, - } - - // router is disabled - encoder := packet.NewEncoder() - encodedPacket, err := encoder.Encode(&pk) - require.NoError(t, err) - sendPacket := quicpogs.RawPacket(encodedPacket) - - muxer.edgeToCfd <- sendPacket - select { - case <-time.After(time.Millisecond * 10): - case <-muxer.cfdToEdge: - t.Error("Unexpected reply when router is disabled") - } - routerEnabled.set(true) - // router is enabled, expects reply - muxer.edgeToCfd <- sendPacket - <-muxer.cfdToEdge - - routerEnabled.set(false) - // router is disabled - muxer.edgeToCfd <- sendPacket - select { - case <-time.After(time.Millisecond * 10): - case <-muxer.cfdToEdge: - t.Error("Unexpected reply when router is disabled") - } - - cancel() - <-routerStopped -} - func assertTTLExceed(t *testing.T, originalPacket *packet.ICMP, expectedSrc netip.Addr, muxer *mockMuxer) { encoder := packet.NewEncoder() rawPacket, err := encoder.Encode(originalPacket) diff --git a/orchestration/config_test.go b/orchestration/config_test.go index 04c15359..affac27a 100644 --- a/orchestration/config_test.go +++ b/orchestration/config_test.go @@ -60,7 +60,6 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) { } ], "warp-routing": { - "enabled": true, "connectTimeout": 1 } } @@ -83,7 +82,6 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) { require.NoError(t, err) require.Equal(t, remoteConfig.WarpRouting, ingress.WarpRoutingConfig{ - Enabled: true, ConnectTimeout: config.CustomDuration{ Duration: time.Second, }, diff --git a/orchestration/orchestrator.go b/orchestration/orchestrator.go index 0d43e59d..3ff763a2 100644 --- a/orchestration/orchestrator.go +++ b/orchestration/orchestrator.go @@ -29,11 +29,10 @@ type Orchestrator struct { // Underlying value is proxy.Proxy, can be read without the lock, but still needs the lock to update proxy atomic.Value // Set of internal ingress rules defined at cloudflared startup (separate from user-defined ingress rules) - internalRules []ingress.Rule - warpRoutingEnabled atomic.Bool - config *Config - tags []tunnelpogs.Tag - log *zerolog.Logger + internalRules []ingress.Rule + config *Config + tags []tunnelpogs.Tag + log *zerolog.Logger // orchestrator must not handle any more updates after shutdownC is closed shutdownC <-chan struct{} @@ -136,11 +135,6 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i o.proxy.Store(proxy) o.config.Ingress = &ingressRules o.config.WarpRouting = warpRouting - if warpRouting.Enabled { - o.warpRoutingEnabled.Store(true) - } else { - o.warpRoutingEnabled.Store(false) - } // If proxyShutdownC is nil, there is no previous running proxy if o.proxyShutdownC != nil { @@ -209,10 +203,6 @@ func (o *Orchestrator) GetOriginProxy() (connection.OriginProxy, error) { return proxy, nil } -func (o *Orchestrator) WarpRoutingEnabled() bool { - return o.warpRoutingEnabled.Load() -} - func (o *Orchestrator) waitToCloseLastProxy() { <-o.shutdownC o.lock.Lock() diff --git a/orchestration/orchestrator_test.go b/orchestration/orchestrator_test.go index 01addb8e..0cfd3176 100644 --- a/orchestration/orchestrator_test.go +++ b/orchestration/orchestrator_test.go @@ -55,7 +55,6 @@ func TestUpdateConfiguration(t *testing.T) { initOriginProxy, err := orchestrator.GetOriginProxy() require.NoError(t, err) require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy) - require.False(t, orchestrator.WarpRoutingEnabled()) configJSONV2 := []byte(` { @@ -87,7 +86,6 @@ func TestUpdateConfiguration(t *testing.T) { } ], "warp-routing": { - "enabled": true, "connectTimeout": 10 } } @@ -126,8 +124,6 @@ func TestUpdateConfiguration(t *testing.T) { require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout) require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify) require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs) - require.True(t, configV2.WarpRouting.Enabled) - require.Equal(t, configV2.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled()) require.Equal(t, configV2.WarpRouting.ConnectTimeout.Duration, 10*time.Second) originProxyV2, err := orchestrator.GetOriginProxy() @@ -162,7 +158,6 @@ func TestUpdateConfiguration(t *testing.T) { } ], "warp-routing": { - "enabled": false } } `) @@ -171,8 +166,6 @@ func TestUpdateConfiguration(t *testing.T) { require.Len(t, configV10.Ingress.Rules, 1) require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10")) require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String()) - require.False(t, configV10.WarpRouting.Enabled) - require.Equal(t, configV10.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled()) originProxyV10, err := orchestrator.GetOriginProxy() require.NoError(t, err) @@ -191,7 +184,6 @@ func TestUpdateConfiguration_FromMigration(t *testing.T) { initOriginProxy, err := orchestrator.GetOriginProxy() require.NoError(t, err) require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy) - require.False(t, orchestrator.WarpRoutingEnabled()) configJSONV2 := []byte(` { @@ -201,7 +193,6 @@ func TestUpdateConfiguration_FromMigration(t *testing.T) { } ], "warp-routing": { - "enabled": true } } `) @@ -271,7 +262,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) { } ], "warp-routing": { - "enabled": true } } `, hostname, httpOrigin.URL, expectedHost)) @@ -283,7 +273,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) { } ], "warp-routing": { - "enabled": false } } `) @@ -296,7 +285,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) { } ], "warp-routing": { - "enabled": true } } `) @@ -516,7 +504,6 @@ func TestClosePreviousProxies(t *testing.T) { } ], "warp-routing": { - "enabled": true } } `, hostname)) @@ -529,7 +516,6 @@ func TestClosePreviousProxies(t *testing.T) { } ], "warp-routing": { - "enabled": true } } `) @@ -612,7 +598,6 @@ func TestPersistentConnection(t *testing.T) { } ], "warp-routing": { - "enabled": true } } `, wsOrigin.URL)) @@ -679,7 +664,6 @@ func TestPersistentConnection(t *testing.T) { } ], "warp-routing": { - "enabled": false } } `) diff --git a/proxy/proxy.go b/proxy/proxy.go index 2acab46a..38989a5f 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -57,10 +57,8 @@ func NewOriginProxy( tags: tags, log: log, } - if warpRouting.Enabled { - proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting) - log.Info().Msgf("Warp-routing is enabled") - } + + proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting) return proxy } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 86132b7c..49a48f16 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -37,7 +37,6 @@ var ( testTags = []tunnelpogs.Tag{{Name: "Name", Value: "value"}} noWarpRouting = ingress.WarpRoutingConfig{} testWarpRouting = ingress.WarpRoutingConfig{ - Enabled: true, ConnectTimeout: config.CustomDuration{Duration: time.Second}, } )