TUN-6876: Fix flaky TestTraceICMPRouterEcho by taking account request span can return before reply

This commit is contained in:
Chung-Ting 2022-10-19 14:37:10 +01:00
parent 4d32a64f98
commit b1de2a74fa
2 changed files with 34 additions and 48 deletions

View File

@ -57,13 +57,13 @@ func TestFunnelIdleTimeout(t *testing.T) {
datagramMuxer: muxer,
}
require.NoError(t, proxy.Request(ctx, &pk, &responder))
validateEchoFlow(t, muxer, &pk)
validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
// Send second request, should reuse the funnel
require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{
datagramMuxer: nil,
}))
validateEchoFlow(t, muxer, &pk)
validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
time.Sleep(idleTimeout * 2)
newMuxer := newMockMuxer(0)
@ -71,7 +71,7 @@ func TestFunnelIdleTimeout(t *testing.T) {
datagramMuxer: newMuxer,
}
require.NoError(t, proxy.Request(ctx, &pk, &newResponder))
validateEchoFlow(t, newMuxer, &pk)
validateEchoFlow(t, <-newMuxer.cfdToEdge, &pk)
cancel()
<-proxyDone

View File

@ -6,7 +6,6 @@ import (
"fmt"
"net"
"net/netip"
"runtime"
"strings"
"sync"
"testing"
@ -95,7 +94,7 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
},
}
require.NoError(t, router.Request(ctx, &pk, &responder))
validateEchoFlow(t, muxer, &pk)
validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
}
}
cancel()
@ -148,25 +147,35 @@ func TestTraceICMPRouterEcho(t *testing.T) {
}
require.NoError(t, router.Request(ctx, &pk, &responder))
// On Windows, request span is returned before reply
if runtime.GOOS != "windows" {
validateEchoFlow(t, muxer, &pk)
firstPK := <-muxer.cfdToEdge
var requestSpan *quicpogs.TracingSpanPacket
// The order of receiving reply or request span is not deterministic
switch firstPK.Type() {
case quicpogs.DatagramTypeIP:
// reply packet
validateEchoFlow(t, firstPK, &pk)
case quicpogs.DatagramTypeTracingSpan:
// Request span
requestSpan = firstPK.(*quicpogs.TracingSpanPacket)
require.NotEmpty(t, requestSpan.Spans)
require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
default:
panic(fmt.Sprintf("received unexpected packet type %d", firstPK.Type()))
}
// Request span
receivedPacket := <-muxer.cfdToEdge
requestSpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
require.True(t, ok)
require.NotEmpty(t, requestSpan.Spans)
require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
if runtime.GOOS == "windows" {
validateEchoFlow(t, muxer, &pk)
secondPK := <-muxer.cfdToEdge
if requestSpan != nil {
// If first packet is request span, second packet should be the reply
validateEchoFlow(t, secondPK, &pk)
} else {
requestSpan = secondPK.(*quicpogs.TracingSpanPacket)
require.NotEmpty(t, requestSpan.Spans)
require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
}
// Reply span
receivedPacket = <-muxer.cfdToEdge
replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
thirdPacket := <-muxer.cfdToEdge
replySpan, ok := thirdPacket.(*quicpogs.TracingSpanPacket)
require.True(t, ok)
require.NotEmpty(t, replySpan.Spans)
require.True(t, bytes.Equal(serializedIdentity, replySpan.TracingIdentity))
@ -179,10 +188,10 @@ func TestTraceICMPRouterEcho(t *testing.T) {
datagramMuxer: muxer,
}
require.NoError(t, router.Request(ctx, &pk, &newResponder))
validateEchoFlow(t, muxer, &pk)
validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
select {
case receivedPacket = <-muxer.cfdToEdge:
case receivedPacket := <-muxer.cfdToEdge:
panic(fmt.Sprintf("Receive unexpected packet %+v", receivedPacket))
default:
}
@ -240,7 +249,7 @@ func TestConcurrentRequestsToSameDst(t *testing.T) {
},
}
require.NoError(t, router.Request(ctx, pk, &responder))
validateEchoFlow(t, muxer, pk)
validateEchoFlow(t, <-muxer.cfdToEdge, pk)
}
}()
go func() {
@ -268,7 +277,7 @@ func TestConcurrentRequestsToSameDst(t *testing.T) {
},
}
require.NoError(t, router.Request(ctx, pk, &responder))
validateEchoFlow(t, muxer, pk)
validateEchoFlow(t, <-muxer.cfdToEdge, pk)
}
}()
}
@ -357,33 +366,10 @@ func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp.
}
}
type echoFlowResponder struct {
lock sync.Mutex
decoder *packet.ICMPDecoder
respChan chan []byte
}
func (efr *echoFlowResponder) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
efr.lock.Lock()
defer efr.lock.Unlock()
copiedPacket := make([]byte, len(pk.Data))
copy(copiedPacket, pk.Data)
efr.respChan <- copiedPacket
return nil
}
func (efr *echoFlowResponder) Close() error {
efr.lock.Lock()
defer efr.lock.Unlock()
close(efr.respChan)
return nil
}
func validateEchoFlow(t *testing.T, muxer *mockMuxer, echoReq *packet.ICMP) {
pk := <-muxer.cfdToEdge
func validateEchoFlow(t *testing.T, pk quicpogs.Packet, echoReq *packet.ICMP) {
decoder := packet.NewICMPDecoder()
decoded, err := decoder.Decode(packet.RawPacket{Data: pk.Payload()})
require.NoError(t, err)
require.NoError(t, err, pk)
require.Equal(t, decoded.Src, echoReq.Dst)
require.Equal(t, decoded.Dst, echoReq.Src)
require.Equal(t, echoReq.Protocol, decoded.Protocol)