From e6537418859afcac29e56a39daa08bcabc09e048 Mon Sep 17 00:00:00 2001 From: chungthuang Date: Mon, 15 Jan 2024 16:49:17 +0000 Subject: [PATCH] TUN-8158: Add logging to confirm when ICMP reply is returned to the edge --- ingress/icmp_darwin.go | 18 +++++++----------- ingress/icmp_linux.go | 19 +++++++++---------- ingress/icmp_posix.go | 8 ++++++++ ingress/icmp_windows.go | 17 +++++++---------- ingress/origin_icmp_proxy.go | 24 ++++++++++++++++++++++++ 5 files changed, 55 insertions(+), 31 deletions(-) diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index 8c74598f..4e315f15 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -131,7 +131,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle } func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { - ctx, span := responder.requestSpan(ctx, pk) + _, span := responder.requestSpan(ctx, pk) defer responder.exportSpan() originalEcho, err := getICMPEcho(pk.Message) @@ -139,10 +139,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } - span.SetAttributes( - attribute.Int("originalEchoID", originalEcho.ID), - attribute.Int("seq", originalEcho.Seq), - ) + observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq) + echoIDTrackerKey := flow3Tuple{ srcIP: pk.Src, dstIP: pk.Dst, @@ -189,6 +187,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } + err = icmpFlow.sendToDst(pk.Dst, pk.Message) if err != nil { tracing.EndWithErrorStatus(span, err) @@ -269,15 +268,12 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error { _, span := icmpFlow.responder.replySpan(ctx, ip.logger) defer icmpFlow.responder.exportSpan() - span.SetAttributes( - attribute.String("dst", reply.from.String()), - attribute.Int("echoID", reply.echo.ID), - attribute.Int("seq", reply.echo.Seq), - attribute.Int("originalEchoID", icmpFlow.originalEchoID), - ) if err := icmpFlow.returnToSrc(reply); err != nil { tracing.EndWithErrorStatus(span, err) + return err } + observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq) + span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID)) tracing.End(span) return nil } diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go index 321b5c3f..e4d471aa 100644 --- a/ingress/icmp_linux.go +++ b/ingress/icmp_linux.go @@ -107,10 +107,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } - span.SetAttributes( - attribute.Int("originalEchoID", originalEcho.ID), - attribute.Int("seq", originalEcho.Seq), - ) + observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq) shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID) newFunnelFunc := func() (packet.Funnel, error) { @@ -199,6 +196,10 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf n, from, err := flow.originConn.ReadFrom(buf) if err != nil { + if flow.IsClosed() { + tracing.EndWithErrorStatus(span, fmt.Errorf("flow was closed")) + return false, nil + } tracing.EndWithErrorStatus(span, err) return false, err } @@ -214,16 +215,14 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf tracing.EndWithErrorStatus(span, err) return true, err } - span.SetAttributes( - attribute.String("dst", reply.from.String()), - attribute.Int("echoID", reply.echo.ID), - attribute.Int("seq", reply.echo.Seq), - ) + if err := flow.returnToSrc(reply); err != nil { - ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") + ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") tracing.EndWithErrorStatus(span, err) return true, err } + + observeICMPReply(ip.logger, span, from.String(), reply.echo.ID, reply.echo.Seq) tracing.End(span) return true, nil } diff --git a/ingress/icmp_posix.go b/ingress/icmp_posix.go index 504df60a..b03be49e 100644 --- a/ingress/icmp_posix.go +++ b/ingress/icmp_posix.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "net/netip" + "sync/atomic" "github.com/google/gopacket/layers" "github.com/rs/zerolog" @@ -46,6 +47,7 @@ type flow3Tuple struct { type icmpEchoFlow struct { *packet.ActivityTracker closeCallback func() error + closed *atomic.Bool src netip.Addr originConn *icmp.PacketConn responder *packetResponder @@ -59,6 +61,7 @@ func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icm return &icmpEchoFlow{ ActivityTracker: packet.NewActivityTracker(), closeCallback: closeCallback, + closed: &atomic.Bool{}, src: src, originConn: originConn, responder: responder, @@ -86,9 +89,14 @@ func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool { } func (ief *icmpEchoFlow) Close() error { + ief.closed.Store(true) return ief.closeCallback() } +func (ief *icmpEchoFlow) IsClosed() bool { + return ief.closed.Load() +} + // sendToDst rewrites the echo ID to the one assigned to this flow func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error { ief.UpdateLastActive() diff --git a/ingress/icmp_windows.go b/ingress/icmp_windows.go index 816ed383..19604ee4 100644 --- a/ingress/icmp_windows.go +++ b/ingress/icmp_windows.go @@ -281,10 +281,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa if err != nil { return err } - requestSpan.SetAttributes( - attribute.Int("originalEchoID", echo.ID), - attribute.Int("seq", echo.Seq), - ) + observeICMPRequest(ip.logger, requestSpan, pk.Src.String(), pk.Dst.String(), echo.ID, echo.Seq) resp, err := ip.icmpEchoRoundtrip(pk.Dst, echo) if err != nil { @@ -296,17 +293,17 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa responder.exportSpan() _, replySpan := responder.replySpan(ctx, ip.logger) - replySpan.SetAttributes( - attribute.Int("originalEchoID", echo.ID), - attribute.Int("seq", echo.Seq), - attribute.Int64("rtt", int64(resp.rtt())), - attribute.String("status", resp.status().String()), - ) err = ip.handleEchoReply(pk, echo, resp, responder) if err != nil { + ip.logger.Err(err).Msg("Failed to send ICMP reply") tracing.EndWithErrorStatus(replySpan, err) return errors.Wrap(err, "failed to handle ICMP echo reply") } + observeICMPReply(ip.logger, replySpan, pk.Dst.String(), echo.ID, echo.Seq) + replySpan.SetAttributes( + attribute.Int64("rtt", int64(resp.rtt())), + attribute.String("status", resp.status().String()), + ) tracing.End(replySpan) return nil } diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index 466608b3..91d18430 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -7,6 +7,8 @@ import ( "time" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" @@ -102,3 +104,25 @@ func getICMPEcho(msg *icmp.Message) (*icmp.Echo, error) { func isEchoReply(msg *icmp.Message) bool { return msg.Type == ipv4.ICMPTypeEchoReply || msg.Type == ipv6.ICMPTypeEchoReply } + +func observeICMPRequest(logger *zerolog.Logger, span trace.Span, src string, dst string, echoID int, seq int) { + logger.Debug(). + Str("src", src). + Str("dst", dst). + Int("originalEchoID", echoID). + Int("originalEchoSeq", seq). + Msg("Received ICMP request") + span.SetAttributes( + attribute.Int("originalEchoID", echoID), + attribute.Int("seq", seq), + ) +} + +func observeICMPReply(logger *zerolog.Logger, span trace.Span, dst string, echoID int, seq int) { + logger.Debug().Str("dst", dst).Int("echoID", echoID).Int("seq", seq).Msg("Sent ICMP reply to edge") + span.SetAttributes( + attribute.String("dst", dst), + attribute.Int("echoID", echoID), + attribute.Int("seq", seq), + ) +}