TUN-7776: Remove warp-routing flag from cloudflared

This commit is contained in:
João Oliveirinha 2023-09-08 18:05:13 +01:00 committed by Jean Khawand
parent dcfc831752
commit 68bec886cf
13 changed files with 23 additions and 126 deletions

View File

@ -1,3 +1,6 @@
## 2023.9.0
### Notices
- The `warp-routing` `enabled: boolean` flag is no longer supported in the configuration file. Warp Routing traffic (eg TCP, UDP, ICMP) traffic is proxied to cloudflared if routes to the target tunnel are configured. This change does not affect remotely managed tunnels, but for locally managed tunnels, users that might be relying on this feature flag to block traffic should instead guarantee that tunnel has no Private Routes configured for the tunnel.
## 2023.7.0 ## 2023.7.0
### New Features ### New Features
- You can now enable additional diagnostics over the management.argotunnel.com service for your active cloudflared connectors via a new runtime flag `--management-diagnostics` (or env `TUNNEL_MANAGEMENT_DIAGNOSTICS`). This feature is provided as opt-in and requires the flag to enable. Endpoints such as /metrics provides your prometheus metrics endpoint another mechanism to be reached. Additionally /debug/pprof/(goroutine|heap) are also introduced to allow for remotely retrieving active pprof information from a running cloudflared connector. - You can now enable additional diagnostics over the management.argotunnel.com service for your active cloudflared connectors via a new runtime flag `--management-diagnostics` (or env `TUNNEL_MANAGEMENT_DIAGNOSTICS`). This feature is provided as opt-in and requires the flag to enable. Endpoints such as /metrics provides your prometheus metrics endpoint another mechanism to be reached. Additionally /debug/pprof/(goroutine|heap) are also introduced to allow for remotely retrieving active pprof information from a running cloudflared connector.

View File

@ -257,7 +257,6 @@ type Configuration struct {
} }
type WarpRoutingConfig struct { type WarpRoutingConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"` ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"` TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
} }

View File

@ -23,7 +23,6 @@ func TestConfigFileSettings(t *testing.T) {
Service: "https://localhost:8001", Service: "https://localhost:8001",
} }
warpRouting = WarpRoutingConfig{ warpRouting = WarpRoutingConfig{
Enabled: true,
ConnectTimeout: &CustomDuration{Duration: 2 * time.Second}, ConnectTimeout: &CustomDuration{Duration: 2 * time.Second},
TCPKeepAlive: &CustomDuration{Duration: 10 * time.Second}, TCPKeepAlive: &CustomDuration{Duration: 10 * time.Second},
} }

View File

@ -40,7 +40,6 @@ type Orchestrator interface {
UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
GetConfigJSON() ([]byte, error) GetConfigJSON() ([]byte, error)
GetOriginProxy() (OriginProxy, error) GetOriginProxy() (OriginProxy, error)
WarpRoutingEnabled() (enabled bool)
} }
type NamedTunnelProperties struct { type NamedTunnelProperties struct {

View File

@ -103,7 +103,7 @@ func NewQUICConnection(
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity) sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan) datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan) sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger, orchestrator.WarpRoutingEnabled) packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger)
return &QUICConnection{ return &QUICConnection{
session: session, session: session,

View File

@ -44,14 +44,12 @@ const (
) )
type WarpRoutingConfig struct { type WarpRoutingConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"` ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"` TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
} }
func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig { func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
cfg := WarpRoutingConfig{ cfg := WarpRoutingConfig{
Enabled: raw.Enabled,
ConnectTimeout: defaultWarpRoutingConnectTimeout, ConnectTimeout: defaultWarpRoutingConnectTimeout,
TCPKeepAlive: defaultTCPKeepAlive, TCPKeepAlive: defaultTCPKeepAlive,
} }
@ -65,9 +63,7 @@ func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
} }
func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig { func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig {
raw := config.WarpRoutingConfig{ raw := config.WarpRoutingConfig{}
Enabled: c.Enabled,
}
if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration { if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration {
raw.ConnectTimeout = &c.ConnectTimeout raw.ConnectTimeout = &c.ConnectTimeout
} }

View File

@ -23,12 +23,11 @@ type muxer interface {
// PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets // PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets
type PacketRouter struct { type PacketRouter struct {
globalConfig *GlobalRouterConfig globalConfig *GlobalRouterConfig
muxer muxer muxer muxer
logger *zerolog.Logger logger *zerolog.Logger
checkRouterEnabledFunc func() bool icmpDecoder *packet.ICMPDecoder
icmpDecoder *packet.ICMPDecoder encoder *packet.Encoder
encoder *packet.Encoder
} }
// GlobalRouterConfig is the configuration shared by all instance of Router. // GlobalRouterConfig is the configuration shared by all instance of Router.
@ -40,14 +39,13 @@ type GlobalRouterConfig struct {
} }
// NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil. // NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil.
func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger, checkRouterEnabledFunc func() bool) *PacketRouter { func NewPacketRouter(globalConfig *GlobalRouterConfig, muxer muxer, logger *zerolog.Logger) *PacketRouter {
return &PacketRouter{ return &PacketRouter{
globalConfig: globalConfig, globalConfig: globalConfig,
muxer: muxer, muxer: muxer,
logger: logger, logger: logger,
checkRouterEnabledFunc: checkRouterEnabledFunc, icmpDecoder: packet.NewICMPDecoder(),
icmpDecoder: packet.NewICMPDecoder(), encoder: packet.NewEncoder(),
encoder: packet.NewEncoder(),
} }
} }
@ -92,10 +90,6 @@ func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPac
return return
} }
if enabled := r.checkRouterEnabledFunc(); !enabled {
return
}
icmpPacket, err := r.icmpDecoder.Decode(rawPacket) icmpPacket, err := r.icmpDecoder.Decode(rawPacket)
if err != nil { if err != nil {
r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram") r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram")

View File

@ -7,7 +7,6 @@ import (
"net/netip" "net/netip"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time"
"github.com/google/gopacket/layers" "github.com/google/gopacket/layers"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -29,9 +28,7 @@ var (
func TestRouterReturnTTLExceed(t *testing.T) { func TestRouterReturnTTLExceed(t *testing.T) {
muxer := newMockMuxer(0) muxer := newMockMuxer(0)
routerEnabled := &routerEnabledChecker{} router := NewPacketRouter(packetConfig, muxer, &noopLogger)
routerEnabled.set(true)
router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
routerStopped := make(chan struct{}) routerStopped := make(chan struct{})
go func() { go func() {
@ -80,65 +77,6 @@ func TestRouterReturnTTLExceed(t *testing.T) {
<-routerStopped <-routerStopped
} }
func TestRouterCheckEnabled(t *testing.T) {
muxer := newMockMuxer(0)
routerEnabled := &routerEnabledChecker{}
router := NewPacketRouter(packetConfig, muxer, &noopLogger, routerEnabled.isEnabled)
ctx, cancel := context.WithCancel(context.Background())
routerStopped := make(chan struct{})
go func() {
router.Serve(ctx)
close(routerStopped)
}()
pk := packet.ICMP{
IP: &packet.IP{
Src: netip.MustParseAddr("192.168.1.1"),
Dst: netip.MustParseAddr("10.0.0.1"),
Protocol: layers.IPProtocolICMPv4,
TTL: 1,
},
Message: &icmp.Message{
Type: ipv4.ICMPTypeEcho,
Code: 0,
Body: &icmp.Echo{
ID: 12481,
Seq: 8036,
Data: []byte(t.Name()),
},
},
}
// router is disabled
encoder := packet.NewEncoder()
encodedPacket, err := encoder.Encode(&pk)
require.NoError(t, err)
sendPacket := quicpogs.RawPacket(encodedPacket)
muxer.edgeToCfd <- sendPacket
select {
case <-time.After(time.Millisecond * 10):
case <-muxer.cfdToEdge:
t.Error("Unexpected reply when router is disabled")
}
routerEnabled.set(true)
// router is enabled, expects reply
muxer.edgeToCfd <- sendPacket
<-muxer.cfdToEdge
routerEnabled.set(false)
// router is disabled
muxer.edgeToCfd <- sendPacket
select {
case <-time.After(time.Millisecond * 10):
case <-muxer.cfdToEdge:
t.Error("Unexpected reply when router is disabled")
}
cancel()
<-routerStopped
}
func assertTTLExceed(t *testing.T, originalPacket *packet.ICMP, expectedSrc netip.Addr, muxer *mockMuxer) { func assertTTLExceed(t *testing.T, originalPacket *packet.ICMP, expectedSrc netip.Addr, muxer *mockMuxer) {
encoder := packet.NewEncoder() encoder := packet.NewEncoder()
rawPacket, err := encoder.Encode(originalPacket) rawPacket, err := encoder.Encode(originalPacket)

View File

@ -60,7 +60,6 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true,
"connectTimeout": 1 "connectTimeout": 1
} }
} }
@ -83,7 +82,6 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, remoteConfig.WarpRouting, ingress.WarpRoutingConfig{ require.Equal(t, remoteConfig.WarpRouting, ingress.WarpRoutingConfig{
Enabled: true,
ConnectTimeout: config.CustomDuration{ ConnectTimeout: config.CustomDuration{
Duration: time.Second, Duration: time.Second,
}, },

View File

@ -29,11 +29,10 @@ type Orchestrator struct {
// Underlying value is proxy.Proxy, can be read without the lock, but still needs the lock to update // Underlying value is proxy.Proxy, can be read without the lock, but still needs the lock to update
proxy atomic.Value proxy atomic.Value
// Set of internal ingress rules defined at cloudflared startup (separate from user-defined ingress rules) // Set of internal ingress rules defined at cloudflared startup (separate from user-defined ingress rules)
internalRules []ingress.Rule internalRules []ingress.Rule
warpRoutingEnabled atomic.Bool config *Config
config *Config tags []tunnelpogs.Tag
tags []tunnelpogs.Tag log *zerolog.Logger
log *zerolog.Logger
// orchestrator must not handle any more updates after shutdownC is closed // orchestrator must not handle any more updates after shutdownC is closed
shutdownC <-chan struct{} shutdownC <-chan struct{}
@ -136,11 +135,6 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
o.proxy.Store(proxy) o.proxy.Store(proxy)
o.config.Ingress = &ingressRules o.config.Ingress = &ingressRules
o.config.WarpRouting = warpRouting o.config.WarpRouting = warpRouting
if warpRouting.Enabled {
o.warpRoutingEnabled.Store(true)
} else {
o.warpRoutingEnabled.Store(false)
}
// If proxyShutdownC is nil, there is no previous running proxy // If proxyShutdownC is nil, there is no previous running proxy
if o.proxyShutdownC != nil { if o.proxyShutdownC != nil {
@ -209,10 +203,6 @@ func (o *Orchestrator) GetOriginProxy() (connection.OriginProxy, error) {
return proxy, nil return proxy, nil
} }
func (o *Orchestrator) WarpRoutingEnabled() bool {
return o.warpRoutingEnabled.Load()
}
func (o *Orchestrator) waitToCloseLastProxy() { func (o *Orchestrator) waitToCloseLastProxy() {
<-o.shutdownC <-o.shutdownC
o.lock.Lock() o.lock.Lock()

View File

@ -55,7 +55,6 @@ func TestUpdateConfiguration(t *testing.T) {
initOriginProxy, err := orchestrator.GetOriginProxy() initOriginProxy, err := orchestrator.GetOriginProxy()
require.NoError(t, err) require.NoError(t, err)
require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy) require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
require.False(t, orchestrator.WarpRoutingEnabled())
configJSONV2 := []byte(` configJSONV2 := []byte(`
{ {
@ -87,7 +86,6 @@ func TestUpdateConfiguration(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true,
"connectTimeout": 10 "connectTimeout": 10
} }
} }
@ -126,8 +124,6 @@ func TestUpdateConfiguration(t *testing.T) {
require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout) require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout)
require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify) require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify)
require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs) require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs)
require.True(t, configV2.WarpRouting.Enabled)
require.Equal(t, configV2.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled())
require.Equal(t, configV2.WarpRouting.ConnectTimeout.Duration, 10*time.Second) require.Equal(t, configV2.WarpRouting.ConnectTimeout.Duration, 10*time.Second)
originProxyV2, err := orchestrator.GetOriginProxy() originProxyV2, err := orchestrator.GetOriginProxy()
@ -162,7 +158,6 @@ func TestUpdateConfiguration(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": false
} }
} }
`) `)
@ -171,8 +166,6 @@ func TestUpdateConfiguration(t *testing.T) {
require.Len(t, configV10.Ingress.Rules, 1) require.Len(t, configV10.Ingress.Rules, 1)
require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10")) require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10"))
require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String()) require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String())
require.False(t, configV10.WarpRouting.Enabled)
require.Equal(t, configV10.WarpRouting.Enabled, orchestrator.WarpRoutingEnabled())
originProxyV10, err := orchestrator.GetOriginProxy() originProxyV10, err := orchestrator.GetOriginProxy()
require.NoError(t, err) require.NoError(t, err)
@ -191,7 +184,6 @@ func TestUpdateConfiguration_FromMigration(t *testing.T) {
initOriginProxy, err := orchestrator.GetOriginProxy() initOriginProxy, err := orchestrator.GetOriginProxy()
require.NoError(t, err) require.NoError(t, err)
require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy) require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
require.False(t, orchestrator.WarpRoutingEnabled())
configJSONV2 := []byte(` configJSONV2 := []byte(`
{ {
@ -201,7 +193,6 @@ func TestUpdateConfiguration_FromMigration(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true
} }
} }
`) `)
@ -271,7 +262,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true
} }
} }
`, hostname, httpOrigin.URL, expectedHost)) `, hostname, httpOrigin.URL, expectedHost))
@ -283,7 +273,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": false
} }
} }
`) `)
@ -296,7 +285,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true
} }
} }
`) `)
@ -516,7 +504,6 @@ func TestClosePreviousProxies(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true
} }
} }
`, hostname)) `, hostname))
@ -529,7 +516,6 @@ func TestClosePreviousProxies(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true
} }
} }
`) `)
@ -612,7 +598,6 @@ func TestPersistentConnection(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true
} }
} }
`, wsOrigin.URL)) `, wsOrigin.URL))
@ -679,7 +664,6 @@ func TestPersistentConnection(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": false
} }
} }
`) `)

View File

@ -57,10 +57,8 @@ func NewOriginProxy(
tags: tags, tags: tags,
log: log, log: log,
} }
if warpRouting.Enabled {
proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting) proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting)
log.Info().Msgf("Warp-routing is enabled")
}
return proxy return proxy
} }

View File

@ -37,7 +37,6 @@ var (
testTags = []tunnelpogs.Tag{{Name: "Name", Value: "value"}} testTags = []tunnelpogs.Tag{{Name: "Name", Value: "value"}}
noWarpRouting = ingress.WarpRoutingConfig{} noWarpRouting = ingress.WarpRoutingConfig{}
testWarpRouting = ingress.WarpRoutingConfig{ testWarpRouting = ingress.WarpRoutingConfig{
Enabled: true,
ConnectTimeout: config.CustomDuration{Duration: time.Second}, ConnectTimeout: config.CustomDuration{Duration: time.Second},
} }
) )