TUN-6380: Enforce connect and keep-alive timeouts for TCP connections in both WARP routing and websocket based TCP proxy.

For WARP routing the defaults for these new settings are 5 seconds for connect timeout and 30 seconds for keep-alive timeout. These values can be configured either remotely or locally. Local config lives under "warp-routing" section in config.yaml.

For websocket-based proxy, the defaults come from originConfig settings (either global or per-service) and use the same defaults as HTTP proxying.
This commit is contained in:
Igor Postelnik 2022-06-13 11:44:27 -05:00
parent 978e01f77e
commit f2339a7244
15 changed files with 144 additions and 88 deletions

View File

@ -358,7 +358,7 @@ func prepareTunnelConfig(
} }
orchestratorConfig := &orchestration.Config{ orchestratorConfig := &orchestration.Config{
Ingress: &ingressRules, Ingress: &ingressRules,
WarpRoutingEnabled: warpRoutingEnabled, WarpRouting: ingress.NewWarpRoutingConfig(&cfg.WarpRouting),
ConfigurationFlags: parseConfigFlags(c), ConfigurationFlags: parseConfigFlags(c),
} }
return tunnelConfig, orchestratorConfig, nil return tunnelConfig, orchestratorConfig, nil

View File

@ -245,6 +245,8 @@ type Configuration struct {
type WarpRoutingConfig struct { type WarpRoutingConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"` Enabled bool `yaml:"enabled" json:"enabled"`
ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
} }
type configFileSettings struct { type configFileSettings struct {

View File

@ -24,6 +24,8 @@ func TestConfigFileSettings(t *testing.T) {
} }
warpRouting = WarpRoutingConfig{ warpRouting = WarpRoutingConfig{
Enabled: true, Enabled: true,
ConnectTimeout: &CustomDuration{Duration: 2 * time.Second},
TCPKeepAlive: &CustomDuration{Duration: 10 * time.Second},
} }
) )
rawYAML := ` rawYAML := `
@ -48,6 +50,9 @@ ingress:
service: https://localhost:8001 service: https://localhost:8001
warp-routing: warp-routing:
enabled: true enabled: true
connectTimeout: 2s
tcpKeepAlive: 10s
retries: 5 retries: 5
grace-period: 30s grace-period: 30s
percentage: 3.14 percentage: 3.14

View File

@ -12,7 +12,8 @@ import (
) )
var ( var (
defaultConnectTimeout = config.CustomDuration{Duration: 30 * time.Second} defaultHTTPConnectTimeout = config.CustomDuration{Duration: 30 * time.Second}
defaultWarpRoutingConnectTimeout = config.CustomDuration{Duration: 5 * time.Second}
defaultTLSTimeout = config.CustomDuration{Duration: 10 * time.Second} defaultTLSTimeout = config.CustomDuration{Duration: 10 * time.Second}
defaultTCPKeepAlive = config.CustomDuration{Duration: 30 * time.Second} defaultTCPKeepAlive = config.CustomDuration{Duration: 30 * time.Second}
defaultKeepAliveTimeout = config.CustomDuration{Duration: 90 * time.Second} defaultKeepAliveTimeout = config.CustomDuration{Duration: 90 * time.Second}
@ -41,10 +42,44 @@ const (
socksProxy = "socks" socksProxy = "socks"
) )
type WarpRoutingConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
}
func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
cfg := WarpRoutingConfig{
Enabled: raw.Enabled,
ConnectTimeout: defaultWarpRoutingConnectTimeout,
TCPKeepAlive: defaultTCPKeepAlive,
}
if raw.ConnectTimeout != nil {
cfg.ConnectTimeout = *raw.ConnectTimeout
}
if raw.TCPKeepAlive != nil {
cfg.TCPKeepAlive = *raw.TCPKeepAlive
}
return cfg
}
func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig {
raw := config.WarpRoutingConfig{
Enabled: c.Enabled,
}
if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration {
raw.ConnectTimeout = &c.ConnectTimeout
}
if c.TCPKeepAlive.Duration != defaultTCPKeepAlive.Duration {
raw.TCPKeepAlive = &c.TCPKeepAlive
}
return raw
}
// RemoteConfig models ingress settings that can be managed remotely, for example through the dashboard. // RemoteConfig models ingress settings that can be managed remotely, for example through the dashboard.
type RemoteConfig struct { type RemoteConfig struct {
Ingress Ingress Ingress Ingress
WarpRouting config.WarpRoutingConfig WarpRouting WarpRoutingConfig
} }
type RemoteConfigJSON struct { type RemoteConfigJSON struct {
@ -72,18 +107,18 @@ func (rc *RemoteConfig) UnmarshalJSON(b []byte) error {
} }
rc.Ingress = ingress rc.Ingress = ingress
rc.WarpRouting = rawConfig.WarpRouting rc.WarpRouting = NewWarpRoutingConfig(&rawConfig.WarpRouting)
return nil return nil
} }
func originRequestFromSingeRule(c *cli.Context) OriginRequestConfig { func originRequestFromSingeRule(c *cli.Context) OriginRequestConfig {
var connectTimeout config.CustomDuration = defaultConnectTimeout var connectTimeout = defaultHTTPConnectTimeout
var tlsTimeout config.CustomDuration = defaultTLSTimeout var tlsTimeout = defaultTLSTimeout
var tcpKeepAlive config.CustomDuration = defaultTCPKeepAlive var tcpKeepAlive = defaultTCPKeepAlive
var noHappyEyeballs bool var noHappyEyeballs bool
var keepAliveConnections int = defaultKeepAliveConnections var keepAliveConnections = defaultKeepAliveConnections
var keepAliveTimeout config.CustomDuration = defaultKeepAliveTimeout var keepAliveTimeout = defaultKeepAliveTimeout
var httpHostHeader string var httpHostHeader string
var originServerName string var originServerName string
var caPool string var caPool string
@ -160,7 +195,7 @@ func originRequestFromSingeRule(c *cli.Context) OriginRequestConfig {
func originRequestFromConfig(c config.OriginRequestConfig) OriginRequestConfig { func originRequestFromConfig(c config.OriginRequestConfig) OriginRequestConfig {
out := OriginRequestConfig{ out := OriginRequestConfig{
ConnectTimeout: defaultConnectTimeout, ConnectTimeout: defaultHTTPConnectTimeout,
TLSTimeout: defaultTLSTimeout, TLSTimeout: defaultTLSTimeout,
TCPKeepAlive: defaultTCPKeepAlive, TCPKeepAlive: defaultTCPKeepAlive,
KeepAliveConnections: defaultKeepAliveConnections, KeepAliveConnections: defaultKeepAliveConnections,
@ -404,7 +439,7 @@ func ConvertToRawOriginConfig(c OriginRequestConfig) config.OriginRequestConfig
var keepAliveTimeout *config.CustomDuration var keepAliveTimeout *config.CustomDuration
var proxyAddress *string var proxyAddress *string
if c.ConnectTimeout != defaultConnectTimeout { if c.ConnectTimeout != defaultHTTPConnectTimeout {
connectTimeout = &c.ConnectTimeout connectTimeout = &c.ConnectTimeout
} }
if c.TLSTimeout != defaultTLSTimeout { if c.TLSTimeout != defaultTLSTimeout {

View File

@ -274,7 +274,7 @@ func TestOriginRequestConfigDefaults(t *testing.T) {
// Rule 0 didn't override anything, so it inherits the cloudflared defaults // Rule 0 didn't override anything, so it inherits the cloudflared defaults
actual0 := ing.Rules[0].Config actual0 := ing.Rules[0].Config
expected0 := OriginRequestConfig{ expected0 := OriginRequestConfig{
ConnectTimeout: defaultConnectTimeout, ConnectTimeout: defaultHTTPConnectTimeout,
TLSTimeout: defaultTLSTimeout, TLSTimeout: defaultTLSTimeout,
TCPKeepAlive: defaultTCPKeepAlive, TCPKeepAlive: defaultTCPKeepAlive,
KeepAliveConnections: defaultKeepAliveConnections, KeepAliveConnections: defaultKeepAliveConnections,
@ -404,7 +404,7 @@ func TestDefaultConfigFromCLI(t *testing.T) {
c := cli.NewContext(nil, set, nil) c := cli.NewContext(nil, set, nil)
expected := OriginRequestConfig{ expected := OriginRequestConfig{
ConnectTimeout: defaultConnectTimeout, ConnectTimeout: defaultHTTPConnectTimeout,
TLSTimeout: defaultTLSTimeout, TLSTimeout: defaultTLSTimeout,
TCPKeepAlive: defaultTCPKeepAlive, TCPKeepAlive: defaultTCPKeepAlive,
KeepAliveConnections: defaultKeepAliveConnections, KeepAliveConnections: defaultKeepAliveConnections,

View File

@ -97,8 +97,16 @@ type WarpRoutingService struct {
Proxy StreamBasedOriginProxy Proxy StreamBasedOriginProxy
} }
func NewWarpRoutingService() *WarpRoutingService { func NewWarpRoutingService(config WarpRoutingConfig) *WarpRoutingService {
return &WarpRoutingService{Proxy: &rawTCPService{name: ServiceWarpRouting}} svc := &rawTCPService{
name: ServiceWarpRouting,
dialer: net.Dialer{
Timeout: config.ConnectTimeout.Duration,
KeepAlive: config.TCPKeepAlive.Duration,
},
}
return &WarpRoutingService{Proxy: svc}
} }
// Get a single origin service from the CLI/config. // Get a single origin service from the CLI/config.

View File

@ -1,26 +1,20 @@
package ingress package ingress
import ( import (
"context"
"fmt" "fmt"
"net"
"net/http" "net/http"
"github.com/pkg/errors"
)
var (
errUnsupportedConnectionType = errors.New("internal error: unsupported connection type")
) )
// HTTPOriginProxy can be implemented by origin services that want to proxy http requests. // HTTPOriginProxy can be implemented by origin services that want to proxy http requests.
type HTTPOriginProxy interface { type HTTPOriginProxy interface {
// RoundTrip is how cloudflared proxies eyeball requests to the actual origin services // RoundTripper is how cloudflared proxies eyeball requests to the actual origin services
http.RoundTripper http.RoundTripper
} }
// StreamBasedOriginProxy can be implemented by origin services that want to proxy ws/TCP. // StreamBasedOriginProxy can be implemented by origin services that want to proxy ws/TCP.
type StreamBasedOriginProxy interface { type StreamBasedOriginProxy interface {
EstablishConnection(dest string) (OriginConnection, error) EstablishConnection(ctx context.Context, dest string) (OriginConnection, error)
} }
func (o *unixSocketPath) RoundTrip(req *http.Request) (*http.Response, error) { func (o *unixSocketPath) RoundTrip(req *http.Request) (*http.Response, error) {
@ -59,8 +53,8 @@ func (o *statusCode) RoundTrip(_ *http.Request) (*http.Response, error) {
return resp, nil return resp, nil
} }
func (o *rawTCPService) EstablishConnection(dest string) (OriginConnection, error) { func (o *rawTCPService) EstablishConnection(ctx context.Context, dest string) (OriginConnection, error) {
conn, err := net.Dial("tcp", dest) conn, err := o.dialer.DialContext(ctx, "tcp", dest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -71,13 +65,13 @@ func (o *rawTCPService) EstablishConnection(dest string) (OriginConnection, erro
return originConn, nil return originConn, nil
} }
func (o *tcpOverWSService) EstablishConnection(dest string) (OriginConnection, error) { func (o *tcpOverWSService) EstablishConnection(ctx context.Context, dest string) (OriginConnection, error) {
var err error var err error
if !o.isBastion { if !o.isBastion {
dest = o.dest dest = o.dest
} }
conn, err := net.Dial("tcp", dest) conn, err := o.dialer.DialContext(ctx, "tcp", dest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -89,6 +83,6 @@ func (o *tcpOverWSService) EstablishConnection(dest string) (OriginConnection, e
} }
func (o *socksProxyOverWSService) EstablishConnection(dest string) (OriginConnection, error) { func (o *socksProxyOverWSService) EstablishConnection(_ctx context.Context, _dest string) (OriginConnection, error) {
return o.conn, nil return o.conn, nil
} }

View File

@ -36,7 +36,7 @@ func TestRawTCPServiceEstablishConnection(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Origin not listening for new connection, should return an error // Origin not listening for new connection, should return an error
_, err = rawTCPService.EstablishConnection(req.URL.String()) _, err = rawTCPService.EstablishConnection(context.Background(), req.URL.String())
require.Error(t, err) require.Error(t, err)
} }
@ -87,7 +87,7 @@ func TestTCPOverWSServiceEstablishConnection(t *testing.T) {
t.Run(test.testCase, func(t *testing.T) { t.Run(test.testCase, func(t *testing.T) {
if test.expectErr { if test.expectErr {
bastionHost, _ := carrier.ResolveBastionDest(test.req) bastionHost, _ := carrier.ResolveBastionDest(test.req)
_, err := test.service.EstablishConnection(bastionHost) _, err := test.service.EstablishConnection(context.Background(), bastionHost)
assert.Error(t, err) assert.Error(t, err)
} }
}) })
@ -99,7 +99,7 @@ func TestTCPOverWSServiceEstablishConnection(t *testing.T) {
for _, service := range []*tcpOverWSService{newTCPOverWSService(originURL), newBastionService()} { for _, service := range []*tcpOverWSService{newTCPOverWSService(originURL), newBastionService()} {
// Origin not listening for new connection, should return an error // Origin not listening for new connection, should return an error
bastionHost, _ := carrier.ResolveBastionDest(bastionReq) bastionHost, _ := carrier.ResolveBastionDest(bastionReq)
_, err := service.EstablishConnection(bastionHost) _, err := service.EstablishConnection(context.Background(), bastionHost)
assert.Error(t, err) assert.Error(t, err)
} }
} }

View File

@ -92,6 +92,7 @@ func (o httpService) MarshalJSON() ([]byte, error) {
// It's used by warp routing // It's used by warp routing
type rawTCPService struct { type rawTCPService struct {
name string name string
dialer net.Dialer
} }
func (o *rawTCPService) String() string { func (o *rawTCPService) String() string {
@ -113,6 +114,7 @@ type tcpOverWSService struct {
dest string dest string
isBastion bool isBastion bool
streamHandler streamHandlerFunc streamHandler streamHandlerFunc
dialer net.Dialer
} }
type socksProxyOverWSService struct { type socksProxyOverWSService struct {
@ -176,6 +178,8 @@ func (o *tcpOverWSService) start(log *zerolog.Logger, _ <-chan struct{}, cfg Ori
} else { } else {
o.streamHandler = DefaultStreamHandler o.streamHandler = DefaultStreamHandler
} }
o.dialer.Timeout = cfg.ConnectTimeout.Duration
o.dialer.KeepAlive = cfg.TCPKeepAlive.Duration
return nil return nil
} }

View File

@ -20,7 +20,7 @@ type newLocalConfig struct {
// Config is the original config as read and parsed by cloudflared. // Config is the original config as read and parsed by cloudflared.
type Config struct { type Config struct {
Ingress *ingress.Ingress Ingress *ingress.Ingress
WarpRoutingEnabled bool WarpRouting ingress.WarpRoutingConfig
// Extra settings used to configure this instance but that are not eligible for remotely management // Extra settings used to configure this instance but that are not eligible for remotely management
// ie. (--protocol, --loglevel, ...) // ie. (--protocol, --loglevel, ...)
@ -37,7 +37,7 @@ func (rc *newLocalConfig) MarshalJSON() ([]byte, error) {
// UI doesn't support top level configs, so we reconcile to individual ingress configs. // UI doesn't support top level configs, so we reconcile to individual ingress configs.
GlobalOriginRequest: nil, GlobalOriginRequest: nil,
IngressRules: convertToUnvalidatedIngressRules(rc.RemoteConfig.Ingress), IngressRules: convertToUnvalidatedIngressRules(rc.RemoteConfig.Ingress),
WarpRouting: rc.RemoteConfig.WarpRouting, WarpRouting: rc.RemoteConfig.WarpRouting.RawConfig(),
}, },
} }

View File

@ -3,16 +3,17 @@ package orchestration
import ( import (
"encoding/json" "encoding/json"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/cloudflare/cloudflared/config"
"github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/ingress"
) )
// TestNewLocalConfig_MarshalJSON tests that we are able to converte a compiled and validated config back // TestNewLocalConfig_MarshalJSON tests that we are able to converte a compiled and validated config back
// into an "unvalidated" format which is compatible with Remote Managed configurations. // into an "unvalidated" format which is compatible with Remote Managed configurations.
func TestNewLocalConfig_MarshalJSON(t *testing.T) { func TestNewLocalConfig_MarshalJSON(t *testing.T) {
rawConfig := []byte(` rawConfig := []byte(`
{ {
"originRequest": { "originRequest": {
@ -57,7 +58,11 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) {
] ]
} }
} }
] ],
"warp-routing": {
"enabled": true,
"connectTimeout": 1
}
} }
`) `)
@ -73,10 +78,18 @@ func TestNewLocalConfig_MarshalJSON(t *testing.T) {
jsonSerde, err := json.Marshal(c) jsonSerde, err := json.Marshal(c)
require.NoError(t, err) require.NoError(t, err)
var config ingress.RemoteConfig var remoteConfig ingress.RemoteConfig
err = json.Unmarshal(jsonSerde, &config) err = json.Unmarshal(jsonSerde, &remoteConfig)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, config.WarpRouting.Enabled, false) require.Equal(t, remoteConfig.WarpRouting, ingress.WarpRoutingConfig{
require.Equal(t, config.Ingress.Rules, expectedConfig.Ingress.Rules) Enabled: true,
ConnectTimeout: config.CustomDuration{
Duration: time.Second,
},
TCPKeepAlive: config.CustomDuration{
Duration: 30 * time.Second, // default value is 30 seconds
},
})
require.Equal(t, remoteConfig.Ingress.Rules, expectedConfig.Ingress.Rules)
} }

View File

@ -47,7 +47,7 @@ func NewOrchestrator(ctx context.Context, config *Config, tags []tunnelpogs.Tag,
log: log, log: log,
shutdownC: ctx.Done(), shutdownC: ctx.Done(),
} }
if err := o.updateIngress(*config.Ingress, config.WarpRoutingEnabled); err != nil { if err := o.updateIngress(*config.Ingress, config.WarpRouting); err != nil {
return nil, err return nil, err
} }
go o.waitToCloseLastProxy() go o.waitToCloseLastProxy()
@ -80,7 +80,7 @@ func (o *Orchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.Up
} }
} }
if err := o.updateIngress(newConf.Ingress, newConf.WarpRouting.Enabled); err != nil { if err := o.updateIngress(newConf.Ingress, newConf.WarpRouting); err != nil {
o.log.Err(err). o.log.Err(err).
Int32("version", version). Int32("version", version).
Str("config", string(config)). Str("config", string(config)).
@ -103,7 +103,7 @@ func (o *Orchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.Up
} }
// The caller is responsible to make sure there is no concurrent access // The caller is responsible to make sure there is no concurrent access
func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRoutingEnabled bool) error { func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting ingress.WarpRoutingConfig) error {
select { select {
case <-o.shutdownC: case <-o.shutdownC:
return fmt.Errorf("cloudflared already shutdown") return fmt.Errorf("cloudflared already shutdown")
@ -118,10 +118,10 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRoutingEn
if err := ingressRules.StartOrigins(o.log, proxyShutdownC); err != nil { if err := ingressRules.StartOrigins(o.log, proxyShutdownC); err != nil {
return errors.Wrap(err, "failed to start origin") return errors.Wrap(err, "failed to start origin")
} }
newProxy := proxy.NewOriginProxy(ingressRules, warpRoutingEnabled, o.tags, o.log) newProxy := proxy.NewOriginProxy(ingressRules, warpRouting, o.tags, o.log)
o.proxy.Store(newProxy) o.proxy.Store(newProxy)
o.config.Ingress = &ingressRules o.config.Ingress = &ingressRules
o.config.WarpRoutingEnabled = warpRoutingEnabled o.config.WarpRouting = warpRouting
// If proxyShutdownC is nil, there is no previous running proxy // If proxyShutdownC is nil, there is no previous running proxy
if o.proxyShutdownC != nil { if o.proxyShutdownC != nil {
@ -139,7 +139,7 @@ func (o *Orchestrator) GetConfigJSON() ([]byte, error) {
c := &newLocalConfig{ c := &newLocalConfig{
RemoteConfig: ingress.RemoteConfig{ RemoteConfig: ingress.RemoteConfig{
Ingress: *o.config.Ingress, Ingress: *o.config.Ingress,
WarpRouting: config.WarpRoutingConfig{Enabled: o.config.WarpRoutingEnabled}, WarpRouting: o.config.WarpRouting,
}, },
ConfigurationFlags: o.config.ConfigurationFlags, ConfigurationFlags: o.config.ConfigurationFlags,
} }
@ -166,7 +166,7 @@ func (o *Orchestrator) GetVersionedConfigJSON() ([]byte, error) {
OriginRequest ingress.OriginRequestConfig `json:"originRequest"` OriginRequest ingress.OriginRequestConfig `json:"originRequest"`
}{ }{
Ingress: o.config.Ingress.Rules, Ingress: o.config.Ingress.Rules,
WarpRouting: config.WarpRoutingConfig{Enabled: o.config.WarpRoutingEnabled}, WarpRouting: o.config.WarpRouting.RawConfig(),
OriginRequest: o.config.Ingress.Defaults, OriginRequest: o.config.Ingress.Defaults,
}, },
} }

View File

@ -49,7 +49,6 @@ var (
func TestUpdateConfiguration(t *testing.T) { func TestUpdateConfiguration(t *testing.T) {
initConfig := &Config{ initConfig := &Config{
Ingress: &ingress.Ingress{}, Ingress: &ingress.Ingress{},
WarpRoutingEnabled: false,
} }
orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, &testLogger) orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, &testLogger)
require.NoError(t, err) require.NoError(t, err)
@ -87,7 +86,8 @@ func TestUpdateConfiguration(t *testing.T) {
} }
], ],
"warp-routing": { "warp-routing": {
"enabled": true "enabled": true,
"connectTimeout": 10
} }
} }
`) `)
@ -121,7 +121,8 @@ func TestUpdateConfiguration(t *testing.T) {
require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout) require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout)
require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify) require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify)
require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs) require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs)
require.True(t, configV2.WarpRoutingEnabled) require.True(t, configV2.WarpRouting.Enabled)
require.Equal(t, configV2.WarpRouting.ConnectTimeout.Duration, 10*time.Second)
originProxyV2, err := orchestrator.GetOriginProxy() originProxyV2, err := orchestrator.GetOriginProxy()
require.NoError(t, err) require.NoError(t, err)
@ -164,7 +165,7 @@ func TestUpdateConfiguration(t *testing.T) {
require.Len(t, configV10.Ingress.Rules, 1) require.Len(t, configV10.Ingress.Rules, 1)
require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10")) require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10"))
require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String()) require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String())
require.False(t, configV10.WarpRoutingEnabled) require.False(t, configV10.WarpRouting.Enabled)
originProxyV10, err := orchestrator.GetOriginProxy() originProxyV10, err := orchestrator.GetOriginProxy()
require.NoError(t, err) require.NoError(t, err)
@ -247,7 +248,6 @@ func TestConcurrentUpdateAndRead(t *testing.T) {
initConfig = &Config{ initConfig = &Config{
Ingress: &ingress.Ingress{}, Ingress: &ingress.Ingress{},
WarpRoutingEnabled: false,
} }
) )
@ -477,7 +477,6 @@ func TestClosePreviousProxies(t *testing.T) {
`) `)
initConfig = &Config{ initConfig = &Config{
Ingress: &ingress.Ingress{}, Ingress: &ingress.Ingress{},
WarpRoutingEnabled: false,
} }
) )
@ -535,7 +534,6 @@ func TestPersistentConnection(t *testing.T) {
msg := t.Name() msg := t.Name()
initConfig := &Config{ initConfig := &Config{
Ingress: &ingress.Ingress{}, Ingress: &ingress.Ingress{},
WarpRoutingEnabled: false,
} }
orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, &testLogger) orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, &testLogger)
require.NoError(t, err) require.NoError(t, err)
@ -646,7 +644,6 @@ func TestSerializeLocalConfig(t *testing.T) {
c := &newLocalConfig{ c := &newLocalConfig{
RemoteConfig: ingress.RemoteConfig{ RemoteConfig: ingress.RemoteConfig{
Ingress: ingress.Ingress{}, Ingress: ingress.Ingress{},
WarpRouting: config.WarpRoutingConfig{},
}, },
ConfigurationFlags: map[string]string{"a": "b"}, ConfigurationFlags: map[string]string{"a": "b"},
} }

View File

@ -42,7 +42,7 @@ type Proxy struct {
// NewOriginProxy returns a new instance of the Proxy struct. // NewOriginProxy returns a new instance of the Proxy struct.
func NewOriginProxy( func NewOriginProxy(
ingressRules ingress.Ingress, ingressRules ingress.Ingress,
warpRoutingEnabled bool, warpRouting ingress.WarpRoutingConfig,
tags []tunnelpogs.Tag, tags []tunnelpogs.Tag,
log *zerolog.Logger, log *zerolog.Logger,
) *Proxy { ) *Proxy {
@ -51,8 +51,8 @@ func NewOriginProxy(
tags: tags, tags: tags,
log: log, log: log,
} }
if warpRoutingEnabled { if warpRouting.Enabled {
proxy.warpRouting = ingress.NewWarpRoutingService() proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting)
log.Info().Msgf("Warp-routing is enabled") log.Info().Msgf("Warp-routing is enabled")
} }
@ -108,7 +108,7 @@ func (p *Proxy) ProxyHTTP(
} }
rws := connection.NewHTTPResponseReadWriterAcker(w, req) rws := connection.NewHTTPResponseReadWriterAcker(w, req)
if err := p.proxyStream(req.Context(), rws, dest, originProxy, logFields); err != nil { if err := p.proxyStream(req.Context(), rws, dest, originProxy); err != nil {
rule, srv := ruleField(p.ingressRules, ruleNum) rule, srv := ruleField(p.ingressRules, ruleNum)
p.logRequestError(err, cfRay, "", rule, srv) p.logRequestError(err, cfRay, "", rule, srv)
return err return err
@ -137,15 +137,9 @@ func (p *Proxy) ProxyTCP(
serveCtx, cancel := context.WithCancel(ctx) serveCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
logFields := logFields{
cfRay: req.CFRay,
lbProbe: req.LBProbe,
rule: ingress.ServiceWarpRouting,
flowID: req.FlowID,
}
p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream started") p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream started")
if err := p.proxyStream(serveCtx, rwa, req.Dest, p.warpRouting.Proxy, logFields); err != nil {
if err := p.proxyStream(serveCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil {
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting) p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
return err return err
} }
@ -255,9 +249,8 @@ func (p *Proxy) proxyStream(
rwa connection.ReadWriteAcker, rwa connection.ReadWriteAcker,
dest string, dest string,
connectionProxy ingress.StreamBasedOriginProxy, connectionProxy ingress.StreamBasedOriginProxy,
fields logFields,
) error { ) error {
originConn, err := connectionProxy.EstablishConnection(dest) originConn, err := connectionProxy.EstablishConnection(ctx, dest)
if err != nil { if err != nil {
return err return err
} }

View File

@ -33,6 +33,11 @@ import (
var ( var (
testTags = []tunnelpogs.Tag{tunnelpogs.Tag{Name: "Name", Value: "value"}} testTags = []tunnelpogs.Tag{tunnelpogs.Tag{Name: "Name", Value: "value"}}
noWarpRouting = ingress.WarpRoutingConfig{}
testWarpRouting = ingress.WarpRoutingConfig{
Enabled: true,
ConnectTimeout: config.CustomDuration{Duration: time.Second},
}
) )
type mockHTTPRespWriter struct { type mockHTTPRespWriter struct {
@ -138,7 +143,7 @@ func TestProxySingleOrigin(t *testing.T) {
require.NoError(t, ingressRule.StartOrigins(&log, ctx.Done())) require.NoError(t, ingressRule.StartOrigins(&log, ctx.Done()))
proxy := NewOriginProxy(ingressRule, false, testTags, &log) proxy := NewOriginProxy(ingressRule, noWarpRouting, testTags, &log)
t.Run("testProxyHTTP", testProxyHTTP(proxy)) t.Run("testProxyHTTP", testProxyHTTP(proxy))
t.Run("testProxyWebsocket", testProxyWebsocket(proxy)) t.Run("testProxyWebsocket", testProxyWebsocket(proxy))
t.Run("testProxySSE", testProxySSE(proxy)) t.Run("testProxySSE", testProxySSE(proxy))
@ -345,7 +350,7 @@ func runIngressTestScenarios(t *testing.T, unvalidatedIngress []config.Unvalidat
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, ingress.StartOrigins(&log, ctx.Done())) require.NoError(t, ingress.StartOrigins(&log, ctx.Done()))
proxy := NewOriginProxy(ingress, false, testTags, &log) proxy := NewOriginProxy(ingress, noWarpRouting, testTags, &log)
for _, test := range tests { for _, test := range tests {
responseWriter := newMockHTTPRespWriter() responseWriter := newMockHTTPRespWriter()
@ -393,7 +398,7 @@ func TestProxyError(t *testing.T) {
log := zerolog.Nop() log := zerolog.Nop()
proxy := NewOriginProxy(ing, false, testTags, &log) proxy := NewOriginProxy(ing, noWarpRouting, testTags, &log)
responseWriter := newMockHTTPRespWriter() responseWriter := newMockHTTPRespWriter()
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1", nil) req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1", nil)
@ -509,7 +514,7 @@ func TestConnections(t *testing.T) {
originService: runEchoTCPService, originService: runEchoTCPService,
eyeballResponseWriter: newTCPRespWriter(replayer), eyeballResponseWriter: newTCPRespWriter(replayer),
eyeballRequestBody: newTCPRequestBody([]byte("test2")), eyeballRequestBody: newTCPRequestBody([]byte("test2")),
warpRoutingService: ingress.NewWarpRoutingService(), warpRoutingService: ingress.NewWarpRoutingService(testWarpRouting),
connectionType: connection.TypeTCP, connectionType: connection.TypeTCP,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
"Cf-Cloudflared-Proxy-Src": {"non-blank-value"}, "Cf-Cloudflared-Proxy-Src": {"non-blank-value"},
@ -526,7 +531,7 @@ func TestConnections(t *testing.T) {
originService: runEchoWSService, originService: runEchoWSService,
// eyeballResponseWriter gets set after roundtrip dial. // eyeballResponseWriter gets set after roundtrip dial.
eyeballRequestBody: newPipedWSRequestBody([]byte("test3")), eyeballRequestBody: newPipedWSRequestBody([]byte("test3")),
warpRoutingService: ingress.NewWarpRoutingService(), warpRoutingService: ingress.NewWarpRoutingService(testWarpRouting),
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
"Cf-Cloudflared-Proxy-Src": {"non-blank-value"}, "Cf-Cloudflared-Proxy-Src": {"non-blank-value"},
}, },
@ -652,7 +657,7 @@ func TestConnections(t *testing.T) {
ingressRule := createSingleIngressConfig(t, test.args.ingressServiceScheme+ln.Addr().String()) ingressRule := createSingleIngressConfig(t, test.args.ingressServiceScheme+ln.Addr().String())
ingressRule.StartOrigins(logger, ctx.Done()) ingressRule.StartOrigins(logger, ctx.Done())
proxy := NewOriginProxy(ingressRule, true, testTags, logger) proxy := NewOriginProxy(ingressRule, testWarpRouting, testTags, logger)
proxy.warpRouting = test.args.warpRoutingService proxy.warpRouting = test.args.warpRoutingService
dest := ln.Addr().String() dest := ln.Addr().String()