diff --git a/connection/connection.go b/connection/connection.go index f141d255..8c05eeea 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -26,14 +26,20 @@ const ( MaxGracePeriod = time.Minute * 3 MaxConcurrentStreams = math.MaxUint32 - contentTypeHeader = "content-type" - sseContentType = "text/event-stream" - grpcContentType = "application/grpc" + contentTypeHeader = "content-type" + contentLengthHeader = "content-length" + transferEncodingHeader = "transfer-encoding" + + sseContentType = "text/event-stream" + grpcContentType = "application/grpc" + sseJsonContentType = "application/x-ndjson" + + chunkTransferEncoding = "chunked" ) var ( switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols)) - flushableContentTypes = []string{sseContentType, grpcContentType} + flushableContentTypes = []string{sseContentType, grpcContentType, sseJsonContentType} ) // TunnelConnection represents the connection to the edge. @@ -274,6 +280,22 @@ type ConnectedFuse interface { // Helper method to let the caller know what content-types should require a flush on every // write to a ResponseWriter. func shouldFlush(headers http.Header) bool { + // When doing Server Side Events (SSE), some frameworks don't respect the `Content-Type` header. + // Therefore, we need to rely on other ways to know whether we should flush on write or not. A good + // approach is to assume that responses without `Content-Length` or with `Transfer-Encoding: chunked` + // are streams, and therefore, should be flushed right away to the eyeball. + // References: + // - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1 + // - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1 + if contentLength := headers.Get(contentLengthHeader); contentLength == "" { + return true + } + if transferEncoding := headers.Get(transferEncodingHeader); transferEncoding != "" { + transferEncoding = strings.ToLower(transferEncoding) + if strings.Contains(transferEncoding, chunkTransferEncoding) { + return true + } + } if contentType := headers.Get(contentTypeHeader); contentType != "" { contentType = strings.ToLower(contentType) for _, c := range flushableContentTypes { @@ -282,7 +304,6 @@ func shouldFlush(headers http.Header) bool { } } } - return false } diff --git a/connection/connection_test.go b/connection/connection_test.go index d1cb8557..a9dc5901 100644 --- a/connection/connection_test.go +++ b/connection/connection_test.go @@ -7,10 +7,12 @@ import ( "io" "math/big" "net/http" + "testing" "time" pkgerrors "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/stretchr/testify/require" cfdflow "github.com/cloudflare/cloudflared/flow" @@ -209,3 +211,48 @@ func (mcf mockConnectedFuse) Connected() {} func (mcf mockConnectedFuse) IsConnected() bool { return true } + +func TestShouldFlushHeaders(t *testing.T) { + tests := []struct { + headers map[string]string + shouldFlush bool + }{ + { + headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "1"}, + shouldFlush: false, + }, + { + headers: map[string]string{contentTypeHeader: "text/html", contentLengthHeader: "1"}, + shouldFlush: false, + }, + { + headers: map[string]string{contentTypeHeader: "text/event-stream", contentLengthHeader: "1"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/grpc", contentLengthHeader: "1"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/x-ndjson", contentLengthHeader: "1"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/json"}, + shouldFlush: true, + }, + { + headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "-1", transferEncodingHeader: "chunked"}, + shouldFlush: true, + }, + } + + for _, test := range tests { + headers := http.Header{} + for k, v := range test.headers { + headers.Add(k, v) + } + + require.Equal(t, test.shouldFlush, shouldFlush(headers)) + } +}