diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index caa67bb4..721aeb4f 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -17,9 +17,11 @@ import ( "time" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" "golang.org/x/net/icmp" "github.com/cloudflare/cloudflared/packet" + "github.com/cloudflare/cloudflared/tracing" ) type icmpProxy struct { @@ -129,10 +131,18 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle } func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { + ctx, span := responder.requestSpan(ctx, pk) + defer responder.exportSpan() + originalEcho, err := getICMPEcho(pk.Message) if err != nil { + tracing.EndWithErrorStatus(span, err) return err } + span.SetAttributes( + attribute.Int("originalEchoID", originalEcho.ID), + attribute.Int("seq", originalEcho.Seq), + ) echoIDTrackerKey := flow3Tuple{ srcIP: pk.Src, dstIP: pk.Dst, @@ -141,8 +151,12 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa // TODO: TUN-6744 assign unique flow per (src, echo ID) assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey) if !success { - return fmt.Errorf("failed to assign unique echo ID") + err := fmt.Errorf("failed to assign unique echo ID") + tracing.EndWithErrorStatus(span, err) + return err } + span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID))) + newFunnelFunc := func() (packet.Funnel, error) { originalEcho, err := getICMPEcho(pk.Message) if err != nil { @@ -158,9 +172,11 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa funnelID := echoFunnelID(assignedEchoID) funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc) if err != nil { + tracing.EndWithErrorStatus(span, err) return err } if isNew { + span.SetAttributes(attribute.Bool("newFlow", true)) ip.logger.Debug(). Str("src", pk.Src.String()). Str("dst", pk.Dst.String()). @@ -170,9 +186,16 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa } icmpFlow, err := toICMPEchoFlow(funnel) if err != nil { + tracing.EndWithErrorStatus(span, err) return err } - return icmpFlow.sendToDst(pk.Dst, pk.Message) + err = icmpFlow.sendToDst(pk.Dst, pk.Message) + if err != nil { + tracing.EndWithErrorStatus(span, err) + return err + } + tracing.End(span) + return nil } // Serve listens for responses to the requests until context is done diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go index a1936ad3..8e6d546a 100644 --- a/ingress/icmp_linux.go +++ b/ingress/icmp_linux.go @@ -19,9 +19,11 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" "golang.org/x/net/icmp" "github.com/cloudflare/cloudflared/packet" + "github.com/cloudflare/cloudflared/tracing" ) const ( @@ -98,14 +100,24 @@ func checkInPingGroup() error { } func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { + ctx, span := responder.requestSpan(ctx, pk) + defer responder.exportSpan() + originalEcho, err := getICMPEcho(pk.Message) if err != nil { + tracing.EndWithErrorStatus(span, err) return err } + span.SetAttributes( + attribute.Int("originalEchoID", originalEcho.ID), + attribute.Int("seq", originalEcho.Seq), + ) + newConnChan := make(chan *icmp.PacketConn, 1) newFunnelFunc := func() (packet.Funnel, error) { conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone) if err != nil { + tracing.EndWithErrorStatus(span, err) return nil, errors.Wrap(err, "failed to open ICMP socket") } ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr()) @@ -117,6 +129,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa if !ok { return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr()) } + span.SetAttributes(attribute.Int("port", localUDPAddr.Port)) + echoID := localUDPAddr.Port icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder()) return icmpFlow, nil @@ -128,13 +142,16 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa } funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc) if err != nil { + tracing.EndWithErrorStatus(span, err) return err } icmpFlow, err := toICMPEchoFlow(funnel) if err != nil { + tracing.EndWithErrorStatus(span, err) return err } if isNew { + span.SetAttributes(attribute.Bool("newFlow", true)) ip.logger.Debug(). Str("src", pk.Src.String()). Str("dst", pk.Dst.String()). @@ -153,8 +170,10 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa }() } if err := icmpFlow.sendToDst(pk.Dst, pk.Message); err != nil { + tracing.EndWithErrorStatus(span, err) return errors.Wrap(err, "failed to send ICMP echo request") } + tracing.End(span) return nil } diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index a6370e7c..c36aeab7 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -1,10 +1,13 @@ package ingress import ( + "bytes" "context" "fmt" "net" "net/netip" + "os" + "runtime" "strings" "sync" "testing" @@ -17,6 +20,8 @@ import ( "golang.org/x/net/ipv6" "github.com/cloudflare/cloudflared/packet" + quicpogs "github.com/cloudflare/cloudflared/quic" + "github.com/cloudflare/cloudflared/tracing" ) var ( @@ -98,6 +103,75 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { <-proxyDone } +func TestTraceICMPRouterEcho(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("TODO: TUN-6861 Trace ICMP in Windows") + } + tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1" + + logger := zerolog.New(os.Stderr) + router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &logger) + require.NoError(t, err) + + proxyDone := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + router.Serve(ctx) + close(proxyDone) + }() + + // Buffer 2 packets, reply and request span + muxer := newMockMuxer(2) + tracingIdentity, err := tracing.NewIdentity(tracingCtx) + require.NoError(t, err) + serializedIdentity, err := tracingIdentity.MarshalBinary() + require.NoError(t, err) + + responder := packetResponder{ + datagramMuxer: muxer, + tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &logger), + serializedIdentity: serializedIdentity, + } + + echo := &icmp.Echo{ + ID: 12910, + Seq: 182, + Data: []byte(t.Name()), + } + pk := packet.ICMP{ + IP: &packet.IP{ + Src: localhostIP, + Dst: localhostIP, + Protocol: layers.IPProtocolICMPv4, + TTL: packet.DefaultTTL, + }, + Message: &icmp.Message{ + Type: ipv4.ICMPTypeEcho, + Code: 0, + Body: echo, + }, + } + require.NoError(t, router.Request(ctx, &pk, &responder)) + validateEchoFlow(t, muxer, &pk) + receivedPacket := <-muxer.cfdToEdge + tracingSpanPacket, ok := receivedPacket.(*quicpogs.TracingSpanPacket) + require.True(t, ok) + require.NotEmpty(t, tracingSpanPacket.Spans) + require.True(t, bytes.Equal(serializedIdentity, tracingSpanPacket.TracingIdentity)) + + echo.Seq++ + pk.Body = echo + // Only first request for a flow is traced. The edge will not send tracing context for the second request + responder = packetResponder{ + datagramMuxer: muxer, + } + require.NoError(t, router.Request(ctx, &pk, &responder)) + validateEchoFlow(t, muxer, &pk) + + cancel() + <-proxyDone +} + // TestConcurrentRequests makes sure icmpRouter can send concurrent requests to the same destination with different // echo ID. This simulates concurrent ping to the same destination. func TestConcurrentRequestsToSameDst(t *testing.T) { diff --git a/ingress/packet_router.go b/ingress/packet_router.go index 88777869..d4919e2c 100644 --- a/ingress/packet_router.go +++ b/ingress/packet_router.go @@ -6,9 +6,12 @@ import ( "net/netip" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/cloudflare/cloudflared/packet" quicpogs "github.com/cloudflare/cloudflared/quic" + "github.com/cloudflare/cloudflared/tracing" ) // Upstream of raw packets @@ -70,7 +73,14 @@ func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packe case quicpogs.DatagramTypeIP: return packet.RawPacket{Data: pk.Payload()}, responder, nil case quicpogs.DatagramTypeIPWithTrace: - return packet.RawPacket{}, responder, fmt.Errorf("TODO: TUN-6604 Handle IP packet with trace") + var identity tracing.Identity + if err := identity.UnmarshalBinary(pk.Metadata()); err != nil { + r.logger.Err(err).Bytes("tracingIdentity", pk.Metadata()).Msg("Failed to unmarshal tracing identity") + } else { + responder.tracedCtx = tracing.NewTracedContext(ctx, identity.String(), r.logger) + responder.serializedIdentity = pk.Metadata() + } + return packet.RawPacket{Data: pk.Payload()}, responder, nil default: return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type()) } @@ -126,9 +136,39 @@ func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, ra } type packetResponder struct { - datagramMuxer muxer + datagramMuxer muxer + tracedCtx *tracing.TracedContext + serializedIdentity []byte +} + +func (pr *packetResponder) tracingEnabled() bool { + return pr.tracedCtx != nil } func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error { return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket)) } + +func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (context.Context, trace.Span) { + if !pr.tracingEnabled() { + return ctx, tracing.NewNoopSpan() + } + return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-request", trace.WithAttributes( + attribute.String("src", pk.Src.String()), + attribute.String("dst", pk.Dst.String()), + )) +} + +func (pr *packetResponder) exportSpan() { + if !pr.tracingEnabled() { + return + } + spans := pr.tracedCtx.GetProtoSpans() + pr.tracedCtx.ClearSpans() + if len(spans) > 0 { + pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{ + Spans: spans, + TracingIdentity: pr.serializedIdentity, + }) + } +} diff --git a/ingress/packet_router_test.go b/ingress/packet_router_test.go index 7738d294..ca618c50 100644 --- a/ingress/packet_router_test.go +++ b/ingress/packet_router_test.go @@ -200,6 +200,11 @@ func (mm *mockMuxer) SendPacket(pk quicpogs.Packet) error { }, TracingIdentity: copiedMetadata, } + case quicpogs.DatagramTypeTracingSpan: + copiedPacket = &quicpogs.TracingSpanPacket{ + Spans: copiedPayload, + TracingIdentity: copiedMetadata, + } default: return fmt.Errorf("unexpected metadata type %d", pk.Type()) } diff --git a/packet/funnel.go b/packet/funnel.go index 547161bc..3b5cfeb6 100644 --- a/packet/funnel.go +++ b/packet/funnel.go @@ -16,7 +16,9 @@ var ( // Funnel is an abstraction to pipe from 1 src to 1 or more destinations type Funnel interface { - // LastActive returns the last time SendToDst or ReturnToSrc is called + // Updates the last time traffic went through this funnel + UpdateLastActive() + // LastActive returns the last time there is traffic through this funnel LastActive() time.Time // Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error Close() error diff --git a/tracing/client.go b/tracing/client.go index 7cfb1a43..81d31f05 100644 --- a/tracing/client.go +++ b/tracing/client.go @@ -27,6 +27,8 @@ type InMemoryClient interface { // ProtoSpans returns a copy of the list of in-memory stored spans as otlp // protobuf byte array. ProtoSpans() ([]byte, error) + // Clear spans removes all in-memory spans + ClearSpans() } // InMemoryOtlpClient is a client implementation for otlptrace.Client @@ -78,6 +80,12 @@ func (mc *InMemoryOtlpClient) ProtoSpans() ([]byte, error) { return proto.Marshal(pbRequest) } +func (mc *InMemoryOtlpClient) ClearSpans() { + mc.mu.Lock() + defer mc.mu.Unlock() + mc.spans = make([]*tracepb.ResourceSpans, 0) +} + // NoopOtlpClient is a client implementation for otlptrace.Client that does nothing type NoopOtlpClient struct{} @@ -102,3 +110,5 @@ func (mc *NoopOtlpClient) Spans() (string, error) { func (mc *NoopOtlpClient) ProtoSpans() ([]byte, error) { return nil, errNoopTracer } + +func (mc *NoopOtlpClient) ClearSpans() {} diff --git a/tracing/tracing.go b/tracing/tracing.go index 54625fc8..6cd59134 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -189,6 +189,10 @@ func (cft *cfdTracer) AddSpans(headers http.Header) { headers[CanonicalCloudflaredTracingHeader] = []string{enc} } +func (cft *cfdTracer) ClearSpans() { + cft.exporter.ClearSpans() +} + // End will set the OK status for the span and then end it. func End(span trace.Span) { endSpan(span, -1, codes.Ok, nil) @@ -246,7 +250,6 @@ func extractTraceFromString(ctx context.Context, trace string) (context.Context, parts[0] = strings.Repeat("0", left) + parts[0] trace = strings.Join(parts, separator) } - // Override the 'cf-trace-id' as 'uber-trace-id' so the jaeger propagator can extract it. traceHeader := map[string]string{TracerContextNameOverride: trace} remoteCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(traceHeader)) @@ -277,6 +280,11 @@ func extractTrace(req *http.Request) (context.Context, bool) { if traceHeader[TracerContextNameOverride] == "" { return nil, false } + remoteCtx := otel.GetTextMapPropagator().Extract(req.Context(), propagation.MapCarrier(traceHeader)) return remoteCtx, true } + +func NewNoopSpan() trace.Span { + return trace.SpanFromContext(nil) +}