From e921ab35d56c5e5c93036c487a38d9e11c083d42 Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Tue, 7 Jun 2022 12:32:29 -0700 Subject: [PATCH] TUN-6010: Add component tests for --edge-ip-version (cherry picked from commit 978e01f77eedf4f098f71423b42e639d12355015) --- component-tests/cli.py | 95 ++++++++++++++ component-tests/test_edge_discovery.py | 165 +++++++++++++++++++++++++ component-tests/test_service.py | 8 +- component-tests/util.py | 18 +++ 4 files changed, 279 insertions(+), 7 deletions(-) create mode 100644 component-tests/cli.py create mode 100644 component-tests/test_edge_discovery.py diff --git a/component-tests/cli.py b/component-tests/cli.py new file mode 100644 index 00000000..ae4a1b12 --- /dev/null +++ b/component-tests/cli.py @@ -0,0 +1,95 @@ +import json +import subprocess +from time import sleep + +from setup import get_config_from_file + +SINGLE_CASE_TIMEOUT = 600 + +class CloudflaredCli: + def __init__(self, config, config_path, logger): + self.basecmd = [config.cloudflared_binary, "tunnel"] + if config_path is not None: + self.basecmd += ["--config", str(config_path)] + origincert = get_config_from_file()["origincert"] + if origincert: + self.basecmd += ["--origincert", origincert] + self.logger = logger + + def _run_command(self, subcmd, subcmd_name, needs_to_pass=True): + cmd = self.basecmd + subcmd + # timeout limits the time a subprocess can run. This is useful to guard against running a tunnel when + # command/args are in wrong order. + result = run_subprocess(cmd, subcmd_name, self.logger, check=needs_to_pass, capture_output=True, timeout=15) + return result + + def list_tunnels(self): + cmd_args = ["list", "--output", "json"] + listed = self._run_command(cmd_args, "list") + return json.loads(listed.stdout) + + def get_tunnel_info(self, tunnel_id): + info = self._run_command(["info", "--output", "json", tunnel_id], "info") + return json.loads(info.stdout) + + def __enter__(self): + self.basecmd += ["run"] + self.process = subprocess.Popen(self.basecmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.logger.info(f"Run cmd {self.basecmd}") + return self.process + + def __exit__(self, exc_type, exc_value, exc_traceback): + terminate_gracefully(self.process, self.logger, self.basecmd) + self.logger.debug(f"{self.basecmd} logs: {self.process.stderr.read()}") + + +def terminate_gracefully(process, logger, cmd): + process.terminate() + process_terminated = wait_for_terminate(process) + if not process_terminated: + process.kill() + logger.warning(f"{cmd}: cloudflared did not terminate within wait period. Killing process. logs: \ + stdout: {process.stdout.read()}, stderr: {process.stderr.read()}") + + +def wait_for_terminate(opened_subprocess, attempts=10, poll_interval=1): + """ + wait_for_terminate polls the opened_subprocess every x seconds for a given number of attempts. + It returns true if the subprocess was terminated and false if it didn't. + """ + for _ in range(attempts): + if _is_process_stopped(opened_subprocess): + return True + sleep(poll_interval) + return False + + +def _is_process_stopped(process): + return process.poll() is not None + + +def cert_path(): + return get_config_from_file()["origincert"] + + +class SubprocessError(Exception): + def __init__(self, program, exit_code, cause): + self.program = program + self.exit_code = exit_code + self.cause = cause + + +def run_subprocess(cmd, cmd_name, logger, timeout=SINGLE_CASE_TIMEOUT, **kargs): + kargs["timeout"] = timeout + try: + result = subprocess.run(cmd, **kargs) + logger.debug(f"{cmd} log: {result.stdout}", extra={"cmd": cmd_name}) + return result + except subprocess.CalledProcessError as e: + err = f"{cmd} return exit code {e.returncode}, stderr" + e.stderr.decode("utf-8") + logger.error(err, extra={"cmd": cmd_name, "return_code": e.returncode}) + raise SubprocessError(cmd[0], e.returncode, e) + except subprocess.TimeoutExpired as e: + err = f"{cmd} timeout after {e.timeout} seconds, stdout: {e.stdout}, stderr: {e.stderr}" + logger.error(err, extra={"cmd": cmd_name, "return_code": "timeout"}) + raise e \ No newline at end of file diff --git a/component-tests/test_edge_discovery.py b/component-tests/test_edge_discovery.py new file mode 100644 index 00000000..61e036a3 --- /dev/null +++ b/component-tests/test_edge_discovery.py @@ -0,0 +1,165 @@ +import ipaddress +import socket + +import pytest + +from constants import protocols +from cli import CloudflaredCli +from util import get_tunnel_connector_id, LOGGER, wait_tunnel_ready, write_config + + +class TestEdgeDiscovery: + def _extra_config(self, protocol, edge_ip_version): + config = { + "protocol": protocol, + } + if edge_ip_version: + config["edge-ip-version"] = edge_ip_version + return config + + @pytest.mark.parametrize("protocol", protocols()) + def test_default_only(self, tmp_path, component_tests_config, protocol): + """ + This test runs a tunnel to connect via IPv4-only edge addresses (default is unset "--edge-ip-version 4") + """ + if self.has_ipv6_only(): + pytest.skip("Host has IPv6 only support and current default is IPv4 only") + self.expect_address_connections( + tmp_path, component_tests_config, protocol, None, self.expect_ipv4_address) + + @pytest.mark.parametrize("protocol", protocols()) + def test_ipv4_only(self, tmp_path, component_tests_config, protocol): + """ + This test runs a tunnel to connect via IPv4-only edge addresses + """ + if self.has_ipv6_only(): + pytest.skip("Host has IPv6 only support") + self.expect_address_connections( + tmp_path, component_tests_config, protocol, "4", self.expect_ipv4_address) + + @pytest.mark.parametrize("protocol", protocols()) + def test_ipv6_only(self, tmp_path, component_tests_config, protocol): + """ + This test runs a tunnel to connect via IPv6-only edge addresses + """ + if self.has_ipv4_only(): + pytest.skip("Host has IPv4 only support") + self.expect_address_connections( + tmp_path, component_tests_config, protocol, "6", self.expect_ipv6_address) + + @pytest.mark.parametrize("protocol", protocols()) + def test_auto_ip64(self, tmp_path, component_tests_config, protocol): + """ + This test runs a tunnel to connect via auto with a preference of IPv6 then IPv4 addresses for a dual stack host + + This test also assumes that the host has IPv6 preference. + """ + if not self.has_dual_stack(address_family_preference=socket.AddressFamily.AF_INET6): + pytest.skip("Host does not support dual stack with IPv6 preference") + self.expect_address_connections( + tmp_path, component_tests_config, protocol, "auto", self.expect_ipv6_address) + + @pytest.mark.parametrize("protocol", protocols()) + def test_auto_ip46(self, tmp_path, component_tests_config, protocol): + """ + This test runs a tunnel to connect via auto with a preference of IPv4 then IPv6 addresses for a dual stack host + + This test also assumes that the host has IPv4 preference. + """ + if not self.has_dual_stack(address_family_preference=socket.AddressFamily.AF_INET): + pytest.skip("Host does not support dual stack with IPv4 preference") + self.expect_address_connections( + tmp_path, component_tests_config, protocol, "auto", self.expect_ipv4_address) + + def expect_address_connections(self, tmp_path, component_tests_config, protocol, edge_ip_version, assert_address_type): + config = component_tests_config( + self._extra_config(protocol, edge_ip_version)) + config_path = write_config(tmp_path, config.full_config) + LOGGER.debug(config) + with CloudflaredCli(config, config_path, LOGGER): + wait_tunnel_ready(tunnel_url=config.get_url(), + require_min_connections=4) + cfd_cli = CloudflaredCli(config, config_path, LOGGER) + tunnel_id = config.get_tunnel_id() + info = cfd_cli.get_tunnel_info(tunnel_id) + connector_id = get_tunnel_connector_id() + connector = next( + (c for c in info["conns"] if c["id"] == connector_id), None) + assert connector, f"Expected connection info from get tunnel info for the connected instance: {info}" + conns = connector["conns"] + assert conns == None or len( + conns) == 4, f"There should be 4 connections registered: {conns}" + for conn in conns: + origin_ip = conn["origin_ip"] + assert origin_ip, f"No available origin_ip for this connection: {conn}" + assert_address_type(origin_ip) + + def expect_ipv4_address(self, address): + assert type(ipaddress.ip_address( + address)) is ipaddress.IPv4Address, f"Expected connection from origin to be a valid IPv4 address: {address}" + + def expect_ipv6_address(self, address): + assert type(ipaddress.ip_address( + address)) is ipaddress.IPv6Address, f"Expected connection from origin to be a valid IPv6 address: {address}" + + def get_addresses(self): + """ + Returns a list of addresses for the host. + """ + host_addresses = socket.getaddrinfo( + "region1.v2.argotunnel.com", 7844, socket.AF_UNSPEC, socket.SOCK_STREAM) + assert len( + host_addresses) > 0, "No addresses returned from getaddrinfo" + return host_addresses + + def has_dual_stack(self, address_family_preference=None): + """ + Returns true if the host has dual stack support and can optionally check + the provided IP family preference. + """ + dual_stack = not self.has_ipv6_only() and not self.has_ipv4_only() + if address_family_preference: + address = self.get_addresses()[0] + return dual_stack and address[0] == address_family_preference + + return dual_stack + + def has_ipv6_only(self): + """ + Returns True if the host has only IPv6 address support. + """ + return self.attempt_connection(socket.AddressFamily.AF_INET6) and not self.attempt_connection(socket.AddressFamily.AF_INET) + + def has_ipv4_only(self): + """ + Returns True if the host has only IPv4 address support. + """ + return self.attempt_connection(socket.AddressFamily.AF_INET) and not self.attempt_connection(socket.AddressFamily.AF_INET6) + + def attempt_connection(self, address_family): + """ + Returns True if a successful socket connection can be made to the + remote host with the provided address family to validate host support + for the provided address family. + """ + address = None + for a in self.get_addresses(): + if a[0] == address_family: + address = a + break + if address is None: + # Couldn't even lookup the address family so we can't connect + return False + af, socktype, proto, canonname, sockaddr = address + s = None + try: + s = socket.socket(af, socktype, proto) + except OSError: + return False + try: + s.connect(sockaddr) + except OSError: + s.close() + return False + s.close() + return True diff --git a/component-tests/test_service.py b/component-tests/test_service.py index f8ace641..e1c155e6 100644 --- a/component-tests/test_service.py +++ b/component-tests/test_service.py @@ -1,7 +1,6 @@ #!/usr/bin/env python import os import pathlib -import platform import subprocess from contextlib import contextmanager from pathlib import Path @@ -10,12 +9,7 @@ import pytest import test_logging from conftest import CfdModes -from util import start_cloudflared, wait_tunnel_ready, write_config - - -def select_platform(plat): - return pytest.mark.skipif( - platform.system() != plat, reason=f"Only runs on {plat}") +from util import select_platform, start_cloudflared, wait_tunnel_ready, write_config def default_config_dir(): diff --git a/component-tests/util.py b/component-tests/util.py index db8f925d..aae9b85f 100644 --- a/component-tests/util.py +++ b/component-tests/util.py @@ -1,9 +1,12 @@ import logging import os +import platform import subprocess from contextlib import contextmanager from time import sleep +import pytest + import requests import yaml from retrying import retry @@ -12,6 +15,10 @@ from constants import METRICS_PORT, MAX_RETRIES, BACKOFF_SECS LOGGER = logging.getLogger(__name__) +def select_platform(plat): + return pytest.mark.skipif( + platform.system() != plat, reason=f"Only runs on {plat}") + def write_config(directory, config): config_path = directory / "config.yml" @@ -111,6 +118,17 @@ def check_tunnel_not_connected(): LOGGER.warning(f"Failed to connect to {url}, error: {e}") +def get_tunnel_connector_id(): + url = f'http://localhost:{METRICS_PORT}/ready' + + try: + resp = requests.get(url, timeout=1) + return resp.json()["connectorId"] + # cloudflared might already terminated + except requests.exceptions.ConnectionError as e: + LOGGER.warning(f"Failed to connect to {url}, error: {e}") + + # In some cases we don't need to check response status, such as when sending batch requests to generate logs def send_requests(url, count, require_ok=True): errors = 0