diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index 4e315f15..8c74598f 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -131,7 +131,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle } func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { - _, span := responder.requestSpan(ctx, pk) + ctx, span := responder.requestSpan(ctx, pk) defer responder.exportSpan() originalEcho, err := getICMPEcho(pk.Message) @@ -139,8 +139,10 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } - observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq) - + span.SetAttributes( + attribute.Int("originalEchoID", originalEcho.ID), + attribute.Int("seq", originalEcho.Seq), + ) echoIDTrackerKey := flow3Tuple{ srcIP: pk.Src, dstIP: pk.Dst, @@ -187,7 +189,6 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } - err = icmpFlow.sendToDst(pk.Dst, pk.Message) if err != nil { tracing.EndWithErrorStatus(span, err) @@ -268,12 +269,15 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error { _, span := icmpFlow.responder.replySpan(ctx, ip.logger) defer icmpFlow.responder.exportSpan() + span.SetAttributes( + attribute.String("dst", reply.from.String()), + attribute.Int("echoID", reply.echo.ID), + attribute.Int("seq", reply.echo.Seq), + attribute.Int("originalEchoID", icmpFlow.originalEchoID), + ) if err := icmpFlow.returnToSrc(reply); err != nil { tracing.EndWithErrorStatus(span, err) - return err } - observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq) - span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID)) tracing.End(span) return nil } diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go index e4d471aa..321b5c3f 100644 --- a/ingress/icmp_linux.go +++ b/ingress/icmp_linux.go @@ -107,7 +107,10 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } - observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq) + span.SetAttributes( + attribute.Int("originalEchoID", originalEcho.ID), + attribute.Int("seq", originalEcho.Seq), + ) shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID) newFunnelFunc := func() (packet.Funnel, error) { @@ -196,10 +199,6 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf n, from, err := flow.originConn.ReadFrom(buf) if err != nil { - if flow.IsClosed() { - tracing.EndWithErrorStatus(span, fmt.Errorf("flow was closed")) - return false, nil - } tracing.EndWithErrorStatus(span, err) return false, err } @@ -215,14 +214,16 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf tracing.EndWithErrorStatus(span, err) return true, err } - + span.SetAttributes( + attribute.String("dst", reply.from.String()), + attribute.Int("echoID", reply.echo.ID), + attribute.Int("seq", reply.echo.Seq), + ) if err := flow.returnToSrc(reply); err != nil { - ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") + ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") tracing.EndWithErrorStatus(span, err) return true, err } - - observeICMPReply(ip.logger, span, from.String(), reply.echo.ID, reply.echo.Seq) tracing.End(span) return true, nil } diff --git a/ingress/icmp_posix.go b/ingress/icmp_posix.go index b03be49e..504df60a 100644 --- a/ingress/icmp_posix.go +++ b/ingress/icmp_posix.go @@ -8,7 +8,6 @@ import ( "fmt" "net" "net/netip" - "sync/atomic" "github.com/google/gopacket/layers" "github.com/rs/zerolog" @@ -47,7 +46,6 @@ type flow3Tuple struct { type icmpEchoFlow struct { *packet.ActivityTracker closeCallback func() error - closed *atomic.Bool src netip.Addr originConn *icmp.PacketConn responder *packetResponder @@ -61,7 +59,6 @@ func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icm return &icmpEchoFlow{ ActivityTracker: packet.NewActivityTracker(), closeCallback: closeCallback, - closed: &atomic.Bool{}, src: src, originConn: originConn, responder: responder, @@ -89,14 +86,9 @@ func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool { } func (ief *icmpEchoFlow) Close() error { - ief.closed.Store(true) return ief.closeCallback() } -func (ief *icmpEchoFlow) IsClosed() bool { - return ief.closed.Load() -} - // sendToDst rewrites the echo ID to the one assigned to this flow func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error { ief.UpdateLastActive() diff --git a/ingress/icmp_windows.go b/ingress/icmp_windows.go index 19604ee4..816ed383 100644 --- a/ingress/icmp_windows.go +++ b/ingress/icmp_windows.go @@ -281,7 +281,10 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa if err != nil { return err } - observeICMPRequest(ip.logger, requestSpan, pk.Src.String(), pk.Dst.String(), echo.ID, echo.Seq) + requestSpan.SetAttributes( + attribute.Int("originalEchoID", echo.ID), + attribute.Int("seq", echo.Seq), + ) resp, err := ip.icmpEchoRoundtrip(pk.Dst, echo) if err != nil { @@ -293,17 +296,17 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa responder.exportSpan() _, replySpan := responder.replySpan(ctx, ip.logger) - err = ip.handleEchoReply(pk, echo, resp, responder) - if err != nil { - ip.logger.Err(err).Msg("Failed to send ICMP reply") - tracing.EndWithErrorStatus(replySpan, err) - return errors.Wrap(err, "failed to handle ICMP echo reply") - } - observeICMPReply(ip.logger, replySpan, pk.Dst.String(), echo.ID, echo.Seq) replySpan.SetAttributes( + attribute.Int("originalEchoID", echo.ID), + attribute.Int("seq", echo.Seq), attribute.Int64("rtt", int64(resp.rtt())), attribute.String("status", resp.status().String()), ) + err = ip.handleEchoReply(pk, echo, resp, responder) + if err != nil { + tracing.EndWithErrorStatus(replySpan, err) + return errors.Wrap(err, "failed to handle ICMP echo reply") + } tracing.End(replySpan) return nil } diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index 91d18430..466608b3 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -7,8 +7,6 @@ import ( "time" "github.com/rs/zerolog" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" @@ -104,25 +102,3 @@ func getICMPEcho(msg *icmp.Message) (*icmp.Echo, error) { func isEchoReply(msg *icmp.Message) bool { return msg.Type == ipv4.ICMPTypeEchoReply || msg.Type == ipv6.ICMPTypeEchoReply } - -func observeICMPRequest(logger *zerolog.Logger, span trace.Span, src string, dst string, echoID int, seq int) { - logger.Debug(). - Str("src", src). - Str("dst", dst). - Int("originalEchoID", echoID). - Int("originalEchoSeq", seq). - Msg("Received ICMP request") - span.SetAttributes( - attribute.Int("originalEchoID", echoID), - attribute.Int("seq", seq), - ) -} - -func observeICMPReply(logger *zerolog.Logger, span trace.Span, dst string, echoID int, seq int) { - logger.Debug().Str("dst", dst).Int("echoID", echoID).Int("seq", seq).Msg("Sent ICMP reply to edge") - span.SetAttributes( - attribute.String("dst", dst), - attribute.Int("echoID", echoID), - attribute.Int("seq", seq), - ) -}