diff --git a/ingress/origin_udp_proxy.go b/ingress/origin_udp_proxy.go
index f553e30d..012c05c0 100644
--- a/ingress/origin_udp_proxy.go
+++ b/ingress/origin_udp_proxy.go
@@ -39,7 +39,7 @@ func DialUDPAddrPort(dest netip.AddrPort) (*net.UDPConn, error) {
 	// address as context.
 	udpConn, err := net.DialUDP("udp", nil, addr)
 	if err != nil {
-		return nil, fmt.Errorf("unable to create UDP proxy to origin (%v:%v): %w", dest.Addr(), dest.Port(), err)
+		return nil, fmt.Errorf("unable to dial udp to origin %s: %w", dest, err)
 	}
 
 	return udpConn, nil
diff --git a/quic/v3/manager.go b/quic/v3/manager.go
index d7e61ba9..f5b0667f 100644
--- a/quic/v3/manager.go
+++ b/quic/v3/manager.go
@@ -7,8 +7,6 @@ import (
 	"sync"
 
 	"github.com/rs/zerolog"
-
-	"github.com/cloudflare/cloudflared/ingress"
 )
 
 var (
@@ -37,17 +35,19 @@ type SessionManager interface {
 type DialUDP func(dest netip.AddrPort) (*net.UDPConn, error)
 
 type sessionManager struct {
-	sessions map[RequestID]Session
-	mutex    sync.RWMutex
-	metrics  Metrics
-	log      *zerolog.Logger
+	sessions     map[RequestID]Session
+	mutex        sync.RWMutex
+	originDialer DialUDP
+	metrics      Metrics
+	log          *zerolog.Logger
 }
 
 func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP) SessionManager {
 	return &sessionManager{
-		sessions: make(map[RequestID]Session),
-		metrics:  metrics,
-		log:      log,
+		sessions:     make(map[RequestID]Session),
+		originDialer: originDialer,
+		metrics:      metrics,
+		log:          log,
 	}
 }
 
@@ -62,12 +62,20 @@ func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram
 		return nil, ErrSessionBoundToOtherConn
 	}
 	// Attempt to bind the UDP socket for the new session
-	origin, err := ingress.DialUDPAddrPort(request.Dest)
+	origin, err := s.originDialer(request.Dest)
 	if err != nil {
 		return nil, err
 	}
 	// Create and insert the new session in the map
-	session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.metrics, s.log)
+	session := NewSession(
+		request.RequestID,
+		request.IdleDurationHint,
+		origin,
+		origin.RemoteAddr(),
+		origin.LocalAddr(),
+		conn,
+		s.metrics,
+		s.log)
 	s.sessions[request.RequestID] = session
 	return session, nil
 }
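Note on the manager change above: `RegisterSession` now goes through an injected `DialUDP` instead of calling `ingress.DialUDPAddrPort` directly, which removes the `ingress` import from `quic/v3` and lets tests substitute a fake dialer. A minimal sketch of how the wiring might look (the call site, the nil metrics placeholder, and the loopback address are illustrative, not taken from this diff):

```go
package main

import (
	"net"
	"net/netip"
	"os"

	"github.com/rs/zerolog"

	"github.com/cloudflare/cloudflared/ingress"
	v3 "github.com/cloudflare/cloudflared/quic/v3"
)

func main() {
	log := zerolog.New(os.Stdout)

	// Production-style wiring: the concrete dialer is injected once at
	// construction, so quic/v3 itself no longer imports ingress.
	_ = v3.NewSessionManager(nil /* Metrics: omitted in this sketch */, &log, ingress.DialUDPAddrPort)

	// Test-style wiring: any func(netip.AddrPort) (*net.UDPConn, error)
	// satisfies DialUDP, e.g. a dialer that always targets loopback.
	stub := func(dest netip.AddrPort) (*net.UDPConn, error) {
		_ = dest // a real stub might record dest for assertions
		return net.DialUDP("udp", nil, net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:5353")))
	}
	_ = v3.NewSessionManager(nil /* Metrics */, &log, stub)
}
```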
diff --git a/quic/v3/muxer.go b/quic/v3/muxer.go
index e0e45364..79081762 100644
--- a/quic/v3/muxer.go
+++ b/quic/v3/muxer.go
@@ -3,6 +3,7 @@ package v3
 import (
 	"context"
 	"errors"
+	"time"
 
 	"github.com/rs/zerolog"
 )
@@ -11,6 +12,10 @@ const (
 	// Allocating a 16 channel buffer here allows for the writer to be slightly faster than the reader.
 	// This has worked previously well for datagramv2, so we will start with this as well
 	demuxChanCapacity = 16
+
+	logSrcKey      = "src"
+	logDstKey      = "dst"
+	logDurationKey = "durationMS"
 )
 
 // DatagramConn is the bridge that multiplexes writes and reads of datagrams for UDP sessions and ICMP packets to
@@ -174,23 +179,28 @@ func (c *datagramConn) Serve(ctx context.Context) error {
 
 // This method handles new registrations of a session and the serve loop for the session.
 func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram, logger *zerolog.Logger) {
+	log := logger.With().
+		Str(logFlowID, datagram.RequestID.String()).
+		Str(logDstKey, datagram.Dest.String()).
+		Logger()
 	session, err := c.sessionManager.RegisterSession(datagram, c)
 	switch err {
 	case nil:
 		// Continue as normal
 	case ErrSessionAlreadyRegistered:
 		// Session is already registered and likely the response got lost
-		c.handleSessionAlreadyRegistered(datagram.RequestID, logger)
+		c.handleSessionAlreadyRegistered(datagram.RequestID, &log)
 		return
 	case ErrSessionBoundToOtherConn:
 		// Session is already registered but to a different connection
-		c.handleSessionMigration(datagram.RequestID, logger)
+		c.handleSessionMigration(datagram.RequestID, &log)
 		return
 	default:
-		logger.Err(err).Msgf("flow registration failure")
-		c.handleSessionRegistrationFailure(datagram.RequestID, logger)
+		log.Err(err).Msgf("flow registration failure")
+		c.handleSessionRegistrationFailure(datagram.RequestID, &log)
 		return
 	}
+	log = log.With().Str(logSrcKey, session.LocalAddr().String()).Logger()
 	c.metrics.IncrementFlows()
 	// Make sure to eventually remove the session from the session manager when the session is closed
 	defer c.sessionManager.UnregisterSession(session.ID())
@@ -199,27 +209,30 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
 	// Respond that we are able to process the new session
 	err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk)
 	if err != nil {
-		logger.Err(err).Msgf("flow registration failure: unable to send session registration response")
+		log.Err(err).Msgf("flow registration failure: unable to send session registration response")
 		return
 	}
 
 	// We bind the context of the session to the [quic.Connection] that initiated the session.
 	// [Session.Serve] is blocking and will continue this go routine till the end of the session lifetime.
+	start := time.Now()
 	err = session.Serve(ctx)
+	elapsedMS := time.Now().Sub(start).Milliseconds()
+	log = log.With().Int64(logDurationKey, elapsedMS).Logger()
 	if err == nil {
 		// We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical
 		// expected error response.
-		logger.Warn().Msg("flow was closed without explicit close or timeout")
+		log.Warn().Msg("flow closed: no explicit close or timeout elapsed")
 		return
 	}
 	// SessionIdleErr and SessionCloseErr are valid and successful error responses to end a session.
 	if errors.Is(err, SessionIdleErr{}) || errors.Is(err, SessionCloseErr) {
-		logger.Debug().Msg(err.Error())
+		log.Debug().Msgf("flow closed: %s", err.Error())
 		return
 	}
 
 	// All other errors should be reported as errors
-	logger.Err(err).Msgf("flow was closed with an error")
+	log.Err(err).Msgf("flow closed with an error")
 }
 
 func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logger *zerolog.Logger) {
@@ -240,6 +253,7 @@ func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logge
 	// packets have come down yet.
 	session.ResetIdleTimer()
 	c.metrics.RetryFlowResponse()
+	logger.Debug().Msgf("flow registration response retry")
 }
 
 func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerolog.Logger) {
@@ -252,7 +266,8 @@ func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerol
 	}
 
 	// Migrate the session to use this edge connection instead of the currently running one.
-	session.Migrate(c)
+	// We also pass in this connection's logger to override the existing logger for the session.
+	session.Migrate(c, c.logger)
 
 	// Send another registration response since the session is already active
 	err = c.SendUDPSessionResponse(requestID, ResponseOk)
@@ -260,6 +275,7 @@ func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerol
 		logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
 		return
 	}
+	logger.Debug().Msgf("flow registration migration")
 }
 
 func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, logger *zerolog.Logger) {
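The muxer changes above lean on zerolog's child-logger pattern: each `With()…Logger()` call returns a logger that carries its fields into every later event, so `flowID`/`dst` are attached at registration, `src` only once the origin socket is bound, and `durationMS` after `Serve` returns. A stand-alone sketch of that accumulation (field values and keys hard-coded here for illustration):

```go
package main

import (
	"os"
	"time"

	"github.com/rs/zerolog"
)

func main() {
	base := zerolog.New(os.Stdout)

	// Registration time: tag the flow and its destination.
	log := base.With().Str("flowID", "aabb").Str("dst", "192.0.2.1:53").Logger()

	// After the origin socket is bound: add the source address.
	log = log.With().Str("src", "10.0.0.7:41000").Logger()

	start := time.Now()
	// ... session.Serve(ctx) would block here ...
	log = log.With().Int64("durationMS", time.Since(start).Milliseconds()).Logger()

	// Every field accumulated above rides along on this one event, roughly:
	// {"level":"debug","flowID":"aabb","dst":"192.0.2.1:53","src":"10.0.0.7:41000","durationMS":0,"message":"flow closed"}
	log.Debug().Msg("flow closed")
}
```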
diff --git a/quic/v3/muxer_test.go b/quic/v3/muxer_test.go
index 7e695a56..1b2149f9 100644
--- a/quic/v3/muxer_test.go
+++ b/quic/v3/muxer_test.go
@@ -619,16 +619,12 @@ func newMockSession() mockSession {
 	}
 }
 
-func (m *mockSession) ID() v3.RequestID {
-	return testRequestID
-}
-
-func (m *mockSession) ConnectionID() uint8 {
-	return 0
-}
-
-func (m *mockSession) Migrate(conn v3.DatagramConn) { m.migrated <- conn.ID() }
-func (m *mockSession) ResetIdleTimer()              {}
+func (m *mockSession) ID() v3.RequestID                                  { return testRequestID }
+func (m *mockSession) RemoteAddr() net.Addr                              { return testOriginAddr }
+func (m *mockSession) LocalAddr() net.Addr                               { return testLocalAddr }
+func (m *mockSession) ConnectionID() uint8                               { return 0 }
+func (m *mockSession) Migrate(conn v3.DatagramConn, log *zerolog.Logger) { m.migrated <- conn.ID() }
+func (m *mockSession) ResetIdleTimer()                                   {}
 
 func (m *mockSession) Serve(ctx context.Context) error {
 	close(m.served)
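The mock above has to grow two methods and widen `Migrate` because the `Session` interface changed; drift like this is commonly caught at build time with a compile-time assertion such as `var _ v3.Session = (*mockSession)(nil)`, which this diff does not add. A self-contained illustration of the pattern, using local stand-ins rather than the real types:

```go
package main

import "net"

// Session mirrors just the methods this diff added to the real v3.Session.
type Session interface {
	RemoteAddr() net.Addr
	LocalAddr() net.Addr
}

type mockSession struct{}

func (m *mockSession) RemoteAddr() net.Addr { return &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)} }
func (m *mockSession) LocalAddr() net.Addr  { return &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)} }

// Compile-time check: if the interface grows again, the build fails here
// instead of at the first call site deep inside a test.
var _ Session = (*mockSession)(nil)

func main() {}
```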
diff --git a/quic/v3/session.go b/quic/v3/session.go
index e54f53e9..7ebe02a7 100644
--- a/quic/v3/session.go
+++ b/quic/v3/session.go
@@ -22,11 +22,12 @@ const (
 	// this value (maxDatagramPayloadLen).
 	maxOriginUDPPacketSize = 1500
 
-	logFlowID = "flowID"
+	logFlowID        = "flowID"
+	logPacketSizeKey = "packetSize"
 )
 
 // SessionCloseErr indicates that the session's Close method was called.
-var SessionCloseErr error = errors.New("flow was closed")
+var SessionCloseErr error = errors.New("flow was closed directly")
 
 // SessionIdleErr is returned when the session was closed because there was no communication
 // in either direction over the session for the timeout period.
@@ -35,7 +36,7 @@ type SessionIdleErr struct {
 }
 
 func (e SessionIdleErr) Error() string {
-	return fmt.Sprintf("flow idle for %v", e.timeout)
+	return fmt.Sprintf("flow was idle for %v", e.timeout)
 }
 
 func (e SessionIdleErr) Is(target error) bool {
@@ -51,8 +52,10 @@ type Session interface {
 	io.WriteCloser
 	ID() RequestID
 	ConnectionID() uint8
+	RemoteAddr() net.Addr
+	LocalAddr() net.Addr
 	ResetIdleTimer()
-	Migrate(eyeball DatagramConn)
+	Migrate(eyeball DatagramConn, logger *zerolog.Logger)
 	// Serve starts the event loop for processing UDP packets
 	Serve(ctx context.Context) error
 }
@@ -61,6 +64,8 @@ type session struct {
 	id             RequestID
 	closeAfterIdle time.Duration
 	origin         io.ReadWriteCloser
+	originAddr     net.Addr
+	localAddr      net.Addr
 	eyeball        atomic.Pointer[DatagramConn]
 	// activeAtChan is used to communicate the last read/write time
 	activeAtChan chan time.Time
@@ -69,12 +74,23 @@ type session struct {
 	log *zerolog.Logger
 }
 
-func NewSession(id RequestID, closeAfterIdle time.Duration, origin io.ReadWriteCloser, eyeball DatagramConn, metrics Metrics, log *zerolog.Logger) Session {
+func NewSession(
+	id RequestID,
+	closeAfterIdle time.Duration,
+	origin io.ReadWriteCloser,
+	originAddr net.Addr,
+	localAddr net.Addr,
+	eyeball DatagramConn,
+	metrics Metrics,
+	log *zerolog.Logger,
+) Session {
 	logger := log.With().Str(logFlowID, id.String()).Logger()
 	session := &session{
 		id:             id,
 		closeAfterIdle: closeAfterIdle,
 		origin:         origin,
+		originAddr:     originAddr,
+		localAddr:      localAddr,
 		eyeball:        atomic.Pointer[DatagramConn]{},
 		// activeAtChan has low capacity. It can be full when there are many concurrent read/write. markActive() will
 		// drop instead of blocking because last active time only needs to be an approximation
@@ -91,16 +107,26 @@ func (s *session) ID() RequestID {
 	return s.id
 }
 
+func (s *session) RemoteAddr() net.Addr {
+	return s.originAddr
+}
+
+func (s *session) LocalAddr() net.Addr {
+	return s.localAddr
+}
+
 func (s *session) ConnectionID() uint8 {
 	eyeball := *(s.eyeball.Load())
 	return eyeball.ID()
}
 
-func (s *session) Migrate(eyeball DatagramConn) {
+func (s *session) Migrate(eyeball DatagramConn, logger *zerolog.Logger) {
 	current := *(s.eyeball.Load())
 	// Only migrate if the connection ids are different.
 	if current.ID() != eyeball.ID() {
 		s.eyeball.Store(&eyeball)
+		log := logger.With().Str(logFlowID, s.id.String()).Logger()
+		s.log = &log
 	}
 	// The session is already running so we want to restart the idle timeout since no proxied packets have come down yet.
 	s.markActive()
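`Migrate` relies on `atomic.Pointer` so readers in the serve loop never take a lock: the eyeball is swapped only when the connection IDs differ, and the flow-scoped logger is replaced alongside it. A reduced sketch of just the pointer-swap guard (the `conn` and `flow` types are stand-ins for `DatagramConn` and `session`; the logger swap is omitted):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn stands in for DatagramConn; only its ID matters to migration.
type conn struct{ id uint8 }

// flow holds the eyeball behind an atomic.Pointer, as session does, so a
// concurrent serve loop can Load() it without a mutex.
type flow struct{ eyeball atomic.Pointer[conn] }

// migrate swaps the connection only when the IDs differ, matching the guard
// in session.Migrate above.
func (f *flow) migrate(c *conn) {
	if f.eyeball.Load().id != c.id {
		f.eyeball.Store(c)
	}
}

func main() {
	f := &flow{}
	f.eyeball.Store(&conn{id: 0})
	f.migrate(&conn{id: 1})          // different ID: readers now observe the new conn
	fmt.Println(f.eyeball.Load().id) // 1
}
```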
@@ -119,20 +145,21 @@ func (s *session) Serve(ctx context.Context) error {
 		for {
 			// Read from the origin UDP socket
 			n, err := s.origin.Read(readBuffer[DatagramPayloadHeaderLen:])
-			if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
-				s.log.Debug().Msg("Flow (origin) connection closed")
-			}
 			if err != nil {
+				if errors.Is(err, io.EOF) ||
+					errors.Is(err, io.ErrUnexpectedEOF) {
+					s.log.Debug().Msgf("flow (origin) connection closed: %v", err)
+				}
 				s.closeChan <- err
 				return
 			}
 			if n < 0 {
-				s.log.Warn().Int("packetSize", n).Msg("Flow (origin) packet read was negative and was dropped")
+				s.log.Warn().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was negative and was dropped")
 				continue
 			}
 			if n > maxDatagramPayloadLen {
 				s.metrics.PayloadTooLarge()
-				s.log.Error().Int("packetSize", n).Msg("Flow (origin) packet read was too large and was dropped")
+				s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped")
 				continue
 			}
 			// We need to synchronize on the eyeball in-case that the connection was migrated. This should be rarely a point
@@ -155,12 +182,12 @@ func (s *session) Serve(ctx context.Context) error {
 func (s *session) Write(payload []byte) (n int, err error) {
 	n, err = s.origin.Write(payload)
 	if err != nil {
-		s.log.Err(err).Msg("Failed to write payload to flow (remote)")
+		s.log.Err(err).Msg("failed to write payload to flow (remote)")
 		return n, err
 	}
 	// Write must return a non-nil error if it returns n < len(p). https://pkg.go.dev/io#Writer
 	if n < len(payload) {
-		s.log.Err(io.ErrShortWrite).Msg("Failed to write the full payload to flow (remote)")
+		s.log.Err(io.ErrShortWrite).Msg("failed to write the full payload to flow (remote)")
 		return n, io.ErrShortWrite
 	}
 	// Mark the session as active since we proxied a packet to the origin.
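The reshuffled read loop now emits a debug line only for EOF-style errors but still pushes every error, including `net.ErrClosed`, into `closeChan`, so `Serve` always returns the real cause. A runnable sketch of that classification (the returned strings are illustrative labels, not the actual log output):

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
)

// classifyReadErr mirrors the Serve loop's handling above: EOF-style errors
// get a debug log as a normal close, while every error, net.ErrClosed
// included, is still surfaced to the session's close channel.
func classifyReadErr(err error) string {
	if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
		return "debug log + closeChan: flow (origin) connection closed"
	}
	if errors.Is(err, net.ErrClosed) {
		return "closeChan only: no debug log since this diff"
	}
	return "closeChan only: unexpected error"
}

func main() {
	fmt.Println(classifyReadErr(io.EOF))
	fmt.Println(classifyReadErr(net.ErrClosed))
	fmt.Println(classifyReadErr(errors.New("read: connection refused")))
}
```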
diff --git a/quic/v3/session_test.go b/quic/v3/session_test.go
index 4379db90..1e31962a 100644
--- a/quic/v3/session_test.go
+++ b/quic/v3/session_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"net"
+	"net/netip"
 	"slices"
 	"sync/atomic"
 	"testing"
@@ -14,11 +15,16 @@ import (
 	v3 "github.com/cloudflare/cloudflared/quic/v3"
 )
 
-var expectedContextCanceled = errors.New("expected context canceled")
+var (
+	expectedContextCanceled = errors.New("expected context canceled")
+
+	testOriginAddr = net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:0"))
+	testLocalAddr  = net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:0"))
+)
 
 func TestSessionNew(t *testing.T) {
 	log := zerolog.Nop()
-	session := v3.NewSession(testRequestID, 5*time.Second, nil, &noopEyeball{}, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, 5*time.Second, nil, testOriginAddr, testLocalAddr, &noopEyeball{}, &noopMetrics{}, &log)
 	if testRequestID != session.ID() {
 		t.Fatalf("session id doesn't match: %s != %s", testRequestID, session.ID())
 	}
@@ -27,7 +33,7 @@ func TestSessionNew(t *testing.T) {
 func testSessionWrite(t *testing.T, payload []byte) {
 	log := zerolog.Nop()
 	origin := newTestOrigin(makePayload(1280))
-	session := v3.NewSession(testRequestID, 5*time.Second, &origin, &noopEyeball{}, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, 5*time.Second, &origin, testOriginAddr, testLocalAddr, &noopEyeball{}, &noopMetrics{}, &log)
 	n, err := session.Write(payload)
 	if err != nil {
 		t.Fatal(err)
@@ -64,7 +70,7 @@ func testSessionServe_Origin(t *testing.T, payload []byte) {
 	log := zerolog.Nop()
 	eyeball := newMockEyeball()
 	origin := newTestOrigin(payload)
-	session := v3.NewSession(testRequestID, 3*time.Second, &origin, &eyeball, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, 3*time.Second, &origin, testOriginAddr, testLocalAddr, &eyeball, &noopMetrics{}, &log)
 	defer session.Close()
 
 	ctx, cancel := context.WithCancelCause(context.Background())
@@ -103,7 +109,7 @@ func TestSessionServe_OriginTooLarge(t *testing.T) {
 	eyeball := newMockEyeball()
 	payload := makePayload(1281)
 	origin := newTestOrigin(payload)
-	session := v3.NewSession(testRequestID, 2*time.Second, &origin, &eyeball, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, 2*time.Second, &origin, testOriginAddr, testLocalAddr, &eyeball, &noopMetrics{}, &log)
 	defer session.Close()
 
 	done := make(chan error)
@@ -127,7 +133,7 @@ func TestSessionServe_Migrate(t *testing.T) {
 	log := zerolog.Nop()
 	eyeball := newMockEyeball()
 	pipe1, pipe2 := net.Pipe()
-	session := v3.NewSession(testRequestID, 2*time.Second, pipe2, &eyeball, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, 2*time.Second, pipe2, testOriginAddr, testLocalAddr, &eyeball, &noopMetrics{}, &log)
 	defer session.Close()
 
 	done := make(chan error)
@@ -138,7 +144,7 @@ func TestSessionServe_Migrate(t *testing.T) {
 	// Migrate the session to a new connection before origin sends data
 	eyeball2 := newMockEyeball()
 	eyeball2.connID = 1
-	session.Migrate(&eyeball2)
+	session.Migrate(&eyeball2, &log)
 
 	// Origin sends data
 	payload2 := []byte{0xde}
@@ -165,7 +171,7 @@ func TestSessionServe_Migrate(t *testing.T) {
 func TestSessionClose_Multiple(t *testing.T) {
 	log := zerolog.Nop()
 	origin := newTestOrigin(makePayload(128))
-	session := v3.NewSession(testRequestID, 5*time.Second, &origin, &noopEyeball{}, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, 5*time.Second, &origin, testOriginAddr, testLocalAddr, &noopEyeball{}, &noopMetrics{}, &log)
 	err := session.Close()
 	if err != nil {
 		t.Fatal(err)
@@ -184,7 +190,7 @@ func TestSessionServe_IdleTimeout(t *testing.T) {
 	log := zerolog.Nop()
 	origin := newTestIdleOrigin(10 * time.Second) // Make idle time longer than closeAfterIdle
 	closeAfterIdle := 2 * time.Second
-	session := v3.NewSession(testRequestID, closeAfterIdle, &origin, &noopEyeball{}, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, closeAfterIdle, &origin, testOriginAddr, testLocalAddr, &noopEyeball{}, &noopMetrics{}, &log)
 	err := session.Serve(context.Background())
 	if !errors.Is(err, v3.SessionIdleErr{}) {
 		t.Fatal(err)
@@ -206,7 +212,7 @@ func TestSessionServe_ParentContextCanceled(t *testing.T) {
 	origin := newTestIdleOrigin(10 * time.Second)
 	closeAfterIdle := 10 * time.Second
 
-	session := v3.NewSession(testRequestID, closeAfterIdle, &origin, &noopEyeball{}, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, closeAfterIdle, &origin, testOriginAddr, testLocalAddr, &noopEyeball{}, &noopMetrics{}, &log)
 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 	defer cancel()
 	err := session.Serve(ctx)
@@ -227,7 +233,7 @@ func TestSessionServe_ParentContextCanceled(t *testing.T) {
 func TestSessionServe_ReadErrors(t *testing.T) {
 	log := zerolog.Nop()
 	origin := newTestErrOrigin(net.ErrClosed, nil)
-	session := v3.NewSession(testRequestID, 30*time.Second, &origin, &noopEyeball{}, &noopMetrics{}, &log)
+	session := v3.NewSession(testRequestID, 30*time.Second, &origin, testOriginAddr, testLocalAddr, &noopEyeball{}, &noopMetrics{}, &log)
 	err := session.Serve(context.Background())
 	if !errors.Is(err, net.ErrClosed) {
 		t.Fatal(err)
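The idle-timeout assertion above works because `SessionIdleErr` implements an `Is` method that matches on type rather than value, so `errors.Is(err, v3.SessionIdleErr{})` succeeds whatever the configured timeout was. A self-contained mirror of that behavior (the local `idleErr` stands in for `SessionIdleErr`, whose `Is` is assumed to type-match as sketched):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// idleErr mirrors SessionIdleErr: Is() matches on type, not on the timeout
// value, so errors.Is(err, idleErr{}) succeeds for any duration.
type idleErr struct{ timeout time.Duration }

func (e idleErr) Error() string { return fmt.Sprintf("flow was idle for %v", e.timeout) }

func (e idleErr) Is(target error) bool {
	_, ok := target.(idleErr)
	return ok
}

func main() {
	err := idleErr{timeout: 2 * time.Second}
	fmt.Println(errors.Is(err, idleErr{}))                // true: value-independent match
	fmt.Println(errors.Is(err, errors.New("flow idle"))) // false: different type
}
```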