diff --git a/cmd/cloudflared/tail/cmd.go b/cmd/cloudflared/tail/cmd.go index 45cca60c..19323564 100644 --- a/cmd/cloudflared/tail/cmd.go +++ b/cmd/cloudflared/tail/cmd.go @@ -83,10 +83,16 @@ func buildTailCommand(subcommands []*cli.Command) *cli.Command { }, &cli.StringFlag{ Name: "level", - Usage: "Filter by specific log levels (debug, info, warn, error)", + Usage: "Filter by specific log levels (debug, info, warn, error). Filters by debug log level by default.", EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_LEVEL"}, Value: "debug", }, + &cli.Float64Flag{ + Name: "sample", + Usage: "Sample log events by percentage (0.0 .. 1.0). No sampling by default.", + EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_SAMPLE"}, + Value: 1.0, + }, &cli.StringFlag{ Name: "token", Usage: "Access token for a specific tunnel", @@ -172,9 +178,11 @@ func createLogger(c *cli.Context) *zerolog.Logger { func parseFilters(c *cli.Context) (*management.StreamingFilters, error) { var level *management.LogLevel var events []management.LogEventType + var sample float64 argLevel := c.String("level") argEvents := c.StringSlice("event") + argSample := c.Float64("sample") if argLevel != "" { l, ok := management.ParseLogLevel(argLevel) @@ -192,14 +200,20 @@ func parseFilters(c *cli.Context) (*management.StreamingFilters, error) { events = append(events, t) } - if level == nil && len(events) == 0 { + if argSample <= 0.0 || argSample > 1.0 { + return nil, fmt.Errorf("invalid --sample value provided, please make sure it is in the range (0.0 .. 1.0)") + } + sample = argSample + + if level == nil && len(events) == 0 && argSample != 1.0 { // When no filters are provided, do not return a StreamingFilters struct return nil, nil } return &management.StreamingFilters{ - Level: level, - Events: events, + Level: level, + Events: events, + Sampling: sample, }, nil } diff --git a/component-tests/test_tail.py b/component-tests/test_tail.py index 7f4c11ee..732bee69 100644 --- a/component-tests/test_tail.py +++ b/component-tests/test_tail.py @@ -21,12 +21,9 @@ class TestTail: print("test_start_stop_streaming") config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False) LOGGER.debug(config) - headers = {} - headers["Content-Type"] = "application/json" config_path = write_config(tmp_path, config.full_config) with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True): - wait_tunnel_ready(tunnel_url=config.get_url(), - require_min_connections=1) + wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1) cfd_cli = CloudflaredCli(config, config_path, LOGGER) url = cfd_cli.get_management_wsurl("logs", config, config_path) async with connect(url, open_timeout=5, close_timeout=3) as websocket: @@ -43,19 +40,16 @@ class TestTail: print("test_streaming_logs") config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False) LOGGER.debug(config) - headers = {} - headers["Content-Type"] = "application/json" config_path = write_config(tmp_path, config.full_config) with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True): - wait_tunnel_ready(tunnel_url=config.get_url(), - require_min_connections=1) + wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1) cfd_cli = CloudflaredCli(config, config_path, LOGGER) url = cfd_cli.get_management_wsurl("logs", config, config_path) async with connect(url, open_timeout=5, close_timeout=5) as websocket: # send start_streaming await websocket.send('{"type": "start_streaming"}') # send some http requests to the tunnel to trigger some logs - await asyncio.wait_for(generate_and_validate_log_event(websocket, config.get_url()), 10) + await generate_and_validate_http_events(websocket, config.get_url(), 10) # send stop_streaming await websocket.send('{"type": "stop_streaming"}') @@ -68,12 +62,36 @@ class TestTail: print("test_streaming_logs_filters") config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False) LOGGER.debug(config) - headers = {} - headers["Content-Type"] = "application/json" config_path = write_config(tmp_path, config.full_config) with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True): - wait_tunnel_ready(tunnel_url=config.get_url(), - require_min_connections=1) + wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1) + cfd_cli = CloudflaredCli(config, config_path, LOGGER) + url = cfd_cli.get_management_wsurl("logs", config, config_path) + async with connect(url, open_timeout=5, close_timeout=5) as websocket: + # send start_streaming with tcp logs only + await websocket.send(json.dumps({ + "type": "start_streaming", + "filters": { + "events": ["tcp"], + "level": "debug" + } + })) + # don't expect any http logs + await generate_and_validate_no_log_event(websocket, config.get_url()) + # send stop_streaming + await websocket.send('{"type": "stop_streaming"}') + + @pytest.mark.asyncio + async def test_streaming_logs_sampling(self, tmp_path, component_tests_config): + """ + Validates that a streaming logs connection will stream logs with sampling. + """ + print("test_streaming_logs_sampling") + config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False) + LOGGER.debug(config) + config_path = write_config(tmp_path, config.full_config) + with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True): + wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1) cfd_cli = CloudflaredCli(config, config_path, LOGGER) url = cfd_cli.get_management_wsurl("logs", config, config_path) async with connect(url, open_timeout=5, close_timeout=5) as websocket: @@ -81,11 +99,12 @@ class TestTail: await websocket.send(json.dumps({ "type": "start_streaming", "filters": { - "events": ["tcp"] + "sampling": 0.5 } })) # don't expect any http logs - await generate_and_validate_no_log_event(websocket, config.get_url()) + count = await generate_and_validate_http_events(websocket, config.get_url(), 10) + assert count < (10 * 2) # There are typically always two log lines for http requests (request and response) # send stop_streaming await websocket.send('{"type": "stop_streaming"}') @@ -97,12 +116,9 @@ class TestTail: print("test_streaming_logs_actor_override") config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False) LOGGER.debug(config) - headers = {} - headers["Content-Type"] = "application/json" config_path = write_config(tmp_path, config.full_config) with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True): - wait_tunnel_ready(tunnel_url=config.get_url(), - require_min_connections=1) + wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1) cfd_cli = CloudflaredCli(config, config_path, LOGGER) url = cfd_cli.get_management_wsurl("logs", config, config_path) task = asyncio.ensure_future(start_streaming_to_be_remotely_closed(url)) @@ -131,16 +147,22 @@ async def start_streaming_override(url): await asyncio.sleep(1) # Every http request has two log lines sent -async def generate_and_validate_log_event(websocket: WebSocketClientProtocol, url: str): - send_request(url) - req_line = await websocket.recv() - log_line = json.loads(req_line) - assert log_line["type"] == "logs" - assert log_line["logs"][0]["event"] == "http" - req_line = await websocket.recv() - log_line = json.loads(req_line) - assert log_line["type"] == "logs" - assert log_line["logs"][0]["event"] == "http" +async def generate_and_validate_http_events(websocket: WebSocketClientProtocol, url: str, count_send: int): + for i in range(count_send): + send_request(url) + # There are typically always two log lines for http requests (request and response) + count = 0 + while True: + try: + req_line = await asyncio.wait_for(websocket.recv(), 2) + log_line = json.loads(req_line) + assert log_line["type"] == "logs" + assert log_line["logs"][0]["event"] == "http" + count += 1 + except asyncio.TimeoutError: + # ignore timeout from waiting for recv + break + return count # Every http request has two log lines sent async def generate_and_validate_no_log_event(websocket: WebSocketClientProtocol, url: str): diff --git a/management/events.go b/management/events.go index b89f81f4..528bae18 100644 --- a/management/events.go +++ b/management/events.go @@ -52,8 +52,9 @@ type EventStartStreaming struct { } type StreamingFilters struct { - Events []LogEventType `json:"events,omitempty"` - Level *LogLevel `json:"level,omitempty"` + Events []LogEventType `json:"events,omitempty"` + Level *LogLevel `json:"level,omitempty"` + Sampling float64 `json:"sampling,omitempty"` } // EventStopStreaming signifies that the client wishes to halt receiving log events. diff --git a/management/events_test.go b/management/events_test.go index 70cd4428..1d40e29b 100644 --- a/management/events_test.go +++ b/management/events_test.go @@ -57,13 +57,23 @@ func TestIntoClientEvent_StartStreaming(t *testing.T) { }, }, }, + { + name: "sampling filter", + expected: EventStartStreaming{ + ClientEvent: ClientEvent{Type: StartStreaming}, + Filters: &StreamingFilters{ + Sampling: 0.5, + }, + }, + }, { name: "level and events filters", expected: EventStartStreaming{ ClientEvent: ClientEvent{Type: StartStreaming}, Filters: &StreamingFilters{ - Level: infoLevel, - Events: []LogEventType{Cloudflared}, + Level: infoLevel, + Events: []LogEventType{Cloudflared}, + Sampling: 0.5, }, }, }, diff --git a/management/session.go b/management/session.go index 52120d39..8b8ccd89 100644 --- a/management/session.go +++ b/management/session.go @@ -2,6 +2,7 @@ package management import ( "context" + "math/rand" "sync/atomic" ) @@ -25,6 +26,8 @@ type session struct { listener chan *Log // Types of log events that this session will provide through the listener filters *StreamingFilters + // Sampling of the log events this session will send (runs after all other filters if available) + sampler *sampler } // NewSession creates a new session. @@ -43,6 +46,20 @@ func newSession(size int, actor actor, cancel context.CancelFunc) *session { func (s *session) Filters(filters *StreamingFilters) { if filters != nil { s.filters = filters + sampling := filters.Sampling + // clamp the sampling values between 0 and 1 + if sampling < 0 { + sampling = 0 + } + if sampling > 1 { + sampling = 1 + } + s.filters.Sampling = sampling + if sampling > 0 && sampling < 1 { + s.sampler = &sampler{ + p: int(sampling * 100), + } + } } else { s.filters = &StreamingFilters{} } @@ -61,6 +78,10 @@ func (s *session) Insert(log *Log) { if len(s.filters.Events) != 0 && !contains(s.filters.Events, log.Event) { return } + // Sampling is also optional + if s.sampler != nil && !s.sampler.Sample() { + return + } select { case s.listener <- log: default: @@ -86,3 +107,14 @@ func contains(array []LogEventType, t LogEventType) bool { } return false } + +// sampler will send approximately every p percentage log events out of 100. +type sampler struct { + p int +} + +// Sample returns true if the event should be part of the sample, false if the event should be dropped. +func (s *sampler) Sample() bool { + return rand.Intn(100) <= s.p + +} diff --git a/management/session_test.go b/management/session_test.go index 9ed947be..7e26f1dc 100644 --- a/management/session_test.go +++ b/management/session_test.go @@ -67,6 +67,27 @@ func TestSession_Insert(t *testing.T) { }, expectLog: false, }, + { + name: "sampling", + filters: StreamingFilters{ + Sampling: 0.9999999, + }, + expectLog: true, + }, + { + name: "sampling (invalid negative)", + filters: StreamingFilters{ + Sampling: -1.0, + }, + expectLog: true, + }, + { + name: "sampling (invalid too large)", + filters: StreamingFilters{ + Sampling: 2.0, + }, + expectLog: true, + }, { name: "filter and event", filters: StreamingFilters{