From e9d07e35c799d8852baefd0c27f74a0a6360d896 Mon Sep 17 00:00:00 2001 From: cthuang Date: Sun, 16 Oct 2022 17:55:35 +0100 Subject: [PATCH] TUN-6861: Trace ICMP on Windows --- ingress/icmp_windows.go | 80 +++++++++++++++++++++++++------ ingress/icmp_windows_test.go | 2 +- ingress/origin_icmp_proxy_test.go | 14 ++++-- 3 files changed, 76 insertions(+), 20 deletions(-) diff --git a/ingress/icmp_windows.go b/ingress/icmp_windows.go index c32b6f1d..10f5bf58 100644 --- a/ingress/icmp_windows.go +++ b/ingress/icmp_windows.go @@ -21,11 +21,13 @@ import ( "github.com/google/gopacket/layers" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" "github.com/cloudflare/cloudflared/packet" + "github.com/cloudflare/cloudflared/tracing" ) const ( @@ -266,33 +268,50 @@ func (ip *icmpProxy) Serve(ctx context.Context) error { // The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version. // It's possible that a slow request will block other requests, so we set the timeout to only 1s. func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { - if pk == nil { - return errPacketNil - } defer func() { if r := recover(); r != nil { ip.logger.Error().Interface("error", r).Msgf("Recover panic from sending icmp request/response, error %s", debug.Stack()) } }() + _, requestSpan := responder.requestSpan(ctx, pk) + defer responder.exportSpan() + echo, err := getICMPEcho(pk.Message) if err != nil { return err } - respData, err := ip.icmpEchoRoundtrip(pk.Dst, echo) + requestSpan.SetAttributes( + attribute.Int("originalEchoID", echo.ID), + attribute.Int("seq", echo.Seq), + ) + + resp, err := ip.icmpEchoRoundtrip(pk.Dst, echo) if err != nil { ip.logger.Err(err).Msg("ICMP echo roundtrip failed") + tracing.EndWithErrorStatus(requestSpan, err) return err } + tracing.End(requestSpan) + responder.exportSpan() - err = ip.handleEchoReply(pk, echo, respData, responder) + _, replySpan := responder.replySpan(ctx, ip.logger) + replySpan.SetAttributes( + attribute.Int("originalEchoID", echo.ID), + attribute.Int("seq", echo.Seq), + attribute.Int64("rtt", int64(resp.rtt())), + attribute.String("status", resp.status().String()), + ) + err = ip.handleEchoReply(pk, echo, resp, responder) if err != nil { + tracing.EndWithErrorStatus(replySpan, err) return errors.Wrap(err, "failed to handle ICMP echo reply") } + tracing.End(replySpan) return nil } -func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder *packetResponder) error { +func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, resp echoResp, responder *packetResponder) error { var replyType icmp.Type if request.Dst.Is4() { replyType = ipv4.ICMPTypeEchoReply @@ -313,7 +332,7 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d Body: &icmp.Echo{ ID: echoReq.ID, Seq: echoReq.Seq, - Data: data, + Data: resp.payload(), }, }, } @@ -334,16 +353,17 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d return responder.returnPacket(serializedPacket) } -func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, error) { +func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) (echoResp, error) { if dst.Is6() { if ip.srcSocketAddr == nil { return nil, fmt.Errorf("cannot send ICMPv6 using ICMPv4 proxy") } resp, err := ip.icmp6SendEcho(dst, echo) if err != nil { + return nil, errors.Wrap(err, "failed to send/receive ICMPv6 echo") } - return resp.data, nil + return resp, nil } if ip.srcSocketAddr != nil { return nil, fmt.Errorf("cannot send ICMPv4 using ICMPv6 proxy") @@ -352,7 +372,7 @@ func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, if err != nil { return nil, errors.Wrap(err, "failed to send/receive ICMPv4 echo") } - return resp.data, nil + return resp, nil } /* @@ -371,7 +391,7 @@ func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, To retain the reference allocated objects, conversion from pointer to uintptr must happen as arguments to the syscall function */ -func (ip *icmpProxy) icmpSendEcho(dst netip.Addr, echo *icmp.Echo) (*echoResp, error) { +func (ip *icmpProxy) icmpSendEcho(dst netip.Addr, echo *icmp.Echo) (*echoV4Resp, error) { dataSize := len(echo.Data) replySize := echoReplySize + uintptr(dataSize) replyBuf := make([]byte, replySize) @@ -399,7 +419,7 @@ func (ip *icmpProxy) icmpSendEcho(dst netip.Addr, echo *icmp.Echo) (*echoResp, e } else if replyCount > 1 { ip.logger.Warn().Msgf("Received %d ICMP echo replies, only sending 1 back", replyCount) } - return newEchoResp(replyBuf) + return newEchoV4Resp(replyBuf) } // Third definition of https://docs.microsoft.com/en-us/windows/win32/api/inaddr/ns-inaddr-in_addr#syntax is address in uint32 @@ -411,12 +431,30 @@ func inAddrV4(ip netip.Addr) (uint32, error) { return endian.Uint32(v4[:]), nil } -type echoResp struct { +type echoResp interface { + status() ipStatus + rtt() uint32 + payload() []byte +} + +type echoV4Resp struct { reply *echoReply data []byte } -func newEchoResp(replyBuf []byte) (*echoResp, error) { +func (r *echoV4Resp) status() ipStatus { + return r.reply.Status +} + +func (r *echoV4Resp) rtt() uint32 { + return r.reply.RoundTripTime +} + +func (r *echoV4Resp) payload() []byte { + return r.data +} + +func newEchoV4Resp(replyBuf []byte) (*echoV4Resp, error) { if len(replyBuf) == 0 { return nil, fmt.Errorf("reply buffer is empty") } @@ -430,7 +468,7 @@ func newEchoResp(replyBuf []byte) (*echoResp, error) { if dataBufStart < int(echoReplySize) { return nil, fmt.Errorf("reply buffer size %d is too small to hold data of size %d", len(replyBuf), int(reply.DataSize)) } - return &echoResp{ + return &echoV4Resp{ reply: &reply, data: replyBuf[dataBufStart:], }, nil @@ -502,6 +540,18 @@ type echoV6Resp struct { data []byte } +func (r *echoV6Resp) status() ipStatus { + return r.reply.Status +} + +func (r *echoV6Resp) rtt() uint32 { + return r.reply.RoundTripTime +} + +func (r *echoV6Resp) payload() []byte { + return r.data +} + func newEchoV6Resp(replyBuf []byte, dataSize int) (*echoV6Resp, error) { if len(replyBuf) == 0 { return nil, fmt.Errorf("reply buffer is empty") diff --git a/ingress/icmp_windows_test.go b/ingress/icmp_windows_test.go index eefc654f..a98d74d2 100644 --- a/ingress/icmp_windows_test.go +++ b/ingress/icmp_windows_test.go @@ -59,7 +59,7 @@ func TestParseEchoReply(t *testing.T) { } for _, test := range tests { - resp, err := newEchoResp(test.replyBuf) + resp, err := newEchoV4Resp(test.replyBuf) if test.expectedReply == nil { require.Error(t, err) require.Nil(t, resp) diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index 314ec3c4..258d3374 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -103,9 +103,6 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { } func TestTraceICMPRouterEcho(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("TODO: TUN-6861 Trace ICMP in Windows") - } tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1" router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) @@ -149,8 +146,12 @@ func TestTraceICMPRouterEcho(t *testing.T) { Body: echo, }, } + require.NoError(t, router.Request(ctx, &pk, &responder)) - validateEchoFlow(t, muxer, &pk) + // On Windows, request span is returned before reply + if runtime.GOOS != "windows" { + validateEchoFlow(t, muxer, &pk) + } // Request span receivedPacket := <-muxer.cfdToEdge @@ -158,6 +159,11 @@ func TestTraceICMPRouterEcho(t *testing.T) { require.True(t, ok) require.NotEmpty(t, requestSpan.Spans) require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity)) + + if runtime.GOOS == "windows" { + validateEchoFlow(t, muxer, &pk) + } + // Reply span receivedPacket = <-muxer.cfdToEdge replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)