TUN-6604: Trace icmp echo request on Linux and Darwin

This commit is contained in:
cthuang 2022-10-13 11:01:25 +01:00
parent 495f9fb8bd
commit b6bd8c1f5e
8 changed files with 187 additions and 6 deletions

View File

@ -17,9 +17,11 @@ import (
"time" "time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/net/icmp" "golang.org/x/net/icmp"
"github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/packet"
"github.com/cloudflare/cloudflared/tracing"
) )
type icmpProxy struct { type icmpProxy struct {
@ -129,10 +131,18 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle
} }
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
ctx, span := responder.requestSpan(ctx, pk)
defer responder.exportSpan()
originalEcho, err := getICMPEcho(pk.Message) originalEcho, err := getICMPEcho(pk.Message)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(span, err)
return err return err
} }
span.SetAttributes(
attribute.Int("originalEchoID", originalEcho.ID),
attribute.Int("seq", originalEcho.Seq),
)
echoIDTrackerKey := flow3Tuple{ echoIDTrackerKey := flow3Tuple{
srcIP: pk.Src, srcIP: pk.Src,
dstIP: pk.Dst, dstIP: pk.Dst,
@ -141,8 +151,12 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
// TODO: TUN-6744 assign unique flow per (src, echo ID) // TODO: TUN-6744 assign unique flow per (src, echo ID)
assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey) assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey)
if !success { if !success {
return fmt.Errorf("failed to assign unique echo ID") err := fmt.Errorf("failed to assign unique echo ID")
tracing.EndWithErrorStatus(span, err)
return err
} }
span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID)))
newFunnelFunc := func() (packet.Funnel, error) { newFunnelFunc := func() (packet.Funnel, error) {
originalEcho, err := getICMPEcho(pk.Message) originalEcho, err := getICMPEcho(pk.Message)
if err != nil { if err != nil {
@ -158,9 +172,11 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
funnelID := echoFunnelID(assignedEchoID) funnelID := echoFunnelID(assignedEchoID)
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc) funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(span, err)
return err return err
} }
if isNew { if isNew {
span.SetAttributes(attribute.Bool("newFlow", true))
ip.logger.Debug(). ip.logger.Debug().
Str("src", pk.Src.String()). Str("src", pk.Src.String()).
Str("dst", pk.Dst.String()). Str("dst", pk.Dst.String()).
@ -170,9 +186,16 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
} }
icmpFlow, err := toICMPEchoFlow(funnel) icmpFlow, err := toICMPEchoFlow(funnel)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(span, err)
return err return err
} }
return icmpFlow.sendToDst(pk.Dst, pk.Message) err = icmpFlow.sendToDst(pk.Dst, pk.Message)
if err != nil {
tracing.EndWithErrorStatus(span, err)
return err
}
tracing.End(span)
return nil
} }
// Serve listens for responses to the requests until context is done // Serve listens for responses to the requests until context is done

View File

@ -19,9 +19,11 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/net/icmp" "golang.org/x/net/icmp"
"github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/packet"
"github.com/cloudflare/cloudflared/tracing"
) )
const ( const (
@ -98,14 +100,24 @@ func checkInPingGroup() error {
} }
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
ctx, span := responder.requestSpan(ctx, pk)
defer responder.exportSpan()
originalEcho, err := getICMPEcho(pk.Message) originalEcho, err := getICMPEcho(pk.Message)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(span, err)
return err return err
} }
span.SetAttributes(
attribute.Int("originalEchoID", originalEcho.ID),
attribute.Int("seq", originalEcho.Seq),
)
newConnChan := make(chan *icmp.PacketConn, 1) newConnChan := make(chan *icmp.PacketConn, 1)
newFunnelFunc := func() (packet.Funnel, error) { newFunnelFunc := func() (packet.Funnel, error) {
conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone) conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(span, err)
return nil, errors.Wrap(err, "failed to open ICMP socket") return nil, errors.Wrap(err, "failed to open ICMP socket")
} }
ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr()) ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr())
@ -117,6 +129,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
if !ok { if !ok {
return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr()) return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr())
} }
span.SetAttributes(attribute.Int("port", localUDPAddr.Port))
echoID := localUDPAddr.Port echoID := localUDPAddr.Port
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder()) icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder())
return icmpFlow, nil return icmpFlow, nil
@ -128,13 +142,16 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
} }
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc) funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(span, err)
return err return err
} }
icmpFlow, err := toICMPEchoFlow(funnel) icmpFlow, err := toICMPEchoFlow(funnel)
if err != nil { if err != nil {
tracing.EndWithErrorStatus(span, err)
return err return err
} }
if isNew { if isNew {
span.SetAttributes(attribute.Bool("newFlow", true))
ip.logger.Debug(). ip.logger.Debug().
Str("src", pk.Src.String()). Str("src", pk.Src.String()).
Str("dst", pk.Dst.String()). Str("dst", pk.Dst.String()).
@ -153,8 +170,10 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
}() }()
} }
if err := icmpFlow.sendToDst(pk.Dst, pk.Message); err != nil { if err := icmpFlow.sendToDst(pk.Dst, pk.Message); err != nil {
tracing.EndWithErrorStatus(span, err)
return errors.Wrap(err, "failed to send ICMP echo request") return errors.Wrap(err, "failed to send ICMP echo request")
} }
tracing.End(span)
return nil return nil
} }

View File

@ -1,10 +1,13 @@
package ingress package ingress
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"net" "net"
"net/netip" "net/netip"
"os"
"runtime"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@ -17,6 +20,8 @@ import (
"golang.org/x/net/ipv6" "golang.org/x/net/ipv6"
"github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/packet"
quicpogs "github.com/cloudflare/cloudflared/quic"
"github.com/cloudflare/cloudflared/tracing"
) )
var ( var (
@ -98,6 +103,75 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
<-proxyDone <-proxyDone
} }
func TestTraceICMPRouterEcho(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("TODO: TUN-6861 Trace ICMP in Windows")
}
tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
logger := zerolog.New(os.Stderr)
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &logger)
require.NoError(t, err)
proxyDone := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
router.Serve(ctx)
close(proxyDone)
}()
// Buffer 2 packets, reply and request span
muxer := newMockMuxer(2)
tracingIdentity, err := tracing.NewIdentity(tracingCtx)
require.NoError(t, err)
serializedIdentity, err := tracingIdentity.MarshalBinary()
require.NoError(t, err)
responder := packetResponder{
datagramMuxer: muxer,
tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &logger),
serializedIdentity: serializedIdentity,
}
echo := &icmp.Echo{
ID: 12910,
Seq: 182,
Data: []byte(t.Name()),
}
pk := packet.ICMP{
IP: &packet.IP{
Src: localhostIP,
Dst: localhostIP,
Protocol: layers.IPProtocolICMPv4,
TTL: packet.DefaultTTL,
},
Message: &icmp.Message{
Type: ipv4.ICMPTypeEcho,
Code: 0,
Body: echo,
},
}
require.NoError(t, router.Request(ctx, &pk, &responder))
validateEchoFlow(t, muxer, &pk)
receivedPacket := <-muxer.cfdToEdge
tracingSpanPacket, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
require.True(t, ok)
require.NotEmpty(t, tracingSpanPacket.Spans)
require.True(t, bytes.Equal(serializedIdentity, tracingSpanPacket.TracingIdentity))
echo.Seq++
pk.Body = echo
// Only first request for a flow is traced. The edge will not send tracing context for the second request
responder = packetResponder{
datagramMuxer: muxer,
}
require.NoError(t, router.Request(ctx, &pk, &responder))
validateEchoFlow(t, muxer, &pk)
cancel()
<-proxyDone
}
// TestConcurrentRequests makes sure icmpRouter can send concurrent requests to the same destination with different // TestConcurrentRequests makes sure icmpRouter can send concurrent requests to the same destination with different
// echo ID. This simulates concurrent ping to the same destination. // echo ID. This simulates concurrent ping to the same destination.
func TestConcurrentRequestsToSameDst(t *testing.T) { func TestConcurrentRequestsToSameDst(t *testing.T) {

View File

@ -6,9 +6,12 @@ import (
"net/netip" "net/netip"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/cloudflare/cloudflared/packet" "github.com/cloudflare/cloudflared/packet"
quicpogs "github.com/cloudflare/cloudflared/quic" quicpogs "github.com/cloudflare/cloudflared/quic"
"github.com/cloudflare/cloudflared/tracing"
) )
// Upstream of raw packets // Upstream of raw packets
@ -70,7 +73,14 @@ func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packe
case quicpogs.DatagramTypeIP: case quicpogs.DatagramTypeIP:
return packet.RawPacket{Data: pk.Payload()}, responder, nil return packet.RawPacket{Data: pk.Payload()}, responder, nil
case quicpogs.DatagramTypeIPWithTrace: case quicpogs.DatagramTypeIPWithTrace:
return packet.RawPacket{}, responder, fmt.Errorf("TODO: TUN-6604 Handle IP packet with trace") var identity tracing.Identity
if err := identity.UnmarshalBinary(pk.Metadata()); err != nil {
r.logger.Err(err).Bytes("tracingIdentity", pk.Metadata()).Msg("Failed to unmarshal tracing identity")
} else {
responder.tracedCtx = tracing.NewTracedContext(ctx, identity.String(), r.logger)
responder.serializedIdentity = pk.Metadata()
}
return packet.RawPacket{Data: pk.Payload()}, responder, nil
default: default:
return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type()) return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type())
} }
@ -127,8 +137,38 @@ func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, ra
type packetResponder struct { type packetResponder struct {
datagramMuxer muxer datagramMuxer muxer
tracedCtx *tracing.TracedContext
serializedIdentity []byte
}
func (pr *packetResponder) tracingEnabled() bool {
return pr.tracedCtx != nil
} }
func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error { func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error {
return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket)) return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket))
} }
func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (context.Context, trace.Span) {
if !pr.tracingEnabled() {
return ctx, tracing.NewNoopSpan()
}
return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-request", trace.WithAttributes(
attribute.String("src", pk.Src.String()),
attribute.String("dst", pk.Dst.String()),
))
}
func (pr *packetResponder) exportSpan() {
if !pr.tracingEnabled() {
return
}
spans := pr.tracedCtx.GetProtoSpans()
pr.tracedCtx.ClearSpans()
if len(spans) > 0 {
pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{
Spans: spans,
TracingIdentity: pr.serializedIdentity,
})
}
}

View File

@ -200,6 +200,11 @@ func (mm *mockMuxer) SendPacket(pk quicpogs.Packet) error {
}, },
TracingIdentity: copiedMetadata, TracingIdentity: copiedMetadata,
} }
case quicpogs.DatagramTypeTracingSpan:
copiedPacket = &quicpogs.TracingSpanPacket{
Spans: copiedPayload,
TracingIdentity: copiedMetadata,
}
default: default:
return fmt.Errorf("unexpected metadata type %d", pk.Type()) return fmt.Errorf("unexpected metadata type %d", pk.Type())
} }

View File

@ -16,7 +16,9 @@ var (
// Funnel is an abstraction to pipe from 1 src to 1 or more destinations // Funnel is an abstraction to pipe from 1 src to 1 or more destinations
type Funnel interface { type Funnel interface {
// LastActive returns the last time SendToDst or ReturnToSrc is called // Updates the last time traffic went through this funnel
UpdateLastActive()
// LastActive returns the last time there is traffic through this funnel
LastActive() time.Time LastActive() time.Time
// Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error // Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error
Close() error Close() error

View File

@ -27,6 +27,8 @@ type InMemoryClient interface {
// ProtoSpans returns a copy of the list of in-memory stored spans as otlp // ProtoSpans returns a copy of the list of in-memory stored spans as otlp
// protobuf byte array. // protobuf byte array.
ProtoSpans() ([]byte, error) ProtoSpans() ([]byte, error)
// Clear spans removes all in-memory spans
ClearSpans()
} }
// InMemoryOtlpClient is a client implementation for otlptrace.Client // InMemoryOtlpClient is a client implementation for otlptrace.Client
@ -78,6 +80,12 @@ func (mc *InMemoryOtlpClient) ProtoSpans() ([]byte, error) {
return proto.Marshal(pbRequest) return proto.Marshal(pbRequest)
} }
func (mc *InMemoryOtlpClient) ClearSpans() {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.spans = make([]*tracepb.ResourceSpans, 0)
}
// NoopOtlpClient is a client implementation for otlptrace.Client that does nothing // NoopOtlpClient is a client implementation for otlptrace.Client that does nothing
type NoopOtlpClient struct{} type NoopOtlpClient struct{}
@ -102,3 +110,5 @@ func (mc *NoopOtlpClient) Spans() (string, error) {
func (mc *NoopOtlpClient) ProtoSpans() ([]byte, error) { func (mc *NoopOtlpClient) ProtoSpans() ([]byte, error) {
return nil, errNoopTracer return nil, errNoopTracer
} }
func (mc *NoopOtlpClient) ClearSpans() {}

View File

@ -189,6 +189,10 @@ func (cft *cfdTracer) AddSpans(headers http.Header) {
headers[CanonicalCloudflaredTracingHeader] = []string{enc} headers[CanonicalCloudflaredTracingHeader] = []string{enc}
} }
func (cft *cfdTracer) ClearSpans() {
cft.exporter.ClearSpans()
}
// End will set the OK status for the span and then end it. // End will set the OK status for the span and then end it.
func End(span trace.Span) { func End(span trace.Span) {
endSpan(span, -1, codes.Ok, nil) endSpan(span, -1, codes.Ok, nil)
@ -246,7 +250,6 @@ func extractTraceFromString(ctx context.Context, trace string) (context.Context,
parts[0] = strings.Repeat("0", left) + parts[0] parts[0] = strings.Repeat("0", left) + parts[0]
trace = strings.Join(parts, separator) trace = strings.Join(parts, separator)
} }
// Override the 'cf-trace-id' as 'uber-trace-id' so the jaeger propagator can extract it. // Override the 'cf-trace-id' as 'uber-trace-id' so the jaeger propagator can extract it.
traceHeader := map[string]string{TracerContextNameOverride: trace} traceHeader := map[string]string{TracerContextNameOverride: trace}
remoteCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(traceHeader)) remoteCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(traceHeader))
@ -277,6 +280,11 @@ func extractTrace(req *http.Request) (context.Context, bool) {
if traceHeader[TracerContextNameOverride] == "" { if traceHeader[TracerContextNameOverride] == "" {
return nil, false return nil, false
} }
remoteCtx := otel.GetTextMapPropagator().Extract(req.Context(), propagation.MapCarrier(traceHeader)) remoteCtx := otel.GetTextMapPropagator().Extract(req.Context(), propagation.MapCarrier(traceHeader))
return remoteCtx, true return remoteCtx, true
} }
func NewNoopSpan() trace.Span {
return trace.SpanFromContext(nil)
}