From f5f3e6a4530a9597b9d742b35a404d0caefc93da Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Thu, 8 Sep 2022 21:42:11 -0700 Subject: [PATCH] TUN-6689: Utilize new RegisterUDPSession to begin tracing --- connection/quic.go | 28 ++++++++++++++++++++++++---- connection/quic_test.go | 7 ++++--- quic/quic_protocol_test.go | 14 +++++++------- tracing/client.go | 31 ++++++++++++++++++++++--------- tracing/tracing.go | 25 ++++++++++++++++++++++++- tunnelrpc/pogs/sessionrpc.go | 15 ++++++++++----- 6 files changed, 91 insertions(+), 29 deletions(-) diff --git a/connection/quic.go b/connection/quic.go index a696192c..640c7be2 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -17,6 +17,8 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "github.com/cloudflare/cloudflared/datagramsession" @@ -259,24 +261,42 @@ func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) er } // RegisterUdpSession is the RPC method invoked by edge to register and run a session -func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) error { +func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*tunnelpogs.RegisterUdpSessionResponse, error) { + traceCtx := tracing.NewTracedContext(ctx, traceContext, q.logger) + ctx, registerSpan := traceCtx.Tracer().Start(traceCtx, "register-session", trace.WithAttributes( + attribute.String("session-id", sessionID.String()), + attribute.String("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)), + )) // Each session is a series of datagram from an eyeball to a dstIP:dstPort. // (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket. originProxy, err := ingress.DialUDP(dstIP, dstPort) if err != nil { q.logger.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort) - return err + tracing.EndWithErrorStatus(registerSpan, err) + return nil, err } + registerSpan.SetAttributes( + attribute.Bool("socket-bind-success", true), + attribute.String("src", originProxy.LocalAddr().String()), + ) + session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy) if err != nil { q.logger.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session") - return err + tracing.EndWithErrorStatus(registerSpan, err) + return nil, err } go q.serveUDPSession(session, closeAfterIdleHint) q.logger.Debug().Str("sessionID", sessionID.String()).Str("src", originProxy.LocalAddr().String()).Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).Msgf("Registered session") - return nil + tracing.End(registerSpan) + + resp := tunnelpogs.RegisterUdpSessionResponse{ + Spans: traceCtx.GetProtoSpans(), + } + + return &resp, nil } func (q *QUICConnection) serveUDPSession(session *datagramsession.Session, closeAfterIdleHint time.Duration) { diff --git a/connection/quic_test.go b/connection/quic_test.go index 26b71e48..8904eeeb 100644 --- a/connection/quic_test.go +++ b/connection/quic_test.go @@ -26,6 +26,7 @@ import ( "github.com/cloudflare/cloudflared/datagramsession" quicpogs "github.com/cloudflare/cloudflared/quic" "github.com/cloudflare/cloudflared/tracing" + "github.com/cloudflare/cloudflared/tunnelrpc/pogs" tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -554,7 +555,7 @@ func TestNopCloserReadWriterCloseAfterEOF(t *testing.T) { require.Equal(t, n, 9) // force another read to read eof - n, err = readerWriter.Read(buffer) + _, err = readerWriter.Read(buffer) require.Equal(t, err, io.EOF) // close @@ -652,8 +653,8 @@ type mockSessionRPCServer struct { calledUnregisterChan chan struct{} } -func (s mockSessionRPCServer) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) error { - return fmt.Errorf("mockSessionRPCServer doesn't implement RegisterUdpSession") +func (s mockSessionRPCServer) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) { + return nil, fmt.Errorf("mockSessionRPCServer doesn't implement RegisterUdpSession") } func (s mockSessionRPCServer) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, reason string) error { diff --git a/quic/quic_protocol_test.go b/quic/quic_protocol_test.go index 9693a808..64943298 100644 --- a/quic/quic_protocol_test.go +++ b/quic/quic_protocol_test.go @@ -230,23 +230,23 @@ type mockSessionRPCServer struct { traceContext string } -func (s mockSessionRPCServer) RegisterUdpSession(_ context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) error { +func (s mockSessionRPCServer) RegisterUdpSession(_ context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) (*tunnelpogs.RegisterUdpSessionResponse, error) { if s.sessionID != sessionID { - return fmt.Errorf("expect session ID %s, got %s", s.sessionID, sessionID) + return nil, fmt.Errorf("expect session ID %s, got %s", s.sessionID, sessionID) } if !s.dstIP.Equal(dstIP) { - return fmt.Errorf("expect destination IP %s, got %s", s.dstIP, dstIP) + return nil, fmt.Errorf("expect destination IP %s, got %s", s.dstIP, dstIP) } if s.dstPort != dstPort { - return fmt.Errorf("expect destination port %d, got %d", s.dstPort, dstPort) + return nil, fmt.Errorf("expect destination port %d, got %d", s.dstPort, dstPort) } if s.closeIdleAfter != closeIdleAfter { - return fmt.Errorf("expect closeIdleAfter %d, got %d", s.closeIdleAfter, closeIdleAfter) + return nil, fmt.Errorf("expect closeIdleAfter %d, got %d", s.closeIdleAfter, closeIdleAfter) } if s.traceContext != traceContext { - return fmt.Errorf("expect traceContext %s, got %s", s.traceContext, traceContext) + return nil, fmt.Errorf("expect traceContext %s, got %s", s.traceContext, traceContext) } - return nil + return &tunnelpogs.RegisterUdpSessionResponse{}, nil } func (s mockSessionRPCServer) UnregisterUdpSession(_ context.Context, sessionID uuid.UUID, message string) error { diff --git a/tracing/client.go b/tracing/client.go index ffc477e1..7cfb1a43 100644 --- a/tracing/client.go +++ b/tracing/client.go @@ -24,6 +24,9 @@ type InMemoryClient interface { // Spans returns a copy of the list of in-memory stored spans as a base64 // encoded otlp protobuf string. Spans() (string, error) + // ProtoSpans returns a copy of the list of in-memory stored spans as otlp + // protobuf byte array. + ProtoSpans() ([]byte, error) } // InMemoryOtlpClient is a client implementation for otlptrace.Client @@ -55,21 +58,26 @@ func (mc *InMemoryOtlpClient) UploadTraces(_ context.Context, protoSpans []*trac // Spans returns the list of in-memory stored spans as a base64 encoded otlp protobuf string. func (mc *InMemoryOtlpClient) Spans() (string, error) { - mc.mu.Lock() - defer mc.mu.Unlock() - if len(mc.spans) <= 0 { - return "", errNoTraces - } - pbRequest := &coltracepb.ExportTraceServiceRequest{ - ResourceSpans: mc.spans, - } - data, err := proto.Marshal(pbRequest) + data, err := mc.ProtoSpans() if err != nil { return "", err } return base64.StdEncoding.EncodeToString(data), nil } +// ProtoSpans returns the list of in-memory stored spans as the protobuf byte array. +func (mc *InMemoryOtlpClient) ProtoSpans() ([]byte, error) { + mc.mu.Lock() + defer mc.mu.Unlock() + if len(mc.spans) <= 0 { + return nil, errNoTraces + } + pbRequest := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: mc.spans, + } + return proto.Marshal(pbRequest) +} + // NoopOtlpClient is a client implementation for otlptrace.Client that does nothing type NoopOtlpClient struct{} @@ -89,3 +97,8 @@ func (mc *NoopOtlpClient) UploadTraces(_ context.Context, _ []*tracepb.ResourceS func (mc *NoopOtlpClient) Spans() (string, error) { return "", errNoopTracer } + +// Spans always returns no traces error +func (mc *NoopOtlpClient) ProtoSpans() ([]byte, error) { + return nil, errNoopTracer +} diff --git a/tracing/tracing.go b/tracing/tracing.go index 1788b0ef..54625fc8 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -93,7 +93,7 @@ type TracedContext struct { *cfdTracer } -// NewTracedHTTPRequest creates a new tracer for the current HTTP request context. +// NewTracedContext creates a new tracer for the current context. func NewTracedContext(ctx context.Context, traceContext string, log *zerolog.Logger) *TracedContext { ctx, exists := extractTraceFromString(ctx, traceContext) if !exists { @@ -155,6 +155,24 @@ func (cft *cfdTracer) GetSpans() (enc string) { return } +// GetProtoSpans returns the spans as the otlp traces in protobuf byte array. +func (cft *cfdTracer) GetProtoSpans() (proto []byte) { + proto, err := cft.exporter.ProtoSpans() + switch err { + case nil: + break + case errNoTraces: + cft.log.Trace().Err(err).Msgf("expected traces to be available") + return + case errNoopTracer: + return // noop tracer has no traces + default: + cft.log.Debug().Err(err) + return + } + return +} + // AddSpans assigns spans as base64 encoded protobuf otlp traces to provided // HTTP headers. func (cft *cfdTracer) AddSpans(headers http.Header) { @@ -171,6 +189,11 @@ func (cft *cfdTracer) AddSpans(headers http.Header) { headers[CanonicalCloudflaredTracingHeader] = []string{enc} } +// End will set the OK status for the span and then end it. +func End(span trace.Span) { + endSpan(span, -1, codes.Ok, nil) +} + // EndWithErrorStatus will set a status for the span and then end it. func EndWithErrorStatus(span trace.Span, err error) { endSpan(span, -1, codes.Error, err) diff --git a/tunnelrpc/pogs/sessionrpc.go b/tunnelrpc/pogs/sessionrpc.go index 378fc205..5bbe47a3 100644 --- a/tunnelrpc/pogs/sessionrpc.go +++ b/tunnelrpc/pogs/sessionrpc.go @@ -14,7 +14,11 @@ import ( ) type SessionManager interface { - RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) error + // RegisterUdpSession is the call provided to cloudflared to handle an incoming + // capnproto RegisterUdpSession request from the edge. + RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*RegisterUdpSessionResponse, error) + // UnregisterUdpSession is the call provided to cloudflared to handle an incoming + // capnproto UnregisterUdpSession request from the edge. UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error } @@ -55,14 +59,15 @@ func (i SessionManager_PogsImpl) RegisterUdpSession(p tunnelrpc.SessionManager_r return err } - resp := RegisterUdpSessionResponse{} - registrationErr := i.impl.RegisterUdpSession(p.Ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) + resp, registrationErr := i.impl.RegisterUdpSession(p.Ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext) if registrationErr != nil { + // Make sure to assign a response even if one is not returned from register + if resp == nil { + resp = &RegisterUdpSessionResponse{} + } resp.Err = registrationErr } - // TUN-6689: Add spans to return path for RegisterUdpSession - result, err := p.Results.NewResult() if err != nil { return err