Fix race condition in proxy test

This commit is contained in:
MoofMonkey 2022-10-03 13:05:48 +03:00
parent 301b2baca3
commit 1d68ad253f
1 changed files with 57 additions and 40 deletions

View File

@ -465,11 +465,10 @@ func (r *replayer) Bytes() []byte {
// eyeball sends tcp packets wrapped in websockets. (E.g: cloudflared access).
func TestConnections(t *testing.T) {
logger := logger.Create(nil)
replayer := &replayer{rw: &bytes.Buffer{}}
type args struct {
ingressServiceScheme string
originService func(*testing.T, net.Listener)
eyeballResponseWriter connection.ResponseWriter
eyeballResponseWriter func(io.Writer) connection.ResponseWriter
eyeballRequestBody io.ReadCloser
// Can be set to nil to show warp routing is not enabled.
@ -498,7 +497,9 @@ func TestConnections(t *testing.T) {
args: args{
ingressServiceScheme: "ws://",
originService: runEchoWSService,
eyeballResponseWriter: newWSRespWriter(replayer),
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter {
return newWSRespWriter(w)
},
eyeballRequestBody: newWSRequestBody([]byte("test1")),
connectionType: connection.TypeWebsocket,
requestHeaders: map[string][]string{
@ -522,7 +523,9 @@ func TestConnections(t *testing.T) {
args: args{
ingressServiceScheme: "tcp://",
originService: runEchoTCPService,
eyeballResponseWriter: newTCPRespWriter(replayer),
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter {
return newTCPRespWriter(w)
},
eyeballRequestBody: newTCPRequestBody([]byte("test2")),
warpRoutingService: ingress.NewWarpRoutingService(testWarpRouting),
connectionType: connection.TypeTCP,
@ -560,7 +563,9 @@ func TestConnections(t *testing.T) {
args: args{
ingressServiceScheme: "tcp://",
originService: runEchoTCPService,
eyeballResponseWriter: newWSRespWriter(replayer),
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter {
return newWSRespWriter(w)
},
eyeballRequestBody: newWSRequestBody([]byte("test4")),
connectionType: connection.TypeWebsocket,
requestHeaders: map[string][]string{
@ -583,7 +588,9 @@ func TestConnections(t *testing.T) {
args: args{
ingressServiceScheme: "tcp://",
originService: runEchoTCPService,
eyeballResponseWriter: newMockHTTPRespWriter(),
eyeballResponseWriter: func(_ io.Writer) connection.ResponseWriter {
return newMockHTTPRespWriter()
},
eyeballRequestBody: http.NoBody,
connectionType: connection.TypeHTTP,
requestHeaders: map[string][]string{
@ -591,7 +598,7 @@ func TestConnections(t *testing.T) {
},
},
want: want{
message: []byte{},
message: nil,
headers: map[string][]string{},
},
},
@ -600,7 +607,9 @@ func TestConnections(t *testing.T) {
args: args{
ingressServiceScheme: "tcp://",
originService: runEchoTCPService,
eyeballResponseWriter: newTCPRespWriter(replayer),
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter {
return newTCPRespWriter(w)
},
eyeballRequestBody: newTCPRequestBody([]byte("test2")),
connectionType: connection.TypeTCP,
requestHeaders: map[string][]string{
@ -608,7 +617,7 @@ func TestConnections(t *testing.T) {
},
},
want: want{
message: []byte{},
message: nil,
err: true,
},
},
@ -617,7 +626,9 @@ func TestConnections(t *testing.T) {
args: args{
ingressServiceScheme: "ws://",
originService: runEchoWSService,
eyeballResponseWriter: newWSRespWriter(replayer),
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter {
return newWSRespWriter(w)
},
eyeballRequestBody: newWSRequestBody([]byte("test1")),
connectionType: connection.TypeWebsocket,
requestHeaders: map[string][]string{
@ -645,7 +656,9 @@ func TestConnections(t *testing.T) {
// closing the listener created by the test.
ln.Close()
},
eyeballResponseWriter: newTCPRespWriter(replayer),
eyeballResponseWriter: func(w io.Writer) connection.ResponseWriter {
return newTCPRespWriter(w)
},
eyeballRequestBody: newTCPRequestBody([]byte("test2")),
connectionType: connection.TypeTCP,
requestHeaders: map[string][]string{
@ -653,7 +666,7 @@ func TestConnections(t *testing.T) {
},
},
want: want{
message: []byte{},
message: nil,
err: true,
},
},
@ -661,6 +674,7 @@ func TestConnections(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
replayer := &replayer{rw: &bytes.Buffer{}}
ctx, cancel := context.WithCancel(context.Background())
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
@ -681,15 +695,18 @@ func TestConnections(t *testing.T) {
require.NoError(t, err)
req.Header = test.args.requestHeaders
respWriter := test.args.eyeballResponseWriter
var respWriter connection.ResponseWriter
if pipedReqBody, ok := test.args.eyeballRequestBody.(*pipedRequestBody); ok {
respWriter = newTCPRespWriter(pipedReqBody.pipedConn)
go func() {
resp := pipedReqBody.roundtrip(test.args.ingressServiceScheme + ln.Addr().String())
replayer.Write(resp)
}()
} else {
respWriter = test.args.eyeballResponseWriter(replayer)
}
if test.args.connectionType == connection.TypeTCP {
rwa := connection.NewHTTPResponseReadWriterAcker(respWriter, req)
err = proxy.ProxyTCP(ctx, rwa, &connection.TCPRequest{Dest: dest})