TUN-6861: Trace ICMP on Windows

This commit is contained in:
cthuang 2022-10-16 17:55:35 +01:00
parent 2d5234e021
commit e9d07e35c7
3 changed files with 76 additions and 20 deletions

View File

@ -21,11 +21,13 @@ import (
"github.com/google/gopacket/layers" "github.com/google/gopacket/layers"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/net/icmp" "golang.org/x/net/icmp"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
"golang.org/x/net/ipv6" "golang.org/x/net/ipv6"
"github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/packet"
"github.com/cloudflare/cloudflared/tracing"
) )
const ( const (
@ -266,33 +268,50 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
// The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version. // The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version.
// It's possible that a slow request will block other requests, so we set the timeout to only 1s. // It's possible that a slow request will block other requests, so we set the timeout to only 1s.
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
if pk == nil {
return errPacketNil
}
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
ip.logger.Error().Interface("error", r).Msgf("Recover panic from sending icmp request/response, error %s", debug.Stack()) ip.logger.Error().Interface("error", r).Msgf("Recover panic from sending icmp request/response, error %s", debug.Stack())
} }
}() }()
_, requestSpan := responder.requestSpan(ctx, pk)
defer responder.exportSpan()
echo, err := getICMPEcho(pk.Message) echo, err := getICMPEcho(pk.Message)
if err != nil { if err != nil {
return err return err
} }
respData, err := ip.icmpEchoRoundtrip(pk.Dst, echo) requestSpan.SetAttributes(
attribute.Int("originalEchoID", echo.ID),
attribute.Int("seq", echo.Seq),
)
resp, err := ip.icmpEchoRoundtrip(pk.Dst, echo)
if err != nil { if err != nil {
ip.logger.Err(err).Msg("ICMP echo roundtrip failed") ip.logger.Err(err).Msg("ICMP echo roundtrip failed")
tracing.EndWithErrorStatus(requestSpan, err)
return err return err
} }
tracing.End(requestSpan)
responder.exportSpan()
err = ip.handleEchoReply(pk, echo, respData, responder) _, replySpan := responder.replySpan(ctx, ip.logger)
replySpan.SetAttributes(
attribute.Int("originalEchoID", echo.ID),
attribute.Int("seq", echo.Seq),
attribute.Int64("rtt", int64(resp.rtt())),
attribute.String("status", resp.status().String()),
)
err = ip.handleEchoReply(pk, echo, resp, responder)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(replySpan, err)
return errors.Wrap(err, "failed to handle ICMP echo reply") return errors.Wrap(err, "failed to handle ICMP echo reply")
} }
tracing.End(replySpan)
return nil return nil
} }
func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder *packetResponder) error { func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, resp echoResp, responder *packetResponder) error {
var replyType icmp.Type var replyType icmp.Type
if request.Dst.Is4() { if request.Dst.Is4() {
replyType = ipv4.ICMPTypeEchoReply replyType = ipv4.ICMPTypeEchoReply
@ -313,7 +332,7 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d
Body: &icmp.Echo{ Body: &icmp.Echo{
ID: echoReq.ID, ID: echoReq.ID,
Seq: echoReq.Seq, Seq: echoReq.Seq,
Data: data, Data: resp.payload(),
}, },
}, },
} }
@ -334,16 +353,17 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d
return responder.returnPacket(serializedPacket) return responder.returnPacket(serializedPacket)
} }
func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, error) { func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) (echoResp, error) {
if dst.Is6() { if dst.Is6() {
if ip.srcSocketAddr == nil { if ip.srcSocketAddr == nil {
return nil, fmt.Errorf("cannot send ICMPv6 using ICMPv4 proxy") return nil, fmt.Errorf("cannot send ICMPv6 using ICMPv4 proxy")
} }
resp, err := ip.icmp6SendEcho(dst, echo) resp, err := ip.icmp6SendEcho(dst, echo)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to send/receive ICMPv6 echo") return nil, errors.Wrap(err, "failed to send/receive ICMPv6 echo")
} }
return resp.data, nil return resp, nil
} }
if ip.srcSocketAddr != nil { if ip.srcSocketAddr != nil {
return nil, fmt.Errorf("cannot send ICMPv4 using ICMPv6 proxy") return nil, fmt.Errorf("cannot send ICMPv4 using ICMPv6 proxy")
@ -352,7 +372,7 @@ func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte,
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to send/receive ICMPv4 echo") return nil, errors.Wrap(err, "failed to send/receive ICMPv4 echo")
} }
return resp.data, nil return resp, nil
} }
/* /*
@ -371,7 +391,7 @@ func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte,
To retain the reference allocated objects, conversion from pointer to uintptr must happen as arguments to the To retain the reference allocated objects, conversion from pointer to uintptr must happen as arguments to the
syscall function syscall function
*/ */
func (ip *icmpProxy) icmpSendEcho(dst netip.Addr, echo *icmp.Echo) (*echoResp, error) { func (ip *icmpProxy) icmpSendEcho(dst netip.Addr, echo *icmp.Echo) (*echoV4Resp, error) {
dataSize := len(echo.Data) dataSize := len(echo.Data)
replySize := echoReplySize + uintptr(dataSize) replySize := echoReplySize + uintptr(dataSize)
replyBuf := make([]byte, replySize) replyBuf := make([]byte, replySize)
@ -399,7 +419,7 @@ func (ip *icmpProxy) icmpSendEcho(dst netip.Addr, echo *icmp.Echo) (*echoResp, e
} else if replyCount > 1 { } else if replyCount > 1 {
ip.logger.Warn().Msgf("Received %d ICMP echo replies, only sending 1 back", replyCount) ip.logger.Warn().Msgf("Received %d ICMP echo replies, only sending 1 back", replyCount)
} }
return newEchoResp(replyBuf) return newEchoV4Resp(replyBuf)
} }
// Third definition of https://docs.microsoft.com/en-us/windows/win32/api/inaddr/ns-inaddr-in_addr#syntax is address in uint32 // Third definition of https://docs.microsoft.com/en-us/windows/win32/api/inaddr/ns-inaddr-in_addr#syntax is address in uint32
@ -411,12 +431,30 @@ func inAddrV4(ip netip.Addr) (uint32, error) {
return endian.Uint32(v4[:]), nil return endian.Uint32(v4[:]), nil
} }
type echoResp struct { type echoResp interface {
status() ipStatus
rtt() uint32
payload() []byte
}
type echoV4Resp struct {
reply *echoReply reply *echoReply
data []byte data []byte
} }
func newEchoResp(replyBuf []byte) (*echoResp, error) { func (r *echoV4Resp) status() ipStatus {
return r.reply.Status
}
func (r *echoV4Resp) rtt() uint32 {
return r.reply.RoundTripTime
}
func (r *echoV4Resp) payload() []byte {
return r.data
}
func newEchoV4Resp(replyBuf []byte) (*echoV4Resp, error) {
if len(replyBuf) == 0 { if len(replyBuf) == 0 {
return nil, fmt.Errorf("reply buffer is empty") return nil, fmt.Errorf("reply buffer is empty")
} }
@ -430,7 +468,7 @@ func newEchoResp(replyBuf []byte) (*echoResp, error) {
if dataBufStart < int(echoReplySize) { if dataBufStart < int(echoReplySize) {
return nil, fmt.Errorf("reply buffer size %d is too small to hold data of size %d", len(replyBuf), int(reply.DataSize)) return nil, fmt.Errorf("reply buffer size %d is too small to hold data of size %d", len(replyBuf), int(reply.DataSize))
} }
return &echoResp{ return &echoV4Resp{
reply: &reply, reply: &reply,
data: replyBuf[dataBufStart:], data: replyBuf[dataBufStart:],
}, nil }, nil
@ -502,6 +540,18 @@ type echoV6Resp struct {
data []byte data []byte
} }
func (r *echoV6Resp) status() ipStatus {
return r.reply.Status
}
func (r *echoV6Resp) rtt() uint32 {
return r.reply.RoundTripTime
}
func (r *echoV6Resp) payload() []byte {
return r.data
}
func newEchoV6Resp(replyBuf []byte, dataSize int) (*echoV6Resp, error) { func newEchoV6Resp(replyBuf []byte, dataSize int) (*echoV6Resp, error) {
if len(replyBuf) == 0 { if len(replyBuf) == 0 {
return nil, fmt.Errorf("reply buffer is empty") return nil, fmt.Errorf("reply buffer is empty")

View File

@ -59,7 +59,7 @@ func TestParseEchoReply(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
resp, err := newEchoResp(test.replyBuf) resp, err := newEchoV4Resp(test.replyBuf)
if test.expectedReply == nil { if test.expectedReply == nil {
require.Error(t, err) require.Error(t, err)
require.Nil(t, resp) require.Nil(t, resp)

View File

@ -103,9 +103,6 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
} }
func TestTraceICMPRouterEcho(t *testing.T) { func TestTraceICMPRouterEcho(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("TODO: TUN-6861 Trace ICMP in Windows")
}
tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1" tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
@ -149,8 +146,12 @@ func TestTraceICMPRouterEcho(t *testing.T) {
Body: echo, Body: echo,
}, },
} }
require.NoError(t, router.Request(ctx, &pk, &responder)) require.NoError(t, router.Request(ctx, &pk, &responder))
// On Windows, request span is returned before reply
if runtime.GOOS != "windows" {
validateEchoFlow(t, muxer, &pk) validateEchoFlow(t, muxer, &pk)
}
// Request span // Request span
receivedPacket := <-muxer.cfdToEdge receivedPacket := <-muxer.cfdToEdge
@ -158,6 +159,11 @@ func TestTraceICMPRouterEcho(t *testing.T) {
require.True(t, ok) require.True(t, ok)
require.NotEmpty(t, requestSpan.Spans) require.NotEmpty(t, requestSpan.Spans)
require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity)) require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
if runtime.GOOS == "windows" {
validateEchoFlow(t, muxer, &pk)
}
// Reply span // Reply span
receivedPacket = <-muxer.cfdToEdge receivedPacket = <-muxer.cfdToEdge
replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket) replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)