Merge branch 'cloudflare:master' into master

This commit is contained in:
Jauder Ho 2022-03-06 17:17:57 -08:00 committed by GitHub
commit 5837858ed8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 322 additions and 119 deletions

View File

@ -1,3 +1,12 @@
2022.3.0
- 2022-03-02 TUN-5680: Adapt component tests for new service install based on token
- 2022-02-21 TUN-5682: Remove name field from credentials
- 2022-02-21 TUN-5681: Add support for running tunnel using Token
- 2022-02-28 TUN-5824: Update updater no-update-in-shell link
- 2022-02-28 TUN-5823: Warn about legacy flags that are ignored when ingress rules are used
- 2022-02-28 TUN-5737: Support https protocol over unix socket origin
- 2022-02-23 TUN-5679: Add support for service install using Tunnel Token
2022.2.2
- 2022-02-22 TUN-5754: Allow ingress validate to take plaintext option
- 2022-02-17 TUN-5678: Cloudflared uses typed tunnel API

View File

@ -0,0 +1,30 @@
package main
import (
"github.com/rs/zerolog"
"github.com/urfave/cli/v2"
"github.com/cloudflare/cloudflared/cmd/cloudflared/cliutil"
"github.com/cloudflare/cloudflared/cmd/cloudflared/tunnel"
)
func buildArgsForToken(c *cli.Context, log *zerolog.Logger) ([]string, error) {
token := c.Args().First()
if _, err := tunnel.ParseToken(token); err != nil {
return nil, cliutil.UsageError("Provided tunnel token is not valid (%s).", err)
}
return []string{
"tunnel", "run", "--token", token,
}, nil
}
func getServiceExtraArgsFromCliArgs(c *cli.Context, log *zerolog.Logger) ([]string, error) {
if c.NArg() > 0 {
// currently, we only support extra args for token
return buildArgsForToken(c, log)
} else {
// empty extra args
return make([]string, 0), nil
}
}

View File

@ -6,7 +6,6 @@ package main
import (
"fmt"
"os"
"path/filepath"
"github.com/rs/zerolog"
"github.com/urfave/cli/v2"
@ -26,12 +25,6 @@ func runApp(app *cli.App, graceShutdownC chan struct{}) {
Name: "install",
Usage: "Install Cloudflare Tunnel as a system service",
Action: cliutil.ConfiguredAction(installLinuxService),
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "legacy",
Usage: "Generate service file for non-named tunnels",
},
},
},
{
Name: "uninstall",
@ -62,7 +55,7 @@ After=network.target
[Service]
TimeoutStartSec=0
Type=notify
ExecStart={{ .Path }} --config /etc/cloudflared/config.yml --no-autoupdate{{ range .ExtraArgs }} {{ . }}{{ end }}
ExecStart={{ .Path }} --no-autoupdate{{ range .ExtraArgs }} {{ . }}{{ end }}
Restart=on-failure
RestartSec=5s
@ -112,7 +105,7 @@ var sysvTemplate = ServiceTemplate{
# Description: Cloudflare Tunnel agent
### END INIT INFO
name=$(basename $(readlink -f $0))
cmd="{{.Path}} --config /etc/cloudflared/config.yml --pidfile /var/run/$name.pid --autoupdate-freq 24h0m0s{{ range .ExtraArgs }} {{ . }}{{ end }}"
cmd="{{.Path}} --pidfile /var/run/$name.pid --autoupdate-freq 24h0m0s{{ range .ExtraArgs }} {{ . }}{{ end }}"
pid_file="/var/run/$name.pid"
stdout_log="/var/log/$name.log"
stderr_log="/var/log/$name.err"
@ -191,27 +184,6 @@ func isSystemd() bool {
return false
}
func copyUserConfiguration(userConfigDir, userConfigFile, userCredentialFile string, log *zerolog.Logger) error {
srcCredentialPath := filepath.Join(userConfigDir, userCredentialFile)
destCredentialPath := filepath.Join(serviceConfigDir, serviceCredentialFile)
if srcCredentialPath != destCredentialPath {
if err := copyCredential(srcCredentialPath, destCredentialPath); err != nil {
return err
}
}
srcConfigPath := filepath.Join(userConfigDir, userConfigFile)
destConfigPath := filepath.Join(serviceConfigDir, serviceConfigFile)
if srcConfigPath != destConfigPath {
if err := copyConfig(srcConfigPath, destConfigPath); err != nil {
return err
}
log.Info().Msgf("Copied %s to %s", srcConfigPath, destConfigPath)
}
return nil
}
func installLinuxService(c *cli.Context) error {
log := logger.CreateLoggerFromContext(c, logger.EnableTerminalLog)
@ -223,52 +195,19 @@ func installLinuxService(c *cli.Context) error {
Path: etPath,
}
if err := ensureConfigDirExists(serviceConfigDir); err != nil {
var extraArgsFunc func(c *cli.Context, log *zerolog.Logger) ([]string, error)
if c.NArg() == 0 {
extraArgsFunc = buildArgsForConfig
} else {
extraArgsFunc = buildArgsForToken
}
extraArgs, err := extraArgsFunc(c, log)
if err != nil {
return err
}
if c.Bool("legacy") {
userConfigDir := filepath.Dir(c.String("config"))
userConfigFile := filepath.Base(c.String("config"))
userCredentialFile := config.DefaultCredentialFile
if err = copyUserConfiguration(userConfigDir, userConfigFile, userCredentialFile, log); err != nil {
log.Err(err).Msgf("Failed to copy user configuration. Before running the service, ensure that %s contains two files, %s and %s",
serviceConfigDir, serviceCredentialFile, serviceConfigFile)
return err
}
templateArgs.ExtraArgs = []string{
"--origincert", serviceConfigDir + "/" + serviceCredentialFile,
}
} else {
src, _, err := config.ReadConfigFile(c, log)
if err != nil {
return err
}
// can't use context because this command doesn't define "credentials-file" flag
configPresent := func(s string) bool {
val, err := src.String(s)
return err == nil && val != ""
}
if src.TunnelID == "" || !configPresent(tunnel.CredFileFlag) {
return fmt.Errorf(`Configuration file %s must contain entries for the tunnel to run and its associated credentials:
tunnel: TUNNEL-UUID
credentials-file: CREDENTIALS-FILE
`, src.Source())
}
if src.Source() != serviceConfigPath {
if exists, err := config.FileExists(serviceConfigPath); err != nil || exists {
return fmt.Errorf("Possible conflicting configuration in %[1]s and %[2]s. Either remove %[2]s or run `cloudflared --config %[2]s service install`", src.Source(), serviceConfigPath)
}
if err := copyFile(src.Source(), serviceConfigPath); err != nil {
return fmt.Errorf("failed to copy %s to %s: %w", src.Source(), serviceConfigPath, err)
}
}
templateArgs.ExtraArgs = []string{
"tunnel", "run",
}
}
templateArgs.ExtraArgs = extraArgs
switch {
case isSystemd():
@ -280,6 +219,42 @@ credentials-file: CREDENTIALS-FILE
}
}
func buildArgsForConfig(c *cli.Context, log *zerolog.Logger) ([]string, error) {
if err := ensureConfigDirExists(serviceConfigDir); err != nil {
return nil, err
}
src, _, err := config.ReadConfigFile(c, log)
if err != nil {
return nil, err
}
// can't use context because this command doesn't define "credentials-file" flag
configPresent := func(s string) bool {
val, err := src.String(s)
return err == nil && val != ""
}
if src.TunnelID == "" || !configPresent(tunnel.CredFileFlag) {
return nil, fmt.Errorf(`Configuration file %s must contain entries for the tunnel to run and its associated credentials:
tunnel: TUNNEL-UUID
credentials-file: CREDENTIALS-FILE
`, src.Source())
}
if src.Source() != serviceConfigPath {
if exists, err := config.FileExists(serviceConfigPath); err != nil || exists {
return nil, fmt.Errorf("Possible conflicting configuration in %[1]s and %[2]s. Either remove %[2]s or run `cloudflared --config %[2]s service install`", src.Source(), serviceConfigPath)
}
if err := copyFile(src.Source(), serviceConfigPath); err != nil {
return nil, fmt.Errorf("failed to copy %s to %s: %w", src.Source(), serviceConfigPath, err)
}
}
return []string{
"--config", "/etc/cloudflared/config.yml", "tunnel", "run",
}, nil
}
func installSystemd(templateArgs *ServiceTemplateArgs, log *zerolog.Logger) error {
for _, serviceTemplate := range systemdTemplates {
err := serviceTemplate.Generate(templateArgs)

View File

@ -50,6 +50,9 @@ func newLaunchdTemplate(installPath, stdoutPath, stderrPath string) *ServiceTemp
<key>ProgramArguments</key>
<array>
<string>{{ .Path }}</string>
{{- range $i, $item := .ExtraArgs}}
<string>{{ $item }}</string>
{{- end}}
</array>
<key>RunAtLoad</key>
<true/>
@ -129,6 +132,13 @@ func installLaunchd(c *cli.Context) error {
log.Err(err).Msg("Error determining install path")
return errors.Wrap(err, "Error determining install path")
}
extraArgs, err := getServiceExtraArgsFromCliArgs(c, log)
if err != nil {
errMsg := "Unable to determine extra arguments for launch daemon"
log.Err(err).Msg(errMsg)
return errors.Wrap(err, errMsg)
}
stdoutPath, err := stdoutPath()
if err != nil {
log.Err(err).Msg("error determining stdout path")
@ -140,7 +150,7 @@ func installLaunchd(c *cli.Context) error {
return errors.Wrap(err, "error determining stderr path")
}
launchdTemplate := newLaunchdTemplate(installPath, stdoutPath, stderrPath)
templateArgs := ServiceTemplateArgs{Path: etPath}
templateArgs := ServiceTemplateArgs{Path: etPath, ExtraArgs: extraArgs}
err = launchdTemplate.Generate(&templateArgs)
if err != nil {
log.Err(err).Msg("error generating launchd template")

View File

@ -724,43 +724,43 @@ func configureProxyFlags(shouldHide bool) []cli.Flag {
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: ingress.Socks5Flag,
Usage: "specify if this tunnel is running as a SOCK5 Server",
Usage: legacyTunnelFlag("specify if this tunnel is running as a SOCK5 Server"),
EnvVars: []string{"TUNNEL_SOCKS"},
Value: false,
Hidden: shouldHide,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: ingress.ProxyConnectTimeoutFlag,
Usage: "HTTP proxy timeout for establishing a new connection",
Usage: legacyTunnelFlag("HTTP proxy timeout for establishing a new connection"),
Value: time.Second * 30,
Hidden: shouldHide,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: ingress.ProxyTLSTimeoutFlag,
Usage: "HTTP proxy timeout for completing a TLS handshake",
Usage: legacyTunnelFlag("HTTP proxy timeout for completing a TLS handshake"),
Value: time.Second * 10,
Hidden: shouldHide,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: ingress.ProxyTCPKeepAliveFlag,
Usage: "HTTP proxy TCP keepalive duration",
Usage: legacyTunnelFlag("HTTP proxy TCP keepalive duration"),
Value: time.Second * 30,
Hidden: shouldHide,
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: ingress.ProxyNoHappyEyeballsFlag,
Usage: "HTTP proxy should disable \"happy eyeballs\" for IPv4/v6 fallback",
Usage: legacyTunnelFlag("HTTP proxy should disable \"happy eyeballs\" for IPv4/v6 fallback"),
Hidden: shouldHide,
}),
altsrc.NewIntFlag(&cli.IntFlag{
Name: ingress.ProxyKeepAliveConnectionsFlag,
Usage: "HTTP proxy maximum keepalive connection pool size",
Usage: legacyTunnelFlag("HTTP proxy maximum keepalive connection pool size"),
Value: 100,
Hidden: shouldHide,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: ingress.ProxyKeepAliveTimeoutFlag,
Usage: "HTTP proxy timeout for closing an idle connection",
Usage: legacyTunnelFlag("HTTP proxy timeout for closing an idle connection"),
Value: time.Second * 90,
Hidden: shouldHide,
}),
@ -778,13 +778,13 @@ func configureProxyFlags(shouldHide bool) []cli.Flag {
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: ingress.HTTPHostHeaderFlag,
Usage: "Sets the HTTP Host header for the local webserver.",
Usage: legacyTunnelFlag("Sets the HTTP Host header for the local webserver."),
EnvVars: []string{"TUNNEL_HTTP_HOST_HEADER"},
Hidden: shouldHide,
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: ingress.OriginServerNameFlag,
Usage: "Hostname on the origin server certificate.",
Usage: legacyTunnelFlag("Hostname on the origin server certificate."),
EnvVars: []string{"TUNNEL_ORIGIN_SERVER_NAME"},
Hidden: shouldHide,
}),
@ -796,19 +796,19 @@ func configureProxyFlags(shouldHide bool) []cli.Flag {
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: tlsconfig.OriginCAPoolFlag,
Usage: "Path to the CA for the certificate of your origin. This option should be used only if your certificate is not signed by Cloudflare.",
Usage: legacyTunnelFlag("Path to the CA for the certificate of your origin. This option should be used only if your certificate is not signed by Cloudflare."),
EnvVars: []string{"TUNNEL_ORIGIN_CA_POOL"},
Hidden: shouldHide,
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: ingress.NoTLSVerifyFlag,
Usage: "Disables TLS verification of the certificate presented by your origin. Will allow any certificate from the origin to be accepted. Note: The connection from your machine to Cloudflare's Edge is still encrypted.",
Usage: legacyTunnelFlag("Disables TLS verification of the certificate presented by your origin. Will allow any certificate from the origin to be accepted. Note: The connection from your machine to Cloudflare's Edge is still encrypted."),
EnvVars: []string{"NO_TLS_VERIFY"},
Hidden: shouldHide,
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: ingress.NoChunkedEncodingFlag,
Usage: "Disables chunked transfer encoding; useful if you are running a WSGI server.",
Usage: legacyTunnelFlag("Disables chunked transfer encoding; useful if you are running a WSGI server."),
EnvVars: []string{"TUNNEL_NO_CHUNKED_ENCODING"},
Hidden: shouldHide,
}),
@ -816,6 +816,15 @@ func configureProxyFlags(shouldHide bool) []cli.Flag {
return append(flags, sshFlags(shouldHide)...)
}
func legacyTunnelFlag(msg string) string {
return fmt.Sprintf(
"%s This flag only takes effect if you define your origin with `--url` and if you do not use ingress rules."+
" The recommended way is to rely on ingress rules and define this property under `originRequest` as per"+
" https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/configuration/configuration-file/ingress",
msg,
)
}
func sshFlags(shouldHide bool) []cli.Flag {
return []cli.Flag{
altsrc.NewStringFlag(&cli.StringFlag{

View File

@ -644,7 +644,7 @@ func runCommand(c *cli.Context) error {
// Check if token is provided and if not use default tunnelID flag method
if tokenStr := c.String(TunnelTokenFlag); tokenStr != "" {
if token, err := parseToken(tokenStr); err == nil {
if token, err := ParseToken(tokenStr); err == nil {
return sc.runWithCredentials(token.Credentials())
}
@ -663,7 +663,7 @@ func runCommand(c *cli.Context) error {
}
}
func parseToken(tokenStr string) (*connection.TunnelToken, error) {
func ParseToken(tokenStr string) (*connection.TunnelToken, error) {
content, err := base64.StdEncoding.DecodeString(tokenStr)
if err != nil {
return nil, err

View File

@ -183,7 +183,7 @@ func Test_validateHostname(t *testing.T) {
}
func Test_TunnelToken(t *testing.T) {
token, err := parseToken("aabc")
token, err := ParseToken("aabc")
require.Error(t, err)
require.Nil(t, token)
@ -198,7 +198,7 @@ func Test_TunnelToken(t *testing.T) {
token64 := base64.StdEncoding.EncodeToString(tokenJsonStr)
token, err = parseToken(token64)
token, err = ParseToken(token64)
require.NoError(t, err)
require.Equal(t, token, expectedToken)
}

View File

@ -193,8 +193,15 @@ func installWindowsService(c *cli.Context) error {
s.Close()
return fmt.Errorf("Service %s already exists", windowsServiceName)
}
extraArgs, err := getServiceExtraArgsFromCliArgs(c, &log)
if err != nil {
errMsg := "Unable to determine extra arguments for windows service"
log.Err(err).Msg(errMsg)
return errors.Wrap(err, errMsg)
}
config := mgr.Config{StartType: mgr.StartAutomatic, DisplayName: windowsServiceDescription}
s, err = m.CreateService(windowsServiceName, exepath, config)
s, err = m.CreateService(windowsServiceName, exepath, config, extraArgs...)
if err != nil {
return errors.Wrap(err, "Cannot install service")
}

View File

@ -14,7 +14,7 @@ classic_hostname: "classic-tunnel-component-tests.example.com"
origincert: "/Users/tunnel/.cloudflared/cert.pem"
ingress:
- hostname: named-tunnel-component-tests.example.com
service: http_status:200
service: hello_world
- service: http_status:404
```

View File

@ -1,5 +1,7 @@
#!/usr/bin/env python
import copy
import json
import base64
from dataclasses import dataclass, InitVar
@ -61,6 +63,23 @@ class NamedTunnelConfig(NamedTunnelBaseConfig):
def get_url(self):
return "https://" + self.ingress[0]['hostname']
def base_config(self):
config = self.full_config.copy()
# removes the tunnel reference
del(config["tunnel"])
del(config["credentials-file"])
return config
def get_token(self):
with open(self.credentials_file) as json_file:
creds = json.load(json_file)
token_dict = {"a": creds["AccountTag"], "t": creds["TunnelID"], "s": creds["TunnelSecret"]}
token_json_str = json.dumps(token_dict)
return base64.b64encode(token_json_str.encode('utf-8'))
@dataclass(frozen=True)
class ClassicTunnelBaseConfig(BaseConfig):

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python
import os
import pathlib
import platform
import subprocess
from contextlib import contextmanager
@ -9,7 +10,7 @@ import pytest
import test_logging
from conftest import CfdModes
from util import start_cloudflared, wait_tunnel_ready
from util import start_cloudflared, wait_tunnel_ready, write_config
def select_platform(plat):
@ -43,6 +44,21 @@ class TestServiceMode:
self.launchd_service_scenario(config, assert_log_file)
@select_platform("Darwin")
@pytest.mark.skipif(os.path.exists(default_config_file()), reason=f"There is already a config file in default path")
def test_launchd_service_with_token(self, tmp_path, component_tests_config):
log_file = tmp_path / test_logging.default_log_file
additional_config = {
"logfile": str(log_file),
}
config = component_tests_config(additional_config=additional_config)
# service install doesn't install the config file but in this case we want to use some default settings
# so we write the base config without the tunnel credentials and ID
write_config(pathlib.Path(default_config_dir()), config.base_config())
self.launchd_service_scenario(config, use_token=True)
@select_platform("Darwin")
@pytest.mark.skipif(os.path.exists(default_config_file()), reason=f"There is already a config file in default path")
def test_launchd_service_rotating_log(self, tmp_path, component_tests_config):
@ -60,12 +76,13 @@ class TestServiceMode:
self.launchd_service_scenario(config, assert_rotating_log)
def launchd_service_scenario(self, config, extra_assertions):
with self.run_service(Path(default_config_dir()), config):
def launchd_service_scenario(self, config, extra_assertions=None, use_token=False):
with self.run_service(Path(default_config_dir()), config, use_token=use_token):
self.launchctl_cmd("list")
self.launchctl_cmd("start")
wait_tunnel_ready(tunnel_url=config.get_url())
extra_assertions()
if extra_assertions is not None:
extra_assertions()
self.launchctl_cmd("stop")
os.remove(default_config_file())
@ -105,12 +122,30 @@ class TestServiceMode:
self.sysv_service_scenario(config, tmp_path, assert_rotating_log)
def sysv_service_scenario(self, config, tmp_path, extra_assertions):
with self.run_service(tmp_path, config, root=True):
@select_platform("Linux")
@pytest.mark.skipif(os.path.exists("/etc/cloudflared/config.yml"),
reason=f"There is already a config file in default path")
def test_sysv_service_with_token(self, tmp_path, component_tests_config):
additional_config = {
"loglevel": "debug",
}
config = component_tests_config(additional_config=additional_config)
# service install doesn't install the config file but in this case we want to use some default settings
# so we write the base config without the tunnel credentials and ID
config_path = write_config(tmp_path, config.base_config())
subprocess.run(["sudo", "cp", config_path, "/etc/cloudflared/config.yml"], check=True)
self.sysv_service_scenario(config, tmp_path, use_token=True)
def sysv_service_scenario(self, config, tmp_path, extra_assertions=None, use_token=False):
with self.run_service(tmp_path, config, root=True, use_token=use_token):
self.sysv_cmd("start")
self.sysv_cmd("status")
wait_tunnel_ready(tunnel_url=config.get_url())
extra_assertions()
if extra_assertions is not None:
extra_assertions()
self.sysv_cmd("stop")
# Service install copies config file to /etc/cloudflared/config.yml
@ -118,14 +153,19 @@ class TestServiceMode:
self.sysv_cmd("status", success=False)
@contextmanager
def run_service(self, tmp_path, config, root=False):
def run_service(self, tmp_path, config, root=False, use_token=False):
args = ["service", "install"]
if use_token:
args.append(config.get_token())
try:
service = start_cloudflared(
tmp_path, config, cfd_args=["service", "install"], cfd_pre_args=[], capture_output=False, root=root)
tmp_path, config, cfd_args=args, cfd_pre_args=[], capture_output=False, root=root, skip_config_flag=use_token)
yield service
finally:
start_cloudflared(
tmp_path, config, cfd_args=["service", "uninstall"], cfd_pre_args=[], capture_output=False, root=root)
tmp_path, config, cfd_args=["service", "uninstall"], cfd_pre_args=[], capture_output=False, root=root, skip_config_flag=use_token)
def launchctl_cmd(self, action, success=True):
cmd = subprocess.run(

View File

@ -21,9 +21,14 @@ def write_config(directory, config):
def start_cloudflared(directory, config, cfd_args=["run"], cfd_pre_args=["tunnel"], new_process=False,
allow_input=False, capture_output=True, root=False):
config_path = write_config(directory, config.full_config)
allow_input=False, capture_output=True, root=False, skip_config_flag=False):
config_path = None
if not skip_config_flag:
config_path = write_config(directory, config.full_config)
cmd = cloudflared_cmd(config, config_path, cfd_args, cfd_pre_args, root)
if new_process:
return run_cloudflared_background(cmd, allow_input, capture_output)
# By setting check=True, it will raise an exception if the process exits with non-zero exit code
@ -36,7 +41,10 @@ def cloudflared_cmd(config, config_path, cfd_args, cfd_pre_args, root):
cmd += ["sudo"]
cmd += [config.cloudflared_binary]
cmd += cfd_pre_args
cmd += ["--config", str(config_path)]
if config_path is not None:
cmd += ["--config", str(config_path)]
cmd += cfd_args
LOGGER.info(f"Run cmd {cmd} with config {config}")
return cmd

View File

@ -80,6 +80,7 @@ const (
TypeTCP
TypeControlStream
TypeHTTP
TypeConfiguration
)
// ShouldFlush returns whether this kind of connection should actively flush data

View File

@ -2,6 +2,7 @@ package connection
import (
"context"
gojson "encoding/json"
"fmt"
"io"
"net"
@ -23,6 +24,7 @@ const (
InternalTCPProxySrcHeader = "Cf-Cloudflared-Proxy-Src"
WebsocketUpgrade = "websocket"
ControlStreamUpgrade = "control-stream"
ConfigurationUpdate = "update-configuration"
)
var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
@ -100,7 +102,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
connType := determineHTTP2Type(r)
handleMissingRequestParts(connType, r)
respWriter, err := NewHTTP2RespWriter(r, w, connType)
respWriter, err := NewHTTP2RespWriter(r, w, connType, c.log)
if err != nil {
c.observer.log.Error().Msg(err.Error())
return
@ -120,6 +122,13 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
respWriter.WriteErrorResponse()
}
case TypeConfiguration:
fmt.Println("TYPE CONFIGURATION?")
if err := c.handleConfigurationUpdate(respWriter, r); err != nil {
c.log.Error().Err(err)
respWriter.WriteErrorResponse()
}
case TypeWebsocket, TypeHTTP:
stripWebsocketUpgradeHeader(r)
if err := originProxy.ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil {
@ -152,6 +161,26 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
// ConfigurationUpdateBody is the representation followed by the edge to send updates to cloudflared.
type ConfigurationUpdateBody struct {
Version int32 `json:"version"`
Config gojson.RawMessage `json:"config"`
}
func (c *HTTP2Connection) handleConfigurationUpdate(respWriter *http2RespWriter, r *http.Request) error {
var configBody ConfigurationUpdateBody
if err := json.NewDecoder(r.Body).Decode(&configBody); err != nil {
return err
}
resp := c.orchestrator.UpdateConfig(configBody.Version, configBody.Config)
bdy, err := json.Marshal(resp)
if err != nil {
return err
}
_, err = respWriter.Write(bdy)
return err
}
func (c *HTTP2Connection) close() {
// Wait for all serve HTTP handlers to return
c.activeRequestsWG.Wait()
@ -163,14 +192,16 @@ type http2RespWriter struct {
w http.ResponseWriter
flusher http.Flusher
shouldFlush bool
log *zerolog.Logger
}
func NewHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type) (*http2RespWriter, error) {
func NewHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type, log *zerolog.Logger) (*http2RespWriter, error) {
flusher, isFlusher := w.(http.Flusher)
if !isFlusher {
respWriter := &http2RespWriter{
r: r.Body,
w: w,
r: r.Body,
w: w,
log: log,
}
respWriter.WriteErrorResponse()
return nil, fmt.Errorf("%T doesn't implement http.Flusher", w)
@ -181,6 +212,7 @@ func NewHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type) (
w: w,
flusher: flusher,
shouldFlush: connType.shouldFlush(),
log: log,
}, nil
}
@ -239,7 +271,7 @@ func (rp *http2RespWriter) Write(p []byte) (n int, err error) {
// Implementer of OriginClient should make sure it doesn't write to the connection after Proxy returns
// Register a recover routine just in case.
if r := recover(); r != nil {
println(fmt.Sprintf("Recover from http2 response writer panic, error %s", debug.Stack()))
rp.log.Debug().Msgf("Recover from http2 response writer panic, error %s", debug.Stack())
}
}()
n, err = rp.w.Write(p)
@ -255,6 +287,8 @@ func (rp *http2RespWriter) Close() error {
func determineHTTP2Type(r *http.Request) Type {
switch {
case isConfigurationUpdate(r):
return TypeConfiguration
case isWebsocketUpgrade(r):
return TypeWebsocket
case IsTCPStream(r):
@ -288,6 +322,10 @@ func isWebsocketUpgrade(r *http.Request) bool {
return r.Header.Get(InternalUpgradeHeader) == WebsocketUpgrade
}
func isConfigurationUpdate(r *http.Request) bool {
return r.Header.Get(InternalUpgradeHeader) == ConfigurationUpdate
}
// IsTCPStream discerns if the connection request needs a tcp stream proxy.
func IsTCPStream(r *http.Request) bool {
return r.Header.Get(InternalTCPProxySrcHeader) != ""

View File

@ -1,6 +1,7 @@
package connection
import (
"bytes"
"context"
"errors"
"fmt"
@ -53,6 +54,41 @@ func newTestHTTP2Connection() (*HTTP2Connection, net.Conn) {
), edgeConn
}
func TestHTTP2ConfigurationSet(t *testing.T) {
http2Conn, edgeConn := newTestHTTP2Connection()
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
http2Conn.Serve(ctx)
}()
edgeHTTP2Conn, err := testTransport.NewClientConn(edgeConn)
require.NoError(t, err)
endpoint := fmt.Sprintf("http://localhost:8080/ok")
reqBody := []byte(`{
"version": 2,
"config": {"warp-routing": {"enabled": true}, "originRequest" : {"connectTimeout": 10}, "ingress" : [ {"hostname": "test", "service": "https://localhost:8000" } , {"service": "http_status:404"} ]}}
`)
reader := bytes.NewReader(reqBody)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, reader)
require.NoError(t, err)
req.Header.Set(InternalUpgradeHeader, ConfigurationUpdate)
resp, err := edgeHTTP2Conn.RoundTrip(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
bdy, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, `{"lastAppliedVersion":2,"err":null}`, string(bdy))
cancel()
wg.Wait()
}
func TestServeHTTP(t *testing.T) {
tests := []testRequest{
{

View File

@ -126,7 +126,7 @@ func parseSingleOriginService(c *cli.Context, allowURLFromArgs bool) (OriginServ
if err != nil {
return nil, errors.Wrap(err, "Error validating --unix-socket")
}
return &unixSocketPath{path: path}, nil
return &unixSocketPath{path: path, scheme: "http"}, nil
}
u, err := url.Parse("http://localhost:8080")
return &httpService{url: u}, err
@ -169,7 +169,10 @@ func validateIngress(ingress []config.UnvalidatedIngressRule, defaults OriginReq
if prefix := "unix:"; strings.HasPrefix(r.Service, prefix) {
// No validation necessary for unix socket filepath services
path := strings.TrimPrefix(r.Service, prefix)
service = &unixSocketPath{path: path}
service = &unixSocketPath{path: path, scheme: "http"}
} else if prefix := "unix+tls:"; strings.HasPrefix(r.Service, prefix) {
path := strings.TrimPrefix(r.Service, prefix)
service = &unixSocketPath{path: path, scheme: "https"}
} else if prefix := "http_status:"; strings.HasPrefix(r.Service, prefix) {
status, err := strconv.Atoi(strings.TrimPrefix(r.Service, prefix))
if err != nil {

View File

@ -26,8 +26,21 @@ ingress:
`
ing, err := ParseIngress(MustReadIngress(rawYAML))
require.NoError(t, err)
_, ok := ing.Rules[0].Service.(*unixSocketPath)
s, ok := ing.Rules[0].Service.(*unixSocketPath)
require.True(t, ok)
require.Equal(t, "http", s.scheme)
}
func TestParseUnixSocketTLS(t *testing.T) {
rawYAML := `
ingress:
- service: unix+tls:/tmp/echo.sock
`
ing, err := ParseIngress(MustReadIngress(rawYAML))
require.NoError(t, err)
s, ok := ing.Rules[0].Service.(*unixSocketPath)
require.True(t, ok)
require.Equal(t, "https", s.scheme)
}
func Test_parseIngress(t *testing.T) {

View File

@ -23,7 +23,7 @@ type StreamBasedOriginProxy interface {
}
func (o *unixSocketPath) RoundTrip(req *http.Request) (*http.Response, error) {
req.URL.Scheme = "http"
req.URL.Scheme = o.scheme
return o.transport.RoundTrip(req)
}

View File

@ -33,9 +33,10 @@ type OriginService interface {
start(log *zerolog.Logger, shutdownC <-chan struct{}, cfg OriginRequestConfig) error
}
// unixSocketPath is an OriginService representing a unix socket (which accepts HTTP)
// unixSocketPath is an OriginService representing a unix socket (which accepts HTTP or HTTPS)
type unixSocketPath struct {
path string
scheme string
transport *http.Transport
}

View File

@ -332,7 +332,8 @@ func proxyHTTP(t *testing.T, originProxy connection.OriginProxy, hostname string
require.NoError(t, err)
w := httptest.NewRecorder()
respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeHTTP)
log := zerolog.Nop()
respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeHTTP, &log)
require.NoError(t, err)
err = originProxy.ProxyHTTP(respWriter, req, false)
@ -358,7 +359,8 @@ func proxyTCP(t *testing.T, originProxy connection.OriginProxy, originAddr strin
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", originAddr), reqBody)
require.NoError(t, err)
respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeTCP)
log := zerolog.Nop()
respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeTCP, &log)
require.NoError(t, err)
tcpReq := &connection.TCPRequest{
@ -578,7 +580,8 @@ func TestPersistentConnection(t *testing.T) {
// ProxyHTTP will add Connection, Upgrade and Sec-Websocket-Version headers
req.Header.Add("Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
respWriter, err := connection.NewHTTP2RespWriter(req, wsRespReadWriter, connection.TypeWebsocket)
log := zerolog.Nop()
respWriter, err := connection.NewHTTP2RespWriter(req, wsRespReadWriter, connection.TypeWebsocket, &log)
require.NoError(t, err)
err = originProxy.ProxyHTTP(respWriter, req, true)

View File

@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"runtime/debug"
"sync/atomic"
"time"
@ -77,7 +78,7 @@ func unidirectionalStream(dst io.Writer, src io.Reader, dir string, status *bidi
// exited. In such case, we stop a possible panic from propagating upstream.
if r := recover(); r != nil {
// We handle such unexpected errors only when we detect that one side of the streaming is done.
log.Debug().Msgf("Handled gracefully error %v in Streaming for %s", r, dir)
log.Debug().Msgf("Gracefully handled error %v in Streaming for %s, error %s", r, dir, debug.Stack())
}
}
}()