TUN-8861: Add configuration for active sessions limiter
## Summary This commit adds a new configuration in the warp routing config to allow users to define the active sessions limit value.
This commit is contained in:
		
							parent
							
								
									8bfe111cab
								
							
						
					
					
						commit
						8c2eda16c1
					
				| 
						 | 
				
			
			@ -155,7 +155,7 @@ func FindOrCreateConfigPath() string {
 | 
			
		|||
// i.e. it fails if a user specifies both --url and --unix-socket
 | 
			
		||||
func ValidateUnixSocket(c *cli.Context) (string, error) {
 | 
			
		||||
	if c.IsSet("unix-socket") && (c.IsSet("url") || c.NArg() > 0) {
 | 
			
		||||
		return "", errors.New("--unix-socket must be used exclusivly.")
 | 
			
		||||
		return "", errors.New("--unix-socket must be used exclusively.")
 | 
			
		||||
	}
 | 
			
		||||
	return c.String("unix-socket"), nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -260,6 +260,7 @@ type Configuration struct {
 | 
			
		|||
 | 
			
		||||
type WarpRoutingConfig struct {
 | 
			
		||||
	ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
 | 
			
		||||
	MaxActiveFlows *uint64         `yaml:"maxActiveFlows" json:"maxActiveFlows,omitempty"`
 | 
			
		||||
	TCPKeepAlive   *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,6 +22,7 @@ var (
 | 
			
		|||
const (
 | 
			
		||||
	defaultProxyAddress           = "127.0.0.1"
 | 
			
		||||
	defaultKeepAliveConnections   = 100
 | 
			
		||||
	defaultMaxActiveFlows         = 0 // unlimited
 | 
			
		||||
	SSHServerFlag                 = "ssh-server"
 | 
			
		||||
	Socks5Flag                    = "socks5"
 | 
			
		||||
	ProxyConnectTimeoutFlag       = "proxy-connect-timeout"
 | 
			
		||||
| 
						 | 
				
			
			@ -46,17 +47,22 @@ const (
 | 
			
		|||
 | 
			
		||||
type WarpRoutingConfig struct {
 | 
			
		||||
	ConnectTimeout config.CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"`
 | 
			
		||||
	MaxActiveFlows uint64                `yaml:"maxActiveFlows" json:"MaxActiveFlows,omitempty"`
 | 
			
		||||
	TCPKeepAlive   config.CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewWarpRoutingConfig(raw *config.WarpRoutingConfig) WarpRoutingConfig {
 | 
			
		||||
	cfg := WarpRoutingConfig{
 | 
			
		||||
		ConnectTimeout: defaultWarpRoutingConnectTimeout,
 | 
			
		||||
		MaxActiveFlows: defaultMaxActiveFlows,
 | 
			
		||||
		TCPKeepAlive:   defaultTCPKeepAlive,
 | 
			
		||||
	}
 | 
			
		||||
	if raw.ConnectTimeout != nil {
 | 
			
		||||
		cfg.ConnectTimeout = *raw.ConnectTimeout
 | 
			
		||||
	}
 | 
			
		||||
	if raw.MaxActiveFlows != nil {
 | 
			
		||||
		cfg.MaxActiveFlows = *raw.MaxActiveFlows
 | 
			
		||||
	}
 | 
			
		||||
	if raw.TCPKeepAlive != nil {
 | 
			
		||||
		cfg.TCPKeepAlive = *raw.TCPKeepAlive
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -68,6 +74,9 @@ func (c *WarpRoutingConfig) RawConfig() config.WarpRoutingConfig {
 | 
			
		|||
	if c.ConnectTimeout.Duration != defaultWarpRoutingConnectTimeout.Duration {
 | 
			
		||||
		raw.ConnectTimeout = &c.ConnectTimeout
 | 
			
		||||
	}
 | 
			
		||||
	if c.MaxActiveFlows != defaultMaxActiveFlows {
 | 
			
		||||
		raw.MaxActiveFlows = &c.MaxActiveFlows
 | 
			
		||||
	}
 | 
			
		||||
	if c.TCPKeepAlive.Duration != defaultTCPKeepAlive.Duration {
 | 
			
		||||
		raw.TCPKeepAlive = &c.TCPKeepAlive
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -172,6 +181,7 @@ func originRequestFromSingleRule(c *cli.Context) OriginRequestConfig {
 | 
			
		|||
	}
 | 
			
		||||
	if flag := ProxyPortFlag; c.IsSet(flag) {
 | 
			
		||||
		// Note TUN-3758 , we use Int because UInt is not supported with altsrc
 | 
			
		||||
		// nolint: gosec
 | 
			
		||||
		proxyPort = uint(c.Int(flag))
 | 
			
		||||
	}
 | 
			
		||||
	if flag := Http2OriginFlag; c.IsSet(flag) {
 | 
			
		||||
| 
						 | 
				
			
			@ -551,7 +561,7 @@ func convertToRawIPRules(ipRules []ipaccess.Rule) []config.IngressIPRule {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func defaultBoolToNil(b bool) *bool {
 | 
			
		||||
	if b == false {
 | 
			
		||||
	if !b {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -58,7 +58,7 @@ func NewOrchestrator(ctx context.Context,
 | 
			
		|||
		internalRules:  internalRules,
 | 
			
		||||
		config:         config,
 | 
			
		||||
		tags:           tags,
 | 
			
		||||
		sessionLimiter: cfdsession.NewLimiter(0),
 | 
			
		||||
		sessionLimiter: cfdsession.NewLimiter(config.WarpRouting.MaxActiveFlows),
 | 
			
		||||
		log:            log,
 | 
			
		||||
		shutdownC:      ctx.Done(),
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -141,6 +141,10 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting i
 | 
			
		|||
	if err := ingressRules.StartOrigins(o.log, proxyShutdownC); err != nil {
 | 
			
		||||
		return errors.Wrap(err, "failed to start origin")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Update the sessions limit since the configuration might have changed
 | 
			
		||||
	o.sessionLimiter.SetLimit(warpRouting.MaxActiveFlows)
 | 
			
		||||
 | 
			
		||||
	proxy := proxy.NewOriginProxy(ingressRules, warpRouting, o.tags, o.sessionLimiter, o.config.WriteTimeout, o.log)
 | 
			
		||||
	o.proxy.Store(proxy)
 | 
			
		||||
	o.config.Ingress = &ingressRules
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue