diff --git a/cmd/cloudflared/tunnel/cmd.go b/cmd/cloudflared/tunnel/cmd.go index 3eed3436..e9038403 100644 --- a/cmd/cloudflared/tunnel/cmd.go +++ b/cmd/cloudflared/tunnel/cmd.go @@ -343,13 +343,13 @@ func StartServer( observer.SendURL(quickTunnelURL) } - tunnelConfig, dynamicConfig, err := prepareTunnelConfig(c, info, log, logTransport, observer, namedTunnel) + tunnelConfig, orchestratorConfig, err := prepareTunnelConfig(c, info, log, logTransport, observer, namedTunnel) if err != nil { log.Err(err).Msg("Couldn't start tunnel") return err } - orchestrator, err := orchestration.NewOrchestrator(ctx, dynamicConfig, tunnelConfig.Tags, tunnelConfig.Log) + orchestrator, err := orchestration.NewOrchestrator(ctx, orchestratorConfig, tunnelConfig.Tags, tunnelConfig.Log) if err != nil { return err } @@ -388,7 +388,7 @@ func StartServer( info.Version(), hostname, metricsListener.Addr().String(), - dynamicConfig.Ingress, + orchestratorConfig.Ingress, tunnelConfig.HAConnections, ) app := tunnelUI.Launch(ctx, log, logTransport) diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index 778ec6b3..9a0728c5 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -43,6 +43,8 @@ var ( secretFlags = [2]*altsrc.StringFlag{credentialsContentsFlag, tunnelTokenFlag} defaultFeatures = []string{supervisor.FeatureAllowRemoteConfig, supervisor.FeatureSerializedHeaders} + + configFlags = []string{"autoupdate-freq", "no-autoupdate", "retries", "protocol", "loglevel", "transport-loglevel", "origincert", "metrics", "metrics-update-freq"} ) // returns the first path that contains a cert.pem file. If none of the DefaultConfigSearchDirectories @@ -348,11 +350,24 @@ func prepareTunnelConfig( ProtocolSelector: protocolSelector, EdgeTLSConfigs: edgeTLSConfigs, } - dynamicConfig := &orchestration.Config{ + orchestratorConfig := &orchestration.Config{ Ingress: &ingressRules, WarpRoutingEnabled: warpRoutingEnabled, + ConfigurationFlags: parseConfigFlags(c), } - return tunnelConfig, dynamicConfig, nil + return tunnelConfig, orchestratorConfig, nil +} + +func parseConfigFlags(c *cli.Context) map[string]string { + result := make(map[string]string) + + for _, flag := range configFlags { + if v := c.String(flag); c.IsSet(flag) && v != "" { + result[flag] = v + } + } + + return result } func gracePeriod(c *cli.Context) (time.Duration, error) { diff --git a/config/configuration.go b/config/configuration.go index bb7a10b8..49395404 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -177,9 +177,9 @@ func ValidateUrl(c *cli.Context, allowURLFromArgs bool) (*url.URL, error) { } type UnvalidatedIngressRule struct { - Hostname string `json:"hostname"` - Path string `json:"path"` - Service string `json:"service"` + Hostname string `json:"hostname,omitempty"` + Path string `json:"path,omitempty"` + Service string `json:"service,omitempty"` OriginRequest OriginRequestConfig `yaml:"originRequest" json:"originRequest"` } @@ -192,41 +192,41 @@ type UnvalidatedIngressRule struct { // - To specify a time.Duration in json, use int64 of the nanoseconds type OriginRequestConfig struct { // HTTP proxy timeout for establishing a new connection - ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout"` + ConnectTimeout *CustomDuration `yaml:"connectTimeout" json:"connectTimeout,omitempty"` // HTTP proxy timeout for completing a TLS handshake - TLSTimeout *CustomDuration `yaml:"tlsTimeout" json:"tlsTimeout"` + TLSTimeout *CustomDuration `yaml:"tlsTimeout" json:"tlsTimeout,omitempty"` // HTTP proxy TCP keepalive duration - TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive"` + TCPKeepAlive *CustomDuration `yaml:"tcpKeepAlive" json:"tcpKeepAlive,omitempty"` // HTTP proxy should disable "happy eyeballs" for IPv4/v6 fallback - NoHappyEyeballs *bool `yaml:"noHappyEyeballs" json:"noHappyEyeballs"` + NoHappyEyeballs *bool `yaml:"noHappyEyeballs" json:"noHappyEyeballs,omitempty"` // HTTP proxy maximum keepalive connection pool size - KeepAliveConnections *int `yaml:"keepAliveConnections" json:"keepAliveConnections"` + KeepAliveConnections *int `yaml:"keepAliveConnections" json:"keepAliveConnections,omitempty"` // HTTP proxy timeout for closing an idle connection - KeepAliveTimeout *CustomDuration `yaml:"keepAliveTimeout" json:"keepAliveTimeout"` + KeepAliveTimeout *CustomDuration `yaml:"keepAliveTimeout" json:"keepAliveTimeout,omitempty"` // Sets the HTTP Host header for the local webserver. - HTTPHostHeader *string `yaml:"httpHostHeader" json:"httpHostHeader"` + HTTPHostHeader *string `yaml:"httpHostHeader" json:"httpHostHeader,omitempty"` // Hostname on the origin server certificate. - OriginServerName *string `yaml:"originServerName" json:"originServerName"` + OriginServerName *string `yaml:"originServerName" json:"originServerName,omitempty"` // Path to the CA for the certificate of your origin. // This option should be used only if your certificate is not signed by Cloudflare. - CAPool *string `yaml:"caPool" json:"caPool"` + CAPool *string `yaml:"caPool" json:"caPool,omitempty"` // Disables TLS verification of the certificate presented by your origin. // Will allow any certificate from the origin to be accepted. // Note: The connection from your machine to Cloudflare's Edge is still encrypted. - NoTLSVerify *bool `yaml:"noTLSVerify" json:"noTLSVerify"` + NoTLSVerify *bool `yaml:"noTLSVerify" json:"noTLSVerify,omitempty"` // Disables chunked transfer encoding. // Useful if you are running a WSGI server. - DisableChunkedEncoding *bool `yaml:"disableChunkedEncoding" json:"disableChunkedEncoding"` + DisableChunkedEncoding *bool `yaml:"disableChunkedEncoding" json:"disableChunkedEncoding,omitempty"` // Runs as jump host - BastionMode *bool `yaml:"bastionMode" json:"bastionMode"` + BastionMode *bool `yaml:"bastionMode" json:"bastionMode,omitempty"` // Listen address for the proxy. - ProxyAddress *string `yaml:"proxyAddress" json:"proxyAddress"` + ProxyAddress *string `yaml:"proxyAddress" json:"proxyAddress,omitempty"` // Listen port for the proxy. - ProxyPort *uint `yaml:"proxyPort" json:"proxyPort"` + ProxyPort *uint `yaml:"proxyPort" json:"proxyPort,omitempty"` // Valid options are 'socks' or empty. - ProxyType *string `yaml:"proxyType" json:"proxyType"` + ProxyType *string `yaml:"proxyType" json:"proxyType,omitempty"` // IP rules for the proxy service - IPRules []IngressIPRule `yaml:"ipRules" json:"ipRules"` + IPRules []IngressIPRule `yaml:"ipRules" json:"ipRules,omitempty"` } type IngressIPRule struct { diff --git a/connection/connection.go b/connection/connection.go index ef60a2f9..aa66e494 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -30,6 +30,7 @@ var switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, type Orchestrator interface { UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse + GetConfigJSON() ([]byte, error) GetOriginProxy() (OriginProxy, error) } diff --git a/connection/connection_test.go b/connection/connection_test.go index 63390165..b8c3b674 100644 --- a/connection/connection_test.go +++ b/connection/connection_test.go @@ -42,6 +42,10 @@ type mockOrchestrator struct { originProxy OriginProxy } +func (mcr *mockOrchestrator) GetConfigJSON() ([]byte, error) { + return nil, fmt.Errorf("not implemented") +} + func (*mockOrchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse { return &tunnelpogs.UpdateConfigurationResponse{ LastAppliedVersion: version, diff --git a/connection/control.go b/connection/control.go index 2467e80a..1b28ceb1 100644 --- a/connection/control.go +++ b/connection/control.go @@ -30,11 +30,15 @@ type controlStream struct { // ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown. type ControlStreamHandler interface { // ServeControlStream handles the control plane of the transport in the current goroutine calling this - ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error + ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error // IsStopped tells whether the method above has finished IsStopped() bool } +type TunnelConfigJSONGetter interface { + GetConfigJSON() ([]byte, error) +} + // NewControlStream returns a new instance of ControlStreamHandler func NewControlStream( observer *Observer, @@ -63,15 +67,28 @@ func (c *controlStream) ServeControlStream( ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, + tunnelConfigGetter TunnelConfigJSONGetter, ) error { rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log) - if err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.observer); err != nil { + registrationDetails, err := rpcClient.RegisterConnection(ctx, c.namedTunnelProperties, connOptions, c.connIndex, c.observer) + if err != nil { rpcClient.Close() return err } c.connectedFuse.Connected() + // if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration + if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged { + if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil { + if err := rpcClient.SendLocalConfiguration(ctx, tunnelConfig, c.observer); err != nil { + c.observer.log.Err(err).Msg("unable to send local configuration") + } + } else { + c.observer.log.Err(err).Msg("failed to obtain current configuration") + } + } + c.waitForUnregister(ctx, rpcClient) return nil } diff --git a/connection/http2.go b/connection/http2.go index 715c827c..f7ead1f0 100644 --- a/connection/http2.go +++ b/connection/http2.go @@ -117,7 +117,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch connType { case TypeControlStream: - if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil { + if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, c.orchestrator); err != nil { c.controlStreamErr = err c.log.Error().Err(err) respWriter.WriteErrorResponse() diff --git a/connection/http2_test.go b/connection/http2_test.go index 384d29fb..18e688eb 100644 --- a/connection/http2_test.go +++ b/connection/http2_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/gobwas/ws/wsutil" + "github.com/google/uuid" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -166,18 +167,26 @@ type mockNamedTunnelRPCClient struct { unregistered chan struct{} } +func (mc mockNamedTunnelRPCClient) SendLocalConfiguration(c context.Context, config []byte, observer *Observer) error { + return nil +} + func (mc mockNamedTunnelRPCClient) RegisterConnection( c context.Context, properties *NamedTunnelProperties, options *tunnelpogs.ConnectionOptions, connIndex uint8, observer *Observer, -) error { +) (*tunnelpogs.ConnectionDetails, error) { if mc.shouldFail != nil { - return mc.shouldFail + return nil, mc.shouldFail } close(mc.registered) - return nil + return &tunnelpogs.ConnectionDetails{ + Location: "LIS", + UUID: uuid.New(), + TunnelIsRemotelyManaged: false, + }, nil } func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) { @@ -477,7 +486,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) { select { case <-rpcClientFactory.registered: - break //ok + break // ok case <-time.Tick(time.Second): t.Fatal("timeout out waiting for registration") } @@ -487,7 +496,7 @@ func TestGracefulShutdownHTTP2(t *testing.T) { select { case <-rpcClientFactory.unregistered: - break //ok + break // ok case <-time.Tick(time.Second): t.Fatal("timeout out waiting for unregistered signal") } diff --git a/connection/metrics.go b/connection/metrics.go index b52fda0e..a171f1a4 100644 --- a/connection/metrics.go +++ b/connection/metrics.go @@ -13,6 +13,7 @@ const ( MetricsNamespace = "cloudflared" TunnelSubsystem = "tunnel" muxerSubsystem = "muxer" + configSubsystem = "config" ) type muxerMetrics struct { @@ -36,6 +37,11 @@ type muxerMetrics struct { compRateAve *prometheus.GaugeVec } +type localConfigMetrics struct { + pushes prometheus.Counter + pushesErrors prometheus.Counter +} + type tunnelMetrics struct { timerRetries prometheus.Gauge serverLocations *prometheus.GaugeVec @@ -51,6 +57,39 @@ type tunnelMetrics struct { muxerMetrics *muxerMetrics tunnelsHA tunnelsForHA userHostnamesCounts *prometheus.CounterVec + + localConfigMetrics *localConfigMetrics +} + +func newLocalConfigMetrics() *localConfigMetrics { + + pushesMetric := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Subsystem: configSubsystem, + Name: "local_config_pushes", + Help: "Number of local configuration pushes to the edge", + }, + ) + + pushesErrorsMetric := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Subsystem: configSubsystem, + Name: "local_config_pushes_errors", + Help: "Number of errors occurred during local configuration pushes", + }, + ) + + prometheus.MustRegister( + pushesMetric, + pushesErrorsMetric, + ) + + return &localConfigMetrics{ + pushes: pushesMetric, + pushesErrors: pushesErrorsMetric, + } } func newMuxerMetrics() *muxerMetrics { @@ -386,6 +425,7 @@ func initTunnelMetrics() *tunnelMetrics { regFail: registerFail, rpcFail: rpcFail, userHostnamesCounts: userHostnamesCounts, + localConfigMetrics: newLocalConfigMetrics(), } } diff --git a/connection/quic.go b/connection/quic.go index 822925ae..2c04edf3 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -111,7 +111,7 @@ func (q *QUICConnection) Serve(ctx context.Context) error { func (q *QUICConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error { // This blocks until the control plane is done. - err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions) + err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions, q.orchestrator) if err != nil { // Not wrapping error here to be consistent with the http2 message. return err diff --git a/connection/quic_test.go b/connection/quic_test.go index 50f9863e..05fd64e2 100644 --- a/connection/quic_test.go +++ b/connection/quic_test.go @@ -163,7 +163,7 @@ type fakeControlStream struct { ControlStreamHandler } -func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error { +func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error { <-ctx.Done() return nil } diff --git a/connection/rpc.go b/connection/rpc.go index 937604b3..4565f396 100644 --- a/connection/rpc.go +++ b/connection/rpc.go @@ -58,6 +58,11 @@ type NamedTunnelRPCClient interface { options *tunnelpogs.ConnectionOptions, connIndex uint8, observer *Observer, + ) (*tunnelpogs.ConnectionDetails, error) + SendLocalConfiguration( + c context.Context, + config []byte, + observer *Observer, ) error GracefulShutdown(ctx context.Context, gracePeriod time.Duration) Close() @@ -90,7 +95,7 @@ func (rsc *registrationServerClient) RegisterConnection( options *tunnelpogs.ConnectionOptions, connIndex uint8, observer *Observer, -) error { +) (*tunnelpogs.ConnectionDetails, error) { conn, err := rsc.client.RegisterConnection( ctx, properties.Credentials.Auth(), @@ -101,10 +106,10 @@ func (rsc *registrationServerClient) RegisterConnection( if err != nil { if err.Error() == DuplicateConnectionError { observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc() - return errDuplicationConnection + return nil, errDuplicationConnection } observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc() - return serverRegistrationErrorFromRPC(err) + return nil, serverRegistrationErrorFromRPC(err) } observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc() @@ -112,7 +117,18 @@ func (rsc *registrationServerClient) RegisterConnection( observer.logServerInfo(connIndex, conn.Location, fmt.Sprintf("Connection %s registered", conn.UUID)) observer.sendConnectedEvent(connIndex, conn.Location) - return nil + return conn, nil +} + +func (rsc *registrationServerClient) SendLocalConfiguration(ctx context.Context, config []byte, observer *Observer) (err error) { + observer.metrics.localConfigMetrics.pushes.Inc() + defer func() { + if err != nil { + observer.metrics.localConfigMetrics.pushesErrors.Inc() + } + }() + + return rsc.client.SendLocalConfiguration(ctx, config) } func (rsc *registrationServerClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) { @@ -274,7 +290,7 @@ func (h *h2muxConnection) registerNamedTunnel( rpcClient := h.newRPCClientFunc(ctx, stream, h.observer.log) defer rpcClient.Close() - if err = rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, h.observer); err != nil { + if _, err = rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, h.observer); err != nil { return err } return nil diff --git a/ingress/config.go b/ingress/config.go index 0b59ecff..bc2a9f6b 100644 --- a/ingress/config.go +++ b/ingress/config.go @@ -47,18 +47,26 @@ type RemoteConfig struct { WarpRouting config.WarpRoutingConfig } -type remoteConfigJSON struct { - GlobalOriginRequest config.OriginRequestConfig `json:"originRequest"` +type RemoteConfigJSON struct { + GlobalOriginRequest *config.OriginRequestConfig `json:"originRequest,omitempty"` IngressRules []config.UnvalidatedIngressRule `json:"ingress"` WarpRouting config.WarpRoutingConfig `json:"warp-routing"` } func (rc *RemoteConfig) UnmarshalJSON(b []byte) error { - var rawConfig remoteConfigJSON + var rawConfig RemoteConfigJSON + if err := json.Unmarshal(b, &rawConfig); err != nil { return err } - ingress, err := validateIngress(rawConfig.IngressRules, originRequestFromConfig(rawConfig.GlobalOriginRequest)) + + // if nil, just assume the default values. + globalOriginRequestConfig := rawConfig.GlobalOriginRequest + if globalOriginRequestConfig == nil { + globalOriginRequestConfig = &config.OriginRequestConfig{} + } + + ingress, err := validateIngress(rawConfig.IngressRules, originRequestFromConfig(*globalOriginRequestConfig)) if err != nil { return err } @@ -387,3 +395,91 @@ func setConfig(defaults OriginRequestConfig, overrides config.OriginRequestConfi cfg.setIPRules(overrides) return cfg } + +func ConvertToRawOriginConfig(c OriginRequestConfig) config.OriginRequestConfig { + var connectTimeout *config.CustomDuration + var tlsTimeout *config.CustomDuration + var tcpKeepAlive *config.CustomDuration + var keepAliveConnections *int + var keepAliveTimeout *config.CustomDuration + var proxyAddress *string + + if c.ConnectTimeout != defaultConnectTimeout { + connectTimeout = &c.ConnectTimeout + } + if c.TLSTimeout != defaultTLSTimeout { + tlsTimeout = &c.TLSTimeout + } + if c.TCPKeepAlive != defaultTCPKeepAlive { + tcpKeepAlive = &c.TCPKeepAlive + } + if c.KeepAliveConnections != defaultKeepAliveConnections { + keepAliveConnections = &c.KeepAliveConnections + } + if c.KeepAliveTimeout != defaultKeepAliveTimeout { + keepAliveTimeout = &c.KeepAliveTimeout + } + if c.ProxyAddress != defaultProxyAddress { + proxyAddress = &c.ProxyAddress + } + + return config.OriginRequestConfig{ + ConnectTimeout: connectTimeout, + TLSTimeout: tlsTimeout, + TCPKeepAlive: tcpKeepAlive, + NoHappyEyeballs: defaultBoolToNil(c.NoHappyEyeballs), + KeepAliveConnections: keepAliveConnections, + KeepAliveTimeout: keepAliveTimeout, + HTTPHostHeader: emptyStringToNil(c.HTTPHostHeader), + OriginServerName: emptyStringToNil(c.OriginServerName), + CAPool: emptyStringToNil(c.CAPool), + NoTLSVerify: defaultBoolToNil(c.NoTLSVerify), + DisableChunkedEncoding: defaultBoolToNil(c.DisableChunkedEncoding), + BastionMode: defaultBoolToNil(c.BastionMode), + ProxyAddress: proxyAddress, + ProxyPort: zeroUIntToNil(c.ProxyPort), + ProxyType: emptyStringToNil(c.ProxyType), + IPRules: convertToRawIPRules(c.IPRules), + } +} + +func convertToRawIPRules(ipRules []ipaccess.Rule) []config.IngressIPRule { + result := make([]config.IngressIPRule, 0) + for _, r := range ipRules { + cidr := r.StringCIDR() + + newRule := config.IngressIPRule{ + Prefix: &cidr, + Ports: r.Ports(), + Allow: r.RulePolicy(), + } + + result = append(result, newRule) + } + + return result +} + +func defaultBoolToNil(b bool) *bool { + if b == false { + return nil + } + + return &b +} + +func emptyStringToNil(s string) *string { + if s == "" { + return nil + } + + return &s +} + +func zeroUIntToNil(v uint) *uint { + if v == 0 { + return nil + } + + return &v +} diff --git a/ipaccess/access.go b/ipaccess/access.go index 3136f2b5..4d41a7a8 100644 --- a/ipaccess/access.go +++ b/ipaccess/access.go @@ -99,3 +99,15 @@ func (ipr *Rule) PortsString() string { } return "all" } + +func (ipr *Rule) Ports() []int { + return ipr.ports +} + +func (ipr *Rule) RulePolicy() bool { + return ipr.allow +} + +func (ipr *Rule) StringCIDR() string { + return ipr.ipNet.String() +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 0129c49a..f8b3b1bc 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -23,7 +23,7 @@ const ( ) type orchestrator interface { - GetConfigJSON() ([]byte, error) + GetVersionedConfigJSON() ([]byte, error) } func newMetricsHandler( @@ -47,7 +47,7 @@ func newMetricsHandler( }) if orchestrator != nil { router.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) { - json, err := orchestrator.GetConfigJSON() + json, err := orchestrator.GetVersionedConfigJSON() if err != nil { w.WriteHeader(500) _, _ = fmt.Fprintf(w, "ERR: %v", err) diff --git a/orchestration/config.go b/orchestration/config.go index dff7e701..cfdbd939 100644 --- a/orchestration/config.go +++ b/orchestration/config.go @@ -1,15 +1,66 @@ package orchestration import ( + "encoding/json" + + "github.com/cloudflare/cloudflared/config" "github.com/cloudflare/cloudflared/ingress" ) -type newConfig struct { +type newRemoteConfig struct { ingress.RemoteConfig // Add more fields when we support other settings in tunnel orchestration } +type newLocalConfig struct { + RemoteConfig ingress.RemoteConfig + ConfigurationFlags map[string]string `json:"__configuration_flags,omitempty"` +} + +// Config is the original config as read and parsed by cloudflared. type Config struct { Ingress *ingress.Ingress WarpRoutingEnabled bool + + // Extra settings used to configure this instance but that are not eligible for remotely management + // ie. (--protocol, --loglevel, ...) + ConfigurationFlags map[string]string +} + +func (rc *newLocalConfig) MarshalJSON() ([]byte, error) { + var r = struct { + ConfigurationFlags map[string]string `json:"__configuration_flags,omitempty"` + ingress.RemoteConfigJSON + }{ + ConfigurationFlags: rc.ConfigurationFlags, + RemoteConfigJSON: ingress.RemoteConfigJSON{ + // UI doesn't support top level configs, so we reconcile to individual ingress configs. + GlobalOriginRequest: nil, + IngressRules: convertToUnvalidatedIngressRules(rc.RemoteConfig.Ingress), + WarpRouting: rc.RemoteConfig.WarpRouting, + }, + } + + return json.Marshal(r) +} + +func convertToUnvalidatedIngressRules(i ingress.Ingress) []config.UnvalidatedIngressRule { + result := make([]config.UnvalidatedIngressRule, 0) + for _, rule := range i.Rules { + var path string + if rule.Path != nil { + path = rule.Path.String() + } + + newRule := config.UnvalidatedIngressRule{ + Hostname: rule.Hostname, + Path: path, + Service: rule.Service.String(), + OriginRequest: ingress.ConvertToRawOriginConfig(rule.Config), + } + + result = append(result, newRule) + } + + return result } diff --git a/orchestration/config_test.go b/orchestration/config_test.go new file mode 100644 index 00000000..53d9a23f --- /dev/null +++ b/orchestration/config_test.go @@ -0,0 +1,82 @@ +package orchestration + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/cloudflare/cloudflared/ingress" +) + +// TestNewLocalConfig_MarshalJSON tests that we are able to converte a compiled and validated config back +// into an "unvalidated" format which is compatible with Remote Managed configurations. +func TestNewLocalConfig_MarshalJSON(t *testing.T) { + + rawConfig := []byte(` + { + "originRequest": { + "connectTimeout": 160, + "httpHostHeader": "default" + }, + "ingress": [ + { + "hostname": "tun.example.com", + "service": "https://localhost:8000" + }, + { + "hostname": "*", + "service": "https://localhost:8001", + "originRequest": { + "connectTimeout": 121, + "tlsTimeout": 2, + "noHappyEyeballs": false, + "tcpKeepAlive": 2, + "keepAliveConnections": 2, + "keepAliveTimeout": 2, + "httpHostHeader": "def", + "originServerName": "b2", + "caPool": "/tmp/path1", + "noTLSVerify": false, + "disableChunkedEncoding": false, + "bastionMode": false, + "proxyAddress": "interface", + "proxyPort": 200, + "proxyType": "", + "ipRules": [ + { + "prefix": "10.0.0.0/16", + "ports": [3000, 3030], + "allow": false + }, + { + "prefix": "192.16.0.0/24", + "ports": [5000, 5050], + "allow": true + } + ] + } + } + ] + } + `) + + var expectedConfig ingress.RemoteConfig + err := json.Unmarshal(rawConfig, &expectedConfig) + require.NoError(t, err) + + c := &newLocalConfig{ + RemoteConfig: expectedConfig, + ConfigurationFlags: nil, + } + + jsonSerde, err := json.Marshal(c) + require.NoError(t, err) + + var config ingress.RemoteConfig + err = json.Unmarshal(jsonSerde, &config) + require.NoError(t, err) + + require.Equal(t, config.WarpRouting.Enabled, false) + require.Equal(t, config.Ingress.Rules, expectedConfig.Ingress.Rules) +} diff --git a/orchestration/orchestrator.go b/orchestration/orchestrator.go index 9206cc21..60cd1f10 100644 --- a/orchestration/orchestrator.go +++ b/orchestration/orchestrator.go @@ -54,7 +54,7 @@ func NewOrchestrator(ctx context.Context, config *Config, tags []tunnelpogs.Tag, return o, nil } -// Update creates a new proxy with the new ingress rules +// UpdateConfig creates a new proxy with the new ingress rules func (o *Orchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse { o.lock.Lock() defer o.lock.Unlock() @@ -63,12 +63,12 @@ func (o *Orchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.Up o.log.Debug(). Int32("current_version", o.currentVersion). Int32("received_version", version). - Msg("Current version is equal or newer than receivied version") + Msg("Current version is equal or newer than received version") return &tunnelpogs.UpdateConfigurationResponse{ LastAppliedVersion: o.currentVersion, } } - var newConf newConfig + var newConf newRemoteConfig if err := json.Unmarshal(config, &newConf); err != nil { o.log.Err(err). Int32("version", version). @@ -131,10 +131,26 @@ func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRoutingEn return nil } -// GetConfigJSON returns the current version and configuration as JSON +// GetConfigJSON returns the current json serialization of the config as the edge understands it func (o *Orchestrator) GetConfigJSON() ([]byte, error) { o.lock.RLock() defer o.lock.RUnlock() + + c := &newLocalConfig{ + RemoteConfig: ingress.RemoteConfig{ + Ingress: *o.config.Ingress, + WarpRouting: config.WarpRoutingConfig{Enabled: o.config.WarpRoutingEnabled}, + }, + ConfigurationFlags: o.config.ConfigurationFlags, + } + + return json.Marshal(c) +} + +// GetVersionedConfigJSON returns the current version and configuration as JSON +func (o *Orchestrator) GetVersionedConfigJSON() ([]byte, error) { + o.lock.RLock() + defer o.lock.RUnlock() var currentConfiguration = struct { Version int32 `json:"version"` Config struct { diff --git a/orchestration/orchestrator_test.go b/orchestration/orchestrator_test.go index c543d3b6..85f0f83a 100644 --- a/orchestration/orchestrator_test.go +++ b/orchestration/orchestrator_test.go @@ -2,6 +2,7 @@ package orchestration import ( "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -641,6 +642,19 @@ func TestPersistentConnection(t *testing.T) { wg.Wait() } +func TestSerializeLocalConfig(t *testing.T) { + c := &newLocalConfig{ + RemoteConfig: ingress.RemoteConfig{ + Ingress: ingress.Ingress{}, + WarpRouting: config.WarpRoutingConfig{}, + }, + ConfigurationFlags: map[string]string{"a": "b"}, + } + + result, _ := json.Marshal(c) + fmt.Println(string(result)) +} + func wsEcho(w http.ResponseWriter, r *http.Request) { upgrader := gows.Upgrader{} diff --git a/tunnelrpc/pogs/connectionrpc.go b/tunnelrpc/pogs/connectionrpc.go index d713edc6..e736cd07 100644 --- a/tunnelrpc/pogs/connectionrpc.go +++ b/tunnelrpc/pogs/connectionrpc.go @@ -175,6 +175,24 @@ func (c RegistrationServer_PogsClient) RegisterConnection(ctx context.Context, a return nil, newRPCError("unknown result which %d", result.Which()) } +func (c RegistrationServer_PogsClient) SendLocalConfiguration(ctx context.Context, config []byte) error { + client := tunnelrpc.TunnelServer{Client: c.Client} + promise := client.UpdateLocalConfiguration(ctx, func(p tunnelrpc.RegistrationServer_updateLocalConfiguration_Params) error { + if err := p.SetConfig(config); err != nil { + return err + } + + return nil + }) + + _, err := promise.Struct() + if err != nil { + return wrapRPCError(err) + } + + return nil +} + func (c RegistrationServer_PogsClient) UnregisterConnection(ctx context.Context) error { client := tunnelrpc.TunnelServer{Client: c.Client} promise := client.UnregisterConnection(ctx, func(p tunnelrpc.RegistrationServer_unregisterConnection_Params) error {