From 73a9980f381a341f4f07abc7408574730e4e9f37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20=22Pisco=22=20Fernandes?= Date: Thu, 24 Apr 2025 11:49:19 +0000 Subject: [PATCH] TUN-9255: Improve flush on write conditions in http2 tunnel type to match what is done on the edge ## Summary We have adapted our edge services to better know when they should flush on write. This is an important feature to ensure response types like Server Side Events are not buffered, and instead are propagated to the eyeball as soon as possible. This commit implements a similar logic for http2 tunnel protocol that we use in our edge services. By adding the new events stream header for json `application/x-ndjson` and using the content-length and transfer-encoding headers as well, following the RFC's: - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1 - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1 Closes TUN-9255 --- connection/connection.go | 31 +++++++++++++++++++---- connection/connection_test.go | 47 +++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/connection/connection.go b/connection/connection.go index f141d255..8c05eeea 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -26,14 +26,20 @@ const ( MaxGracePeriod = time.Minute * 3 MaxConcurrentStreams = math.MaxUint32 - contentTypeHeader = "content-type" - sseContentType = "text/event-stream" - grpcContentType = "application/grpc" + contentTypeHeader = "content-type" + contentLengthHeader = "content-length" + transferEncodingHeader = "transfer-encoding" + + sseContentType = "text/event-stream" + grpcContentType = "application/grpc" + sseJsonContentType = "application/x-ndjson" + + chunkTransferEncoding = "chunked" ) var ( switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols)) - flushableContentTypes = []string{sseContentType, grpcContentType} + flushableContentTypes = []string{sseContentType, grpcContentType, sseJsonContentType} ) // TunnelConnection represents the connection to the edge. @@ -274,6 +280,22 @@ type ConnectedFuse interface { // Helper method to let the caller know what content-types should require a flush on every // write to a ResponseWriter. func shouldFlush(headers http.Header) bool { + // When doing Server Side Events (SSE), some frameworks don't respect the `Content-Type` header. + // Therefore, we need to rely on other ways to know whether we should flush on write or not. A good + // approach is to assume that responses without `Content-Length` or with `Transfer-Encoding: chunked` + // are streams, and therefore, should be flushed right away to the eyeball. + // References: + // - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1 + // - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1 + if contentLength := headers.Get(contentLengthHeader); contentLength == "" { + return true + } + if transferEncoding := headers.Get(transferEncodingHeader); transferEncoding != "" { + transferEncoding = strings.ToLower(transferEncoding) + if strings.Contains(transferEncoding, chunkTransferEncoding) { + return true + } + } if contentType := headers.Get(contentTypeHeader); contentType != "" { contentType = strings.ToLower(contentType) for _, c := range flushableContentTypes { @@ -282,7 +304,6 @@ func shouldFlush(headers http.Header) bool { } } } - return false } diff --git a/connection/connection_test.go b/connection/connection_test.go index d1cb8557..a9dc5901 100644 --- a/connection/connection_test.go +++ b/connection/connection_test.go @@ -7,10 +7,12 @@ import ( "io" "math/big" "net/http" + "testing" "time" pkgerrors "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/stretchr/testify/require" cfdflow "github.com/cloudflare/cloudflared/flow" @@ -209,3 +211,48 @@ func (mcf mockConnectedFuse) Connected() {} func (mcf mockConnectedFuse) IsConnected() bool { return true } + +func TestShouldFlushHeaders(t *testing.T) { + tests := []struct { + headers map[string]string + shouldFlush bool + }{ + { + headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "1"}, + shouldFlush: false, + }, + { + headers: map[string]string{contentTypeHeader: "text/html", contentLengthHeader: "1"}, + shouldFlush: false, + }, + { + headers: map[string]string{contentTypeHeader: "text/event-stream", contentLengthHeader: "1"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/grpc", contentLengthHeader: "1"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/x-ndjson", contentLengthHeader: "1"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/json"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "-1", transferEncodingHeader: "chunked"}, + shouldFlush: true, + }, + } + + for _, test := range tests { + headers := http.Header{} + for k, v := range test.headers { + headers.Add(k, v) + } + + require.Equal(t, test.shouldFlush, shouldFlush(headers)) + } +}