diff --git a/component-tests/test_logging.py b/component-tests/test_logging.py index 23964220..05b9f428 100644 --- a/component-tests/test_logging.py +++ b/component-tests/test_logging.py @@ -13,7 +13,7 @@ class TestLogging: def test_logging_to_terminal(self, tmp_path, component_tests_config): config = component_tests_config() with start_cloudflared(tmp_path, config, new_process=True) as cloudflared: - wait_tunnel_ready() + wait_tunnel_ready(tunnel_url=config.get_url()) self.assert_log_to_terminal(cloudflared) def test_logging_to_file(self, tmp_path, component_tests_config): @@ -24,7 +24,7 @@ class TestLogging: } config = component_tests_config(extra_config) with start_cloudflared(tmp_path, config, new_process=True, capture_output=False): - wait_tunnel_ready() + wait_tunnel_ready(tunnel_url=config.get_url()) self.assert_log_in_file(log_file) self.assert_json_log(log_file) @@ -37,7 +37,7 @@ class TestLogging: } config = component_tests_config(extra_config) with start_cloudflared(tmp_path, config, new_process=True, capture_output=False): - wait_tunnel_ready() + wait_tunnel_ready(tunnel_url=config.get_url()) self.assert_log_to_dir(config, log_dir) def assert_log_to_terminal(self, cloudflared): diff --git a/component-tests/test_reconnect.py b/component-tests/test_reconnect.py index 27caf2a6..13c068d1 100644 --- a/component-tests/test_reconnect.py +++ b/component-tests/test_reconnect.py @@ -4,7 +4,7 @@ import copy from retrying import retry from time import sleep -from util import start_cloudflared, wait_tunnel_ready, check_tunnel_not_ready, send_requests +from util import start_cloudflared, wait_tunnel_ready, check_tunnel_not_connected, send_requests class TestReconnect(): @@ -35,16 +35,17 @@ class TestReconnect(): cloudflared.stdin.flush() def assert_reconnect(self, config, cloudflared, repeat): - wait_tunnel_ready() + wait_tunnel_ready(tunnel_url=config.get_url()) for _ in range(repeat): for i in range(self.default_ha_conns): self.send_reconnect(cloudflared, self.default_reconnect_secs) expect_connections = self.default_ha_conns-i-1 if expect_connections > 0: + # Don't check if tunnel returns 200 here because there is a race condition between wait_tunnel_ready + # retrying to get 200 response and reconnecting wait_tunnel_ready(expect_connections=expect_connections) else: - check_tunnel_not_ready() + check_tunnel_not_connected() sleep(self.default_reconnect_secs + 10) - wait_tunnel_ready() - send_requests(config.get_url(), 1) + wait_tunnel_ready(tunnel_url=config.get_url()) diff --git a/component-tests/test_termination.py b/component-tests/test_termination.py new file mode 100644 index 00000000..765735ac --- /dev/null +++ b/component-tests/test_termination.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +from contextlib import contextmanager +import requests +import signal +import threading +import time + +from util import start_cloudflared, wait_tunnel_ready, check_tunnel_not_connected, LOGGER + + +class TestTermination(): + grace_period = 5 + timeout = 10 + extra_config = { + "grace-period": f"{grace_period}s", + } + signals = [signal.SIGTERM, signal.SIGINT] + sse_endpoint = "/sse?freq=1s" + + def test_graceful_shutdown(self, tmp_path, component_tests_config): + config = component_tests_config(self.extra_config) + for sig in self.signals: + with start_cloudflared( + tmp_path, config, new_process=True, capture_output=False) as cloudflared: + wait_tunnel_ready(tunnel_url=config.get_url()) + + connected = threading.Condition() + in_flight_req = threading.Thread( + target=self.stream_request, args=(config, connected,)) + in_flight_req.start() + + with connected: + connected.wait(self.timeout) + # Send signal after the SSE connection is established + self.terminate_by_signal(cloudflared, sig) + self.wait_eyeball_thread( + in_flight_req, self.grace_period + self.timeout) + + # test cloudflared terminates before grace period expires when all eyeball + # connections are drained + def test_shutdown_once_no_connection(self, tmp_path, component_tests_config): + config = component_tests_config(self.extra_config) + for sig in self.signals: + with start_cloudflared( + tmp_path, config, new_process=True, capture_output=False) as cloudflared: + wait_tunnel_ready(tunnel_url=config.get_url()) + + connected = threading.Condition() + in_flight_req = threading.Thread( + target=self.stream_request, args=(config, connected, True, )) + in_flight_req.start() + + with self.within_grace_period(): + with connected: + connected.wait(self.timeout) + # Send signal after the SSE connection is established + self.terminate_by_signal(cloudflared, sig) + self.wait_eyeball_thread(in_flight_req, self.grace_period) + + def test_no_connection_shutdown(self, tmp_path, component_tests_config): + config = component_tests_config(self.extra_config) + for sig in self.signals: + with start_cloudflared( + tmp_path, config, new_process=True, capture_output=False) as cloudflared: + wait_tunnel_ready(tunnel_url=config.get_url()) + with self.within_grace_period(): + self.terminate_by_signal(cloudflared, sig) + + def terminate_by_signal(self, cloudflared, sig): + cloudflared.send_signal(sig) + check_tunnel_not_connected() + cloudflared.wait() + + def wait_eyeball_thread(self, thread, timeout): + thread.join(timeout) + assert thread.is_alive() == False, "eyeball thread is still alive" + + # Using this context asserts logic within the context is executed within grace period + @contextmanager + def within_grace_period(self): + try: + start = time.time() + yield + finally: + duration = time.time() - start + assert duration < self.grace_period + + def stream_request(self, config, connected, early_terminate): + expected_terminate_message = "502 Bad Gateway" + url = config.get_url() + self.sse_endpoint + + with requests.get(url, timeout=5, stream=True) as resp: + with connected: + connected.notifyAll() + lines = 0 + for line in resp.iter_lines(): + if expected_terminate_message.encode() == line: + break + lines += 1 + if early_terminate and lines == 2: + return + # /sse returns count followed by 2 new lines + assert lines >= (self.grace_period * 2) diff --git a/component-tests/util.py b/component-tests/util.py index d3de533d..7f84efc7 100644 --- a/component-tests/util.py +++ b/component-tests/util.py @@ -44,25 +44,26 @@ def run_cloudflared_background(cmd, allow_input, capture_output): @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=BACKOFF_SECS * 1000) -def wait_tunnel_ready(expect_connections=4): - url = f'http://localhost:{METRICS_PORT}/ready' +def wait_tunnel_ready(tunnel_url=None, expect_connections=4): + metrics_url = f'http://localhost:{METRICS_PORT}/ready' with requests.Session() as s: - resp = send_request(s, url, True) + resp = send_request(s, metrics_url, True) assert resp.json()[ - "readyConnections"] == expect_connections, f"Ready endpoint returned {resp.json()} but we expect {expect_connections} ready connections" + "readyConnections"] >= expect_connections, f"Ready endpoint returned {resp.json()} but we expect at least {expect_connections} connections" + if tunnel_url is not None: + send_request(s, tunnel_url, True) @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=BACKOFF_SECS * 1000) -def check_tunnel_not_ready(): +def check_tunnel_not_connected(): url = f'http://localhost:{METRICS_PORT}/ready' resp = requests.get(url, timeout=1) assert resp.status_code == 503, f"Expect {url} returns 503, got {resp.status_code}" + # In some cases we don't need to check response status, such as when sending batch requests to generate logs - - def send_requests(url, count, require_ok=True): errors = 0 with requests.Session() as s: