From b1de2a74faa69a47744a4fc95204b6a6929f866c Mon Sep 17 00:00:00 2001 From: Chung-Ting Date: Wed, 19 Oct 2022 14:37:10 +0100 Subject: [PATCH] TUN-6876: Fix flaky TestTraceICMPRouterEcho by taking account request span can return before reply --- ingress/icmp_posix_test.go | 6 +-- ingress/origin_icmp_proxy_test.go | 76 +++++++++++++------------------ 2 files changed, 34 insertions(+), 48 deletions(-) diff --git a/ingress/icmp_posix_test.go b/ingress/icmp_posix_test.go index d4b49a1e..a857dacc 100644 --- a/ingress/icmp_posix_test.go +++ b/ingress/icmp_posix_test.go @@ -57,13 +57,13 @@ func TestFunnelIdleTimeout(t *testing.T) { datagramMuxer: muxer, } require.NoError(t, proxy.Request(ctx, &pk, &responder)) - validateEchoFlow(t, muxer, &pk) + validateEchoFlow(t, <-muxer.cfdToEdge, &pk) // Send second request, should reuse the funnel require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{ datagramMuxer: nil, })) - validateEchoFlow(t, muxer, &pk) + validateEchoFlow(t, <-muxer.cfdToEdge, &pk) time.Sleep(idleTimeout * 2) newMuxer := newMockMuxer(0) @@ -71,7 +71,7 @@ func TestFunnelIdleTimeout(t *testing.T) { datagramMuxer: newMuxer, } require.NoError(t, proxy.Request(ctx, &pk, &newResponder)) - validateEchoFlow(t, newMuxer, &pk) + validateEchoFlow(t, <-newMuxer.cfdToEdge, &pk) cancel() <-proxyDone diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index 258d3374..ae9f54b9 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "net/netip" - "runtime" "strings" "sync" "testing" @@ -95,7 +94,7 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { }, } require.NoError(t, router.Request(ctx, &pk, &responder)) - validateEchoFlow(t, muxer, &pk) + validateEchoFlow(t, <-muxer.cfdToEdge, &pk) } } cancel() @@ -148,25 +147,35 @@ func TestTraceICMPRouterEcho(t *testing.T) { } require.NoError(t, router.Request(ctx, &pk, &responder)) - // On Windows, request span is returned before reply - if runtime.GOOS != "windows" { - validateEchoFlow(t, muxer, &pk) + firstPK := <-muxer.cfdToEdge + var requestSpan *quicpogs.TracingSpanPacket + // The order of receiving reply or request span is not deterministic + switch firstPK.Type() { + case quicpogs.DatagramTypeIP: + // reply packet + validateEchoFlow(t, firstPK, &pk) + case quicpogs.DatagramTypeTracingSpan: + // Request span + requestSpan = firstPK.(*quicpogs.TracingSpanPacket) + require.NotEmpty(t, requestSpan.Spans) + require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity)) + default: + panic(fmt.Sprintf("received unexpected packet type %d", firstPK.Type())) } - // Request span - receivedPacket := <-muxer.cfdToEdge - requestSpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket) - require.True(t, ok) - require.NotEmpty(t, requestSpan.Spans) - require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity)) - - if runtime.GOOS == "windows" { - validateEchoFlow(t, muxer, &pk) + secondPK := <-muxer.cfdToEdge + if requestSpan != nil { + // If first packet is request span, second packet should be the reply + validateEchoFlow(t, secondPK, &pk) + } else { + requestSpan = secondPK.(*quicpogs.TracingSpanPacket) + require.NotEmpty(t, requestSpan.Spans) + require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity)) } // Reply span - receivedPacket = <-muxer.cfdToEdge - replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket) + thirdPacket := <-muxer.cfdToEdge + replySpan, ok := thirdPacket.(*quicpogs.TracingSpanPacket) require.True(t, ok) require.NotEmpty(t, replySpan.Spans) require.True(t, bytes.Equal(serializedIdentity, replySpan.TracingIdentity)) @@ -179,10 +188,10 @@ func TestTraceICMPRouterEcho(t *testing.T) { datagramMuxer: muxer, } require.NoError(t, router.Request(ctx, &pk, &newResponder)) - validateEchoFlow(t, muxer, &pk) + validateEchoFlow(t, <-muxer.cfdToEdge, &pk) select { - case receivedPacket = <-muxer.cfdToEdge: + case receivedPacket := <-muxer.cfdToEdge: panic(fmt.Sprintf("Receive unexpected packet %+v", receivedPacket)) default: } @@ -240,7 +249,7 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, } require.NoError(t, router.Request(ctx, pk, &responder)) - validateEchoFlow(t, muxer, pk) + validateEchoFlow(t, <-muxer.cfdToEdge, pk) } }() go func() { @@ -268,7 +277,7 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }, } require.NoError(t, router.Request(ctx, pk, &responder)) - validateEchoFlow(t, muxer, pk) + validateEchoFlow(t, <-muxer.cfdToEdge, pk) } }() } @@ -357,33 +366,10 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp. } } -type echoFlowResponder struct { - lock sync.Mutex - decoder *packet.ICMPDecoder - respChan chan []byte -} - -func (efr *echoFlowResponder) SendPacket(dst netip.Addr, pk packet.RawPacket) error { - efr.lock.Lock() - defer efr.lock.Unlock() - copiedPacket := make([]byte, len(pk.Data)) - copy(copiedPacket, pk.Data) - efr.respChan <- copiedPacket - return nil -} - -func (efr *echoFlowResponder) Close() error { - efr.lock.Lock() - defer efr.lock.Unlock() - close(efr.respChan) - return nil -} - -func validateEchoFlow(t *testing.T, muxer *mockMuxer, echoReq *packet.ICMP) { - pk := <-muxer.cfdToEdge +func validateEchoFlow(t *testing.T, pk quicpogs.Packet, echoReq *packet.ICMP) { decoder := packet.NewICMPDecoder() decoded, err := decoder.Decode(packet.RawPacket{Data: pk.Payload()}) - require.NoError(t, err) + require.NoError(t, err, pk) require.Equal(t, decoded.Src, echoReq.Dst) require.Equal(t, decoded.Dst, echoReq.Src) require.Equal(t, echoReq.Protocol, decoded.Protocol)