TUN-5724: Fix SSE streaming by guaranteeing we write everything we read

This commit is contained in:
João Oliveirinha 2022-01-28 14:32:09 +00:00
parent 7bac4b15b0
commit 76fb329a65
2 changed files with 32 additions and 6 deletions

View File

@ -271,11 +271,20 @@ func (wr *bidirectionalStream) Write(p []byte) (n int, err error) {
func (p *Proxy) writeEventStream(w connection.ResponseWriter, respBody io.ReadCloser) {
reader := bufio.NewReader(respBody)
for {
line, err := reader.ReadBytes('\n')
if err != nil {
break
line, readErr := reader.ReadBytes('\n')
// We first try to write whatever we read even if an error occurred
// The reason for doing it is to guarantee we really push everything to the eyeball side
// before returning
if len(line) > 0 {
if _, writeErr := w.Write(line); writeErr != nil {
return
}
}
if readErr != nil {
return
}
_, _ = w.Write(line)
}
}

View File

@ -9,6 +9,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
@ -138,6 +139,7 @@ func TestProxySingleOrigin(t *testing.T) {
t.Run("testProxyHTTP", testProxyHTTP(proxy))
t.Run("testProxyWebsocket", testProxyWebsocket(proxy))
t.Run("testProxySSE", testProxySSE(proxy))
t.Run("testProxySSEAllData", testProxySSEAllData(proxy))
cancel()
wg.Wait()
}
@ -256,6 +258,21 @@ func testProxySSE(proxy connection.OriginProxy) func(t *testing.T) {
}
}
// Regression test to guarantee that we always write the contents downstream even if EOF is reached without
// hitting the delimiter
func testProxySSEAllData(proxy *Proxy) func(t *testing.T) {
return func(t *testing.T) {
eyeballReader := io.NopCloser(strings.NewReader("data\r\r"))
responseWriter := newMockSSERespWriter()
// responseWriter uses an unbuffered channel, so we call in a different go-routine
go proxy.writeEventStream(responseWriter, eyeballReader)
result := string(<-responseWriter.writeNotification)
require.Equal(t, "data\r\r", result)
}
}
func TestProxyMultipleOrigins(t *testing.T) {
api := httptest.NewServer(mockAPI{})
defer api.Close()
@ -447,7 +464,7 @@ func TestConnections(t *testing.T) {
// eyeball connection type.
connectionType connection.Type
//requestheaders to be sent in the call to proxy.Proxy
// requestheaders to be sent in the call to proxy.Proxy
requestHeaders http.Header
}
@ -508,7 +525,7 @@ func TestConnections(t *testing.T) {
args: args{
ingressServiceScheme: "ws://",
originService: runEchoWSService,
//eyeballResponseWriter gets set after roundtrip dial.
// eyeballResponseWriter gets set after roundtrip dial.
eyeballRequestBody: newPipedWSRequestBody([]byte("test3")),
warpRoutingService: ingress.NewWarpRoutingService(),
requestHeaders: map[string][]string{