diff --git a/config/configuration.go b/config/configuration.go index 05122d76..a3b65ad3 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -155,7 +155,7 @@ func FindOrCreateConfigPath() string { // i.e. it fails if a user specifies both --url and --unix-socket func ValidateUnixSocket(c *cli.Context) (string, error) { if c.IsSet("unix-socket") && (c.IsSet("url") || c.NArg() > 0) { - return "", errors.New("--unix-socket must be used exclusivly.") + return "", errors.New("--unix-socket must be used exclusively.") } return c.String("unix-socket"), nil } @@ -260,6 +260,7 @@ type Configuration struct { type WarpRoutingConfig struct { ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"` + MaxActiveFlows *uint64 `yaml:"maxActiveFlows" json:"maxActiveFlows,omitempty"` TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"` } diff --git a/ingress/config.go b/ingress/config.go index 670a9db1..83f893fe 100644 --- a/ingress/config.go +++ b/ingress/config.go @@ -22,6 +22,7 @@ var ( const ( defaultProxyAddress = "127.0.0.1" defaultKeepAliveConnections = 100 + defaultMaxActiveFlows = 0 // unlimited SSHServerFlag = "ssh-server" Socks5Flag = "socks5" ProxyConnectTimeoutFlag = "proxy-connect-timeout" @@ -46,17 +47,22 @@ const ( type WarpRoutingConfig struct { ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"` + MaxActiveFlows uint64 `yaml:"maxActiveFlows" json:"MaxActiveFlows,omitempty"` TCPKeepAlive config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"` } func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig { cfg := WarpRoutingConfig{ ConnectTimeout: defaultWarpRoutingConnectTimeout, + MaxActiveFlows: defaultMaxActiveFlows, TCPKeepAlive: defaultTCPKeepAlive, } if raw.ConnectTimeout != nil { cfg.ConnectTimeout = *raw.ConnectTimeout } + if raw.MaxActiveFlows != nil { + cfg.MaxActiveFlows = *raw.MaxActiveFlows + } if raw.TCPKeepAlive != nil { cfg.TCPKeepAlive = *raw.TCPKeepAlive } @@ -68,6 +74,9 @@ func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig { if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration { raw.ConnectTimeout = &c.ConnectTimeout } + if c.MaxActiveFlows != defaultMaxActiveFlows { + raw.MaxActiveFlows = &c.MaxActiveFlows + } if c.TCPKeepAlive.Duration != defaultTCPKeepAlive.Duration { raw.TCPKeepAlive = &c.TCPKeepAlive } @@ -172,6 +181,7 @@ func originRequestFromSingleRule(c *cli.Context) OriginRequestConfig { } if flag := ProxyPortFlag; c.IsSet(flag) { // Note TUN-3758 , we use Int because UInt is not supported with altsrc + // nolint: gosec proxyPort = uint(c.Int(flag)) } if flag := Http2OriginFlag; c.IsSet(flag) { @@ -551,7 +561,7 @@ func convertToRawIPRules(ipRules []ipaccess.Rule) []config.IngressIPRule { } func defaultBoolToNil(b bool) *bool { - if b == false { + if !b { return nil } diff --git a/orchestration/orchestrator.go b/orchestration/orchestrator.go index 1dd25f77..1e69e66f 100644 --- a/orchestration/orchestrator.go +++ b/orchestration/orchestrator.go @@ -58,7 +58,7 @@ func NewOrchestrator(ctx context.Context, internalRules: internalRules, config: config, tags: tags, - sessionLimiter: cfdsession.NewLimiter(0), + sessionLimiter: cfdsession.NewLimiter(config.WarpRouting.MaxActiveFlows), log: log, shutdownC: ctx.Done(), } @@ -141,6 +141,10 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i if err := ingressRules.StartOrigins(o.log, proxyShutdownC); err != nil { return errors.Wrap(err, "failed to start origin") } + + // Update the sessions limit since the configuration might have changed + o.sessionLimiter.SetLimit(warpRouting.MaxActiveFlows) + proxy := proxy.NewOriginProxy(ingressRules, warpRouting, o.tags, o.sessionLimiter, o.config.WriteTimeout, o.log) o.proxy.Store(proxy) o.config.Ingress = &ingressRules