2022-02-11 10:49:06 +00:00
|
|
|
package orchestration
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
|
2022-03-14 17:51:10 +00:00
|
|
|
"github.com/cloudflare/cloudflared/config"
|
2022-02-11 10:49:06 +00:00
|
|
|
"github.com/cloudflare/cloudflared/connection"
|
|
|
|
"github.com/cloudflare/cloudflared/ingress"
|
|
|
|
"github.com/cloudflare/cloudflared/proxy"
|
|
|
|
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
|
|
|
)
|
|
|
|
|
2024-02-12 18:58:55 +00:00
|
|
|
// Orchestrator manages configurations, so they can be updatable during runtime
|
2022-02-11 10:49:06 +00:00
|
|
|
// properties are static, so it can be read without lock
|
|
|
|
// currentVersion and config are read/write infrequently, so their access are synchronized with RWMutex
|
2024-02-12 18:58:55 +00:00
|
|
|
// access to proxy is synchronized with atomic.Value, because it uses copy-on-write to provide scalable frequently
|
2022-02-11 10:49:06 +00:00
|
|
|
// read when update is infrequent
|
|
|
|
type Orchestrator struct {
|
|
|
|
currentVersion int32
|
|
|
|
// Used by UpdateConfig to make sure one update at a time
|
|
|
|
lock sync.RWMutex
|
|
|
|
// Underlying value is proxy.Proxy, can be read without the lock, but still needs the lock to update
|
2022-09-30 09:43:39 +00:00
|
|
|
proxy atomic.Value
|
2023-06-09 17:17:04 +00:00
|
|
|
// Set of internal ingress rules defined at cloudflared startup (separate from user-defined ingress rules)
|
2023-09-08 17:05:13 +00:00
|
|
|
internalRules []ingress.Rule
|
2024-02-12 18:58:55 +00:00
|
|
|
// cloudflared Configuration
|
|
|
|
config *Config
|
|
|
|
tags []tunnelpogs.Tag
|
|
|
|
log *zerolog.Logger
|
2022-02-11 10:49:06 +00:00
|
|
|
|
|
|
|
// orchestrator must not handle any more updates after shutdownC is closed
|
|
|
|
shutdownC <-chan struct{}
|
|
|
|
// Closing proxyShutdownC will close the previous proxy
|
|
|
|
proxyShutdownC chan<- struct{}
|
|
|
|
}
|
|
|
|
|
2024-02-12 18:58:55 +00:00
|
|
|
func NewOrchestrator(ctx context.Context,
|
|
|
|
config *Config,
|
|
|
|
tags []tunnelpogs.Tag,
|
|
|
|
internalRules []ingress.Rule,
|
|
|
|
log *zerolog.Logger) (*Orchestrator, error) {
|
2022-02-11 10:49:06 +00:00
|
|
|
o := &Orchestrator{
|
|
|
|
// Lowest possible version, any remote configuration will have version higher than this
|
2023-05-05 19:41:11 +00:00
|
|
|
// Starting at -1 allows a configuration migration (local to remote) to override the current configuration as it
|
|
|
|
// will start at version 0.
|
|
|
|
currentVersion: -1,
|
2023-06-09 17:17:04 +00:00
|
|
|
internalRules: internalRules,
|
2022-02-11 10:49:06 +00:00
|
|
|
config: config,
|
|
|
|
tags: tags,
|
|
|
|
log: log,
|
|
|
|
shutdownC: ctx.Done(),
|
|
|
|
}
|
2022-06-13 16:44:27 +00:00
|
|
|
if err := o.updateIngress(*config.Ingress, config.WarpRouting); err != nil {
|
2022-02-11 10:49:06 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
go o.waitToCloseLastProxy()
|
|
|
|
return o, nil
|
|
|
|
}
|
|
|
|
|
2022-04-27 10:51:06 +00:00
|
|
|
// UpdateConfig creates a new proxy with the new ingress rules
|
2022-02-11 10:49:06 +00:00
|
|
|
func (o *Orchestrator) UpdateConfig(version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
|
|
|
|
o.lock.Lock()
|
|
|
|
defer o.lock.Unlock()
|
|
|
|
|
|
|
|
if o.currentVersion >= version {
|
|
|
|
o.log.Debug().
|
|
|
|
Int32("current_version", o.currentVersion).
|
|
|
|
Int32("received_version", version).
|
2022-04-27 10:51:06 +00:00
|
|
|
Msg("Current version is equal or newer than received version")
|
2022-02-11 10:49:06 +00:00
|
|
|
return &tunnelpogs.UpdateConfigurationResponse{
|
|
|
|
LastAppliedVersion: o.currentVersion,
|
|
|
|
}
|
|
|
|
}
|
2022-04-27 10:51:06 +00:00
|
|
|
var newConf newRemoteConfig
|
2022-02-11 10:49:06 +00:00
|
|
|
if err := json.Unmarshal(config, &newConf); err != nil {
|
|
|
|
o.log.Err(err).
|
|
|
|
Int32("version", version).
|
|
|
|
Str("config", string(config)).
|
|
|
|
Msgf("Failed to deserialize new configuration")
|
|
|
|
return &tunnelpogs.UpdateConfigurationResponse{
|
|
|
|
LastAppliedVersion: o.currentVersion,
|
|
|
|
Err: err,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-13 16:44:27 +00:00
|
|
|
if err := o.updateIngress(newConf.Ingress, newConf.WarpRouting); err != nil {
|
2022-02-11 10:49:06 +00:00
|
|
|
o.log.Err(err).
|
|
|
|
Int32("version", version).
|
|
|
|
Str("config", string(config)).
|
|
|
|
Msgf("Failed to update ingress")
|
|
|
|
return &tunnelpogs.UpdateConfigurationResponse{
|
|
|
|
LastAppliedVersion: o.currentVersion,
|
|
|
|
Err: err,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
o.currentVersion = version
|
|
|
|
|
|
|
|
o.log.Info().
|
|
|
|
Int32("version", version).
|
|
|
|
Str("config", string(config)).
|
|
|
|
Msg("Updated to new configuration")
|
2022-03-09 23:16:54 +00:00
|
|
|
configVersion.Set(float64(version))
|
2022-02-11 10:49:06 +00:00
|
|
|
return &tunnelpogs.UpdateConfigurationResponse{
|
|
|
|
LastAppliedVersion: o.currentVersion,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The caller is responsible to make sure there is no concurrent access
|
2022-06-13 16:44:27 +00:00
|
|
|
func (o *Orchestrator) updateIngress(ingressRules ingress.Ingress, warpRouting ingress.WarpRoutingConfig) error {
|
2022-02-11 10:49:06 +00:00
|
|
|
select {
|
|
|
|
case <-o.shutdownC:
|
|
|
|
return fmt.Errorf("cloudflared already shutdown")
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2023-06-09 17:17:04 +00:00
|
|
|
// Assign the internal ingress rules to the parsed ingress
|
|
|
|
ingressRules.InternalRules = o.internalRules
|
|
|
|
|
|
|
|
// Check if ingress rules are empty, and add the default route if so.
|
|
|
|
if ingressRules.IsEmpty() {
|
|
|
|
ingressRules.Rules = ingress.GetDefaultIngressRules(o.log)
|
|
|
|
}
|
2023-03-21 18:42:25 +00:00
|
|
|
|
2022-02-11 10:49:06 +00:00
|
|
|
// Start new proxy before closing the ones from last version.
|
|
|
|
// The upside is we don't need to restart proxy from last version, which can fail
|
|
|
|
// The downside is new version might have ingress rule that require previous version to be shutdown first
|
|
|
|
// The downside is minimized because none of the ingress.OriginService implementation have that requirement
|
|
|
|
proxyShutdownC := make(chan struct{})
|
|
|
|
if err := ingressRules.StartOrigins(o.log, proxyShutdownC); err != nil {
|
|
|
|
return errors.Wrap(err, "failed to start origin")
|
|
|
|
}
|
2024-02-12 18:58:55 +00:00
|
|
|
proxy := proxy.NewOriginProxy(ingressRules, warpRouting, o.tags, o.config.WriteTimeout, o.log)
|
2023-03-21 18:42:25 +00:00
|
|
|
o.proxy.Store(proxy)
|
2022-02-11 10:49:06 +00:00
|
|
|
o.config.Ingress = &ingressRules
|
2022-06-13 16:44:27 +00:00
|
|
|
o.config.WarpRouting = warpRouting
|
2022-02-11 10:49:06 +00:00
|
|
|
|
|
|
|
// If proxyShutdownC is nil, there is no previous running proxy
|
|
|
|
if o.proxyShutdownC != nil {
|
|
|
|
close(o.proxyShutdownC)
|
|
|
|
}
|
|
|
|
o.proxyShutdownC = proxyShutdownC
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-04-27 10:51:06 +00:00
|
|
|
// GetConfigJSON returns the current json serialization of the config as the edge understands it
|
2022-03-14 17:51:10 +00:00
|
|
|
func (o *Orchestrator) GetConfigJSON() ([]byte, error) {
|
|
|
|
o.lock.RLock()
|
|
|
|
defer o.lock.RUnlock()
|
2022-04-27 10:51:06 +00:00
|
|
|
|
|
|
|
c := &newLocalConfig{
|
|
|
|
RemoteConfig: ingress.RemoteConfig{
|
|
|
|
Ingress: *o.config.Ingress,
|
2022-06-13 16:44:27 +00:00
|
|
|
WarpRouting: o.config.WarpRouting,
|
2022-04-27 10:51:06 +00:00
|
|
|
},
|
|
|
|
ConfigurationFlags: o.config.ConfigurationFlags,
|
|
|
|
}
|
|
|
|
|
|
|
|
return json.Marshal(c)
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetVersionedConfigJSON returns the current version and configuration as JSON
|
|
|
|
func (o *Orchestrator) GetVersionedConfigJSON() ([]byte, error) {
|
|
|
|
o.lock.RLock()
|
|
|
|
defer o.lock.RUnlock()
|
2022-03-14 17:51:10 +00:00
|
|
|
var currentConfiguration = struct {
|
|
|
|
Version int32 `json:"version"`
|
|
|
|
Config struct {
|
|
|
|
Ingress []ingress.Rule `json:"ingress"`
|
|
|
|
WarpRouting config.WarpRoutingConfig `json:"warp-routing"`
|
|
|
|
OriginRequest ingress.OriginRequestConfig `json:"originRequest"`
|
|
|
|
} `json:"config"`
|
|
|
|
}{
|
|
|
|
Version: o.currentVersion,
|
|
|
|
Config: struct {
|
|
|
|
Ingress []ingress.Rule `json:"ingress"`
|
|
|
|
WarpRouting config.WarpRoutingConfig `json:"warp-routing"`
|
|
|
|
OriginRequest ingress.OriginRequestConfig `json:"originRequest"`
|
|
|
|
}{
|
|
|
|
Ingress: o.config.Ingress.Rules,
|
2022-06-13 16:44:27 +00:00
|
|
|
WarpRouting: o.config.WarpRouting.RawConfig(),
|
2022-03-14 17:51:10 +00:00
|
|
|
OriginRequest: o.config.Ingress.Defaults,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
return json.Marshal(currentConfiguration)
|
|
|
|
}
|
|
|
|
|
2022-02-11 10:49:06 +00:00
|
|
|
// GetOriginProxy returns an interface to proxy to origin. It satisfies connection.ConfigManager interface
|
|
|
|
func (o *Orchestrator) GetOriginProxy() (connection.OriginProxy, error) {
|
|
|
|
val := o.proxy.Load()
|
|
|
|
if val == nil {
|
|
|
|
err := fmt.Errorf("origin proxy not configured")
|
|
|
|
o.log.Error().Msg(err.Error())
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-03-21 18:42:25 +00:00
|
|
|
proxy, ok := val.(connection.OriginProxy)
|
2022-02-11 10:49:06 +00:00
|
|
|
if !ok {
|
|
|
|
err := fmt.Errorf("origin proxy has unexpected value %+v", val)
|
|
|
|
o.log.Error().Msg(err.Error())
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return proxy, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *Orchestrator) waitToCloseLastProxy() {
|
|
|
|
<-o.shutdownC
|
|
|
|
o.lock.Lock()
|
|
|
|
defer o.lock.Unlock()
|
|
|
|
|
|
|
|
if o.proxyShutdownC != nil {
|
|
|
|
close(o.proxyShutdownC)
|
|
|
|
o.proxyShutdownC = nil
|
|
|
|
}
|
|
|
|
}
|