From 76fb329a6584fca39179033e5b1919ee383bedba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveirinha?= Date: Fri, 28 Jan 2022 14:32:09 +0000 Subject: [PATCH] TUN-5724: Fix SSE streaming by guaranteeing we write everything we read --- origin/proxy.go | 17 +++++++++++++---- origin/proxy_test.go | 21 +++++++++++++++++++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/origin/proxy.go b/origin/proxy.go index 53f10de3..dca03746 100644 --- a/origin/proxy.go +++ b/origin/proxy.go @@ -271,11 +271,20 @@ func (wr *bidirectionalStream) Write(p []byte) (n int, err error) { func (p *Proxy) writeEventStream(w connection.ResponseWriter, respBody io.ReadCloser) { reader := bufio.NewReader(respBody) for { - line, err := reader.ReadBytes('\n') - if err != nil { - break + line, readErr := reader.ReadBytes('\n') + + // We first try to write whatever we read even if an error occurred + // The reason for doing it is to guarantee we really push everything to the eyeball side + // before returning + if len(line) > 0 { + if _, writeErr := w.Write(line); writeErr != nil { + return + } + } + + if readErr != nil { + return } - _, _ = w.Write(line) } } diff --git a/origin/proxy_test.go b/origin/proxy_test.go index 9c49d461..e4184d7a 100644 --- a/origin/proxy_test.go +++ b/origin/proxy_test.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "net/http/httptest" + "strings" "sync" "testing" "time" @@ -138,6 +139,7 @@ func TestProxySingleOrigin(t *testing.T) { t.Run("testProxyHTTP", testProxyHTTP(proxy)) t.Run("testProxyWebsocket", testProxyWebsocket(proxy)) t.Run("testProxySSE", testProxySSE(proxy)) + t.Run("testProxySSEAllData", testProxySSEAllData(proxy)) cancel() wg.Wait() } @@ -256,6 +258,21 @@ func testProxySSE(proxy connection.OriginProxy) func(t *testing.T) { } } +// Regression test to guarantee that we always write the contents downstream even if EOF is reached without +// hitting the delimiter +func testProxySSEAllData(proxy *Proxy) func(t *testing.T) { + return func(t *testing.T) { + eyeballReader := io.NopCloser(strings.NewReader("data\r\r")) + responseWriter := newMockSSERespWriter() + + // responseWriter uses an unbuffered channel, so we call in a different go-routine + go proxy.writeEventStream(responseWriter, eyeballReader) + + result := string(<-responseWriter.writeNotification) + require.Equal(t, "data\r\r", result) + } +} + func TestProxyMultipleOrigins(t *testing.T) { api := httptest.NewServer(mockAPI{}) defer api.Close() @@ -447,7 +464,7 @@ func TestConnections(t *testing.T) { // eyeball connection type. connectionType connection.Type - //requestheaders to be sent in the call to proxy.Proxy + // requestheaders to be sent in the call to proxy.Proxy requestHeaders http.Header } @@ -508,7 +525,7 @@ func TestConnections(t *testing.T) { args: args{ ingressServiceScheme: "ws://", originService: runEchoWSService, - //eyeballResponseWriter gets set after roundtrip dial. + // eyeballResponseWriter gets set after roundtrip dial. eyeballRequestBody: newPipedWSRequestBody([]byte("test3")), warpRoutingService: ingress.NewWarpRoutingService(), requestHeaders: map[string][]string{