Revert "Fix race condition in proxy test"

This reverts commit 1d68ad253f.
This commit is contained in:
MoofMonkey 2022-10-04 06:19:14 +03:00
parent 9d0c5d8ffa
commit 36554d8648
1 changed files with 40 additions and 57 deletions

View File

@ -465,10 +465,11 @@ func (r *replayer) Bytes() []byte {
// eyeball sends tcp packets wrapped in websockets. (E.g: cloudflared access). // eyeball sends tcp packets wrapped in websockets. (E.g: cloudflared access).
func TestConnections(t *testing.T) { func TestConnections(t *testing.T) {
logger := logger.Create(nil) logger := logger.Create(nil)
replayer := &replayer{rw: &bytes.Buffer{}}
type args struct { type args struct {
ingressServiceScheme string ingressServiceScheme string
originService func(*testing.T, net.Listener) originService func(*testing.T, net.Listener)
eyeballResponseWriter func(io.Writer) connection.ResponseWriter eyeballResponseWriter connection.ResponseWriter
eyeballRequestBody io.ReadCloser eyeballRequestBody io.ReadCloser
// Can be set to nil to show warp routing is not enabled. // Can be set to nil to show warp routing is not enabled.
@ -495,13 +496,11 @@ func TestConnections(t *testing.T) {
{ {
name: "ws-ws proxy", name: "ws-ws proxy",
args: args{ args: args{
ingressServiceScheme: "ws://", ingressServiceScheme: "ws://",
originService: runEchoWSService, originService: runEchoWSService,
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter { eyeballResponseWriter: newWSRespWriter(replayer),
return newWSRespWriter(w) eyeballRequestBody: newWSRequestBody([]byte("test1")),
}, connectionType: connection.TypeWebsocket,
eyeballRequestBody: newWSRequestBody([]byte("test1")),
connectionType: connection.TypeWebsocket,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
// Example key from https://tools.ietf.org/html/rfc6455#section-1.2 // Example key from https://tools.ietf.org/html/rfc6455#section-1.2
"Sec-Websocket-Key": {"dGhlIHNhbXBsZSBub25jZQ=="}, "Sec-Websocket-Key": {"dGhlIHNhbXBsZSBub25jZQ=="},
@ -521,14 +520,12 @@ func TestConnections(t *testing.T) {
{ {
name: "tcp-tcp proxy", name: "tcp-tcp proxy",
args: args{ args: args{
ingressServiceScheme: "tcp://", ingressServiceScheme: "tcp://",
originService: runEchoTCPService, originService: runEchoTCPService,
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter { eyeballResponseWriter: newTCPRespWriter(replayer),
return newTCPRespWriter(w) eyeballRequestBody: newTCPRequestBody([]byte("test2")),
}, warpRoutingService: ingress.NewWarpRoutingService(testWarpRouting),
eyeballRequestBody: newTCPRequestBody([]byte("test2")), connectionType: connection.TypeTCP,
warpRoutingService: ingress.NewWarpRoutingService(testWarpRouting),
connectionType: connection.TypeTCP,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
"Cf-Cloudflared-Proxy-Src": {"non-blank-value"}, "Cf-Cloudflared-Proxy-Src": {"non-blank-value"},
}, },
@ -561,13 +558,11 @@ func TestConnections(t *testing.T) {
{ {
name: "ws-tcp proxy", name: "ws-tcp proxy",
args: args{ args: args{
ingressServiceScheme: "tcp://", ingressServiceScheme: "tcp://",
originService: runEchoTCPService, originService: runEchoTCPService,
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter { eyeballResponseWriter: newWSRespWriter(replayer),
return newWSRespWriter(w) eyeballRequestBody: newWSRequestBody([]byte("test4")),
}, connectionType: connection.TypeWebsocket,
eyeballRequestBody: newWSRequestBody([]byte("test4")),
connectionType: connection.TypeWebsocket,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
// Example key from https://tools.ietf.org/html/rfc6455#section-1.2 // Example key from https://tools.ietf.org/html/rfc6455#section-1.2
"Sec-Websocket-Key": {"dGhlIHNhbXBsZSBub25jZQ=="}, "Sec-Websocket-Key": {"dGhlIHNhbXBsZSBub25jZQ=="},
@ -586,51 +581,45 @@ func TestConnections(t *testing.T) {
// Send (unexpected) HTTP when origin expects WS (to unwrap for raw TCP) // Send (unexpected) HTTP when origin expects WS (to unwrap for raw TCP)
name: "http-(ws)tcp proxy", name: "http-(ws)tcp proxy",
args: args{ args: args{
ingressServiceScheme: "tcp://", ingressServiceScheme: "tcp://",
originService: runEchoTCPService, originService: runEchoTCPService,
eyeballResponseWriter: func(_ io.Writer) connection.ResponseWriter { eyeballResponseWriter: newMockHTTPRespWriter(),
return newMockHTTPRespWriter() eyeballRequestBody: http.NoBody,
}, connectionType: connection.TypeHTTP,
eyeballRequestBody: http.NoBody,
connectionType: connection.TypeHTTP,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
"Cf-Cloudflared-Proxy-Src": {"non-blank-value"}, "Cf-Cloudflared-Proxy-Src": {"non-blank-value"},
}, },
}, },
want: want{ want: want{
message: nil, message: []byte{},
headers: map[string][]string{}, headers: map[string][]string{},
}, },
}, },
{ {
name: "tcp-tcp proxy without warpRoutingService enabled", name: "tcp-tcp proxy without warpRoutingService enabled",
args: args{ args: args{
ingressServiceScheme: "tcp://", ingressServiceScheme: "tcp://",
originService: runEchoTCPService, originService: runEchoTCPService,
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter { eyeballResponseWriter: newTCPRespWriter(replayer),
return newTCPRespWriter(w) eyeballRequestBody: newTCPRequestBody([]byte("test2")),
}, connectionType: connection.TypeTCP,
eyeballRequestBody: newTCPRequestBody([]byte("test2")),
connectionType: connection.TypeTCP,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
"Cf-Cloudflared-Proxy-Src": {"non-blank-value"}, "Cf-Cloudflared-Proxy-Src": {"non-blank-value"},
}, },
}, },
want: want{ want: want{
message: nil, message: []byte{},
err: true, err: true,
}, },
}, },
{ {
name: "ws-ws proxy when origin is different", name: "ws-ws proxy when origin is different",
args: args{ args: args{
ingressServiceScheme: "ws://", ingressServiceScheme: "ws://",
originService: runEchoWSService, originService: runEchoWSService,
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter { eyeballResponseWriter: newWSRespWriter(replayer),
return newWSRespWriter(w) eyeballRequestBody: newWSRequestBody([]byte("test1")),
}, connectionType: connection.TypeWebsocket,
eyeballRequestBody: newWSRequestBody([]byte("test1")),
connectionType: connection.TypeWebsocket,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
// Example key from https://tools.ietf.org/html/rfc6455#section-1.2 // Example key from https://tools.ietf.org/html/rfc6455#section-1.2
"Sec-Websocket-Key": {"dGhlIHNhbXBsZSBub25jZQ=="}, "Sec-Websocket-Key": {"dGhlIHNhbXBsZSBub25jZQ=="},
@ -656,17 +645,15 @@ func TestConnections(t *testing.T) {
// closing the listener created by the test. // closing the listener created by the test.
ln.Close() ln.Close()
}, },
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter { eyeballResponseWriter: newTCPRespWriter(replayer),
return newTCPRespWriter(w) eyeballRequestBody: newTCPRequestBody([]byte("test2")),
}, connectionType: connection.TypeTCP,
eyeballRequestBody: newTCPRequestBody([]byte("test2")),
connectionType: connection.TypeTCP,
requestHeaders: map[string][]string{ requestHeaders: map[string][]string{
"Cf-Cloudflared-Proxy-Src": {"non-blank-value"}, "Cf-Cloudflared-Proxy-Src": {"non-blank-value"},
}, },
}, },
want: want{ want: want{
message: nil, message: []byte{},
err: true, err: true,
}, },
}, },
@ -674,7 +661,6 @@ func TestConnections(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
replayer := &replayer{rw: &bytes.Buffer{}}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ln, err := net.Listen("tcp", "127.0.0.1:0") ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
@ -695,18 +681,15 @@ func TestConnections(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
req.Header = test.args.requestHeaders req.Header = test.args.requestHeaders
respWriter := test.args.eyeballResponseWriter
var respWriter connection.ResponseWriter
if pipedReqBody, ok := test.args.eyeballRequestBody.(*pipedRequestBody); ok { if pipedReqBody, ok := test.args.eyeballRequestBody.(*pipedRequestBody); ok {
respWriter = newTCPRespWriter(pipedReqBody.pipedConn) respWriter = newTCPRespWriter(pipedReqBody.pipedConn)
go func() { go func() {
resp := pipedReqBody.roundtrip(test.args.ingressServiceScheme + ln.Addr().String()) resp := pipedReqBody.roundtrip(test.args.ingressServiceScheme + ln.Addr().String())
replayer.Write(resp) replayer.Write(resp)
}() }()
} else {
respWriter = test.args.eyeballResponseWriter(replayer)
} }
if test.args.connectionType == connection.TypeTCP { if test.args.connectionType == connection.TypeTCP {
rwa := connection.NewHTTPResponseReadWriterAcker(respWriter, req) rwa := connection.NewHTTPResponseReadWriterAcker(respWriter, req)
err = proxy.ProxyTCP(ctx, rwa, &connection.TCPRequest{Dest: dest}) err = proxy.ProxyTCP(ctx, rwa, &connection.TCPRequest{Dest: dest})