diff --git a/connection/quic_datagram_v3.go b/connection/quic_datagram_v3.go
index 2aab8966..00d3c950 100644
--- a/connection/quic_datagram_v3.go
+++ b/connection/quic_datagram_v3.go
@@ -10,6 +10,7 @@ import (
     "github.com/quic-go/quic-go"
     "github.com/rs/zerolog"
 
+    "github.com/cloudflare/cloudflared/management"
     cfdquic "github.com/cloudflare/cloudflared/quic/v3"
     "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
 )
@@ -25,9 +26,15 @@ func NewDatagramV3Connection(ctx context.Context,
     conn quic.Connection,
     sessionManager cfdquic.SessionManager,
     index uint8,
+    metrics cfdquic.Metrics,
     logger *zerolog.Logger,
 ) DatagramSessionHandler {
-    datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, index, logger)
+    log := logger.
+        With().
+        Int(management.EventTypeKey, int(management.UDP)).
+        Uint8(LogFieldConnIndex, index).
+        Logger()
+    datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, index, metrics, &log)
 
     return &datagramV3Connection{
         conn,
diff --git a/quic/v3/manager.go b/quic/v3/manager.go
index 57314728..d7e61ba9 100644
--- a/quic/v3/manager.go
+++ b/quic/v3/manager.go
@@ -13,11 +13,11 @@ import (
 
 var (
     // ErrSessionNotFound indicates that a session has not been registered yet for the request id.
-    ErrSessionNotFound = errors.New("session not found")
+    ErrSessionNotFound = errors.New("flow not found")
     // ErrSessionBoundToOtherConn is returned when a registration already exists for a different connection.
-    ErrSessionBoundToOtherConn = errors.New("session is in use by another connection")
+    ErrSessionBoundToOtherConn = errors.New("flow is in use by another connection")
     // ErrSessionAlreadyRegistered is returned when a registration already exists for this connection.
-    ErrSessionAlreadyRegistered = errors.New("session is already registered for this connection")
+    ErrSessionAlreadyRegistered = errors.New("flow is already registered for this connection")
 )
 
 type SessionManager interface {
@@ -39,12 +39,14 @@ type DialUDP func(dest netip.AddrPort) (*net.UDPConn, error)
 
 type sessionManager struct {
     sessions map[RequestID]Session
     mutex    sync.RWMutex
+    metrics  Metrics
     log      *zerolog.Logger
 }
 
-func NewSessionManager(log *zerolog.Logger, originDialer DialUDP) SessionManager {
+func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP) SessionManager {
     return &sessionManager{
         sessions: make(map[RequestID]Session),
+        metrics:  metrics,
         log:      log,
     }
 }
@@ -65,7 +67,7 @@ func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram
         return nil, err
     }
     // Create and insert the new session in the map
-    session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.log)
+    session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.metrics, s.log)
     s.sessions[request.RequestID] = session
     return session, nil
 }
diff --git a/quic/v3/manager_test.go b/quic/v3/manager_test.go
index 0d93ac2f..71defadd 100644
--- a/quic/v3/manager_test.go
+++ b/quic/v3/manager_test.go
@@ -15,7 +15,7 @@ import (
 func TestRegisterSession(t *testing.T) {
     log := zerolog.Nop()
 
-    manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
+    manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)
 
     request := v3.UDPSessionRegistrationDatagram{
         RequestID: testRequestID,
@@ -71,7 +71,7 @@ func TestRegisterSession(t *testing.T) {
 func TestGetSession_Empty(t *testing.T) {
     log := zerolog.Nop()
 
-    manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
+    manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)
 
     _, err := manager.GetSession(testRequestID)
     if !errors.Is(err, v3.ErrSessionNotFound) {
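Note on the `NewDatagramV3Connection` change above: the constructor now derives a zerolog child logger so the UDP event type and connection index ride along on every muxer log line. A minimal sketch of that pattern — the key names and values below are placeholders, not the actual cloudflared constants (`management.EventTypeKey`, `LogFieldConnIndex`):

```go
package main

import (
	"os"

	"github.com/rs/zerolog"
)

func main() {
	base := zerolog.New(os.Stdout)

	// Build the child logger once; every event emitted through it carries
	// the bound fields, so downstream code never has to repeat them.
	log := base.With().
		Int("event_type", 5). // placeholder for int(management.UDP)
		Uint8("connIndex", 0). // placeholder for LogFieldConnIndex
		Logger()

	log.Info().Msg("datagram muxer started")
	// {"level":"info","event_type":5,"connIndex":0,"message":"datagram muxer started"}
}
```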
diff --git a/quic/v3/metrics.go b/quic/v3/metrics.go
new file mode 100644
index 00000000..8f9cf19e
--- /dev/null
+++ b/quic/v3/metrics.go
@@ -0,0 +1,90 @@
+package v3
+
+import (
+    "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+    namespace = "cloudflared"
+    subsystem = "udp"
+)
+
+type Metrics interface {
+    IncrementFlows()
+    DecrementFlows()
+    PayloadTooLarge()
+    RetryFlowResponse()
+    MigrateFlow()
+}
+
+type metrics struct {
+    activeUDPFlows     prometheus.Gauge
+    totalUDPFlows      prometheus.Counter
+    payloadTooLarge    prometheus.Counter
+    retryFlowResponses prometheus.Counter
+    migratedFlows      prometheus.Counter
+}
+
+func (m *metrics) IncrementFlows() {
+    m.totalUDPFlows.Inc()
+    m.activeUDPFlows.Inc()
+}
+
+func (m *metrics) DecrementFlows() {
+    m.activeUDPFlows.Dec()
+}
+
+func (m *metrics) PayloadTooLarge() {
+    m.payloadTooLarge.Inc()
+}
+
+func (m *metrics) RetryFlowResponse() {
+    m.retryFlowResponses.Inc()
+}
+
+func (m *metrics) MigrateFlow() {
+    m.migratedFlows.Inc()
+}
+
+func NewMetrics(registerer prometheus.Registerer) Metrics {
+    m := &metrics{
+        activeUDPFlows: prometheus.NewGauge(prometheus.GaugeOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "active_flows",
+            Help:      "Concurrent count of UDP flows that are being proxied to any origin",
+        }),
+        totalUDPFlows: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "total_flows",
+            Help:      "Total count of UDP flows that have been proxied to any origin",
+        }),
+        payloadTooLarge: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "payload_too_large",
+            Help:      "Total count of UDP flows that have had origin payloads that are too large to proxy",
+        }),
+        retryFlowResponses: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "retry_flow_responses",
+            Help:      "Total count of UDP flows that have had to send their registration response more than once",
+        }),
+        migratedFlows: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Name:      "migrated_flows",
+            Help:      "Total count of UDP flows that have been migrated across local connections",
+        }),
+    }
+    registerer.MustRegister(
+        m.activeUDPFlows,
+        m.totalUDPFlows,
+        m.payloadTooLarge,
+        m.retryFlowResponses,
+        m.migratedFlows,
+    )
+    return m
+}
diff --git a/quic/v3/metrics_test.go b/quic/v3/metrics_test.go
new file mode 100644
index 00000000..5f6b18a7
--- /dev/null
+++ b/quic/v3/metrics_test.go
@@ -0,0 +1,9 @@
+package v3_test
+
+type noopMetrics struct{}
+
+func (noopMetrics) IncrementFlows()    {}
+func (noopMetrics) DecrementFlows()    {}
+func (noopMetrics) PayloadTooLarge()   {}
+func (noopMetrics) RetryFlowResponse() {}
+func (noopMetrics) MigrateFlow()       {}
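Because `NewMetrics` accepts the `prometheus.Registerer` as a parameter, the collectors can be exercised against a throwaway registry. A hypothetical test (not part of this diff) that checks the gauge/counter pairing through the public interface:

```go
package v3_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"

	v3 "github.com/cloudflare/cloudflared/quic/v3"
)

// A fresh registry keeps the test isolated from prometheus.DefaultRegisterer,
// which MustRegister would otherwise panic against on a second registration.
func TestFlowMetricsGauge(t *testing.T) {
	registry := prometheus.NewRegistry()
	m := v3.NewMetrics(registry)

	m.IncrementFlows()
	m.IncrementFlows()
	m.DecrementFlows()

	families, err := registry.Gather()
	if err != nil {
		t.Fatal(err)
	}
	for _, mf := range families {
		switch mf.GetName() {
		case "cloudflared_udp_active_flows":
			if got := mf.GetMetric()[0].GetGauge().GetValue(); got != 1 {
				t.Fatalf("active_flows = %v, want 1", got)
			}
		case "cloudflared_udp_total_flows":
			if got := mf.GetMetric()[0].GetCounter().GetValue(); got != 2 {
				t.Fatalf("total_flows = %v, want 2", got)
			}
		}
	}
}
```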
diff --git a/quic/v3/muxer.go b/quic/v3/muxer.go
index e34dd27b..e0e45364 100644
--- a/quic/v3/muxer.go
+++ b/quic/v3/muxer.go
@@ -45,18 +45,20 @@ type datagramConn struct {
     conn           QuicConnection
     index          uint8
     sessionManager SessionManager
+    metrics        Metrics
     logger         *zerolog.Logger
 
     datagrams  chan []byte
     readErrors chan error
 }
 
-func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, index uint8, logger *zerolog.Logger) DatagramConn {
+func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, index uint8, metrics Metrics, logger *zerolog.Logger) DatagramConn {
     log := logger.With().Uint8("datagramVersion", 3).Logger()
     return &datagramConn{
         conn:           conn,
         index:          index,
         sessionManager: sessionManager,
+        metrics:        metrics,
         logger:         &log,
         datagrams:      make(chan []byte, demuxChanCapacity),
         readErrors:     make(chan error, 2),
@@ -143,11 +145,12 @@ func (c *datagramConn) Serve(ctx context.Context) error {
                 c.logger.Err(err).Msgf("unable to unmarshal session registration datagram")
                 return
             }
+            logger := c.logger.With().Str(logFlowID, reg.RequestID.String()).Logger()
             // We bind the new session to the quic connection context instead of cloudflared context to allow for the
             // quic connection to close and close only the sessions bound to it. Closing of cloudflared will also
             // initiate the close of the quic connection, so we don't have to worry about the application context
             // in the scope of a session.
-            c.handleSessionRegistrationDatagram(connCtx, reg)
+            c.handleSessionRegistrationDatagram(connCtx, reg, &logger)
         case UDPSessionPayloadType:
             payload := &UDPSessionPayloadDatagram{}
             err := payload.UnmarshalBinary(datagram)
@@ -155,7 +158,8 @@ func (c *datagramConn) Serve(ctx context.Context) error {
                 c.logger.Err(err).Msgf("unable to unmarshal session payload datagram")
                 return
             }
-            c.handleSessionPayloadDatagram(payload)
+            logger := c.logger.With().Str(logFlowID, payload.RequestID.String()).Logger()
+            c.handleSessionPayloadDatagram(payload, &logger)
         case UDPSessionRegistrationResponseType:
             // cloudflared should never expect to receive UDP session responses as it will not initiate new
             // sessions towards the edge.
@@ -169,31 +173,33 @@ func (c *datagramConn) Serve(ctx context.Context) error {
 }
 
 // This method handles new registrations of a session and the serve loop for the session.
-func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram) {
+func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram, logger *zerolog.Logger) {
     session, err := c.sessionManager.RegisterSession(datagram, c)
     switch err {
     case nil:
         // Continue as normal
     case ErrSessionAlreadyRegistered:
         // Session is already registered and likely the response got lost
-        c.handleSessionAlreadyRegistered(datagram.RequestID)
+        c.handleSessionAlreadyRegistered(datagram.RequestID, logger)
         return
     case ErrSessionBoundToOtherConn:
         // Session is already registered but to a different connection
-        c.handleSessionMigration(datagram.RequestID)
+        c.handleSessionMigration(datagram.RequestID, logger)
         return
     default:
-        c.logger.Err(err).Msgf("session registration failure")
-        c.handleSessionRegistrationFailure(datagram.RequestID)
+        logger.Err(err).Msgf("flow registration failure")
+        c.handleSessionRegistrationFailure(datagram.RequestID, logger)
         return
     }
+    c.metrics.IncrementFlows()
     // Make sure to eventually remove the session from the session manager when the session is closed
     defer c.sessionManager.UnregisterSession(session.ID())
+    defer c.metrics.DecrementFlows()
 
     // Respond that we are able to process the new session
     err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk)
     if err != nil {
-        c.logger.Err(err).Msgf("session registration failure: unable to send session registration response")
+        logger.Err(err).Msgf("flow registration failure: unable to send session registration response")
         return
     }
@@ -203,24 +209,24 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
     if err == nil {
         // We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical
         // expected error response.
-        c.logger.Warn().Msg("session was closed without explicit close or timeout")
+        logger.Warn().Msg("flow was closed without explicit close or timeout")
         return
     }
     // SessionIdleErr and SessionCloseErr are valid and successful error responses to end a session.
     if errors.Is(err, SessionIdleErr{}) || errors.Is(err, SessionCloseErr) {
-        c.logger.Debug().Msg(err.Error())
+        logger.Debug().Msg(err.Error())
         return
     }
 
     // All other errors should be reported as errors
-    c.logger.Err(err).Msgf("session was closed with an error")
+    logger.Err(err).Msgf("flow was closed with an error")
 }
 
-func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID) {
+func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logger *zerolog.Logger) {
     // Send another registration response since the session is already active
     err := c.SendUDPSessionResponse(requestID, ResponseOk)
     if err != nil {
-        c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
+        logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
        return
     }
 
@@ -233,9 +239,10 @@ func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID) {
     // The session is already running in another routine so we want to restart the idle timeout since no proxied
     // packets have come down yet.
     session.ResetIdleTimer()
+    c.metrics.RetryFlowResponse()
 }
 
-func (c *datagramConn) handleSessionMigration(requestID RequestID) {
+func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerolog.Logger) {
     // We need to migrate the currently running session to this edge connection.
     session, err := c.sessionManager.GetSession(requestID)
     if err != nil {
@@ -250,29 +257,29 @@ func (c *datagramConn) handleSessionMigration(requestID RequestID) {
     // Send another registration response since the session is already active
     err = c.SendUDPSessionResponse(requestID, ResponseOk)
     if err != nil {
-        c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
+        logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
         return
     }
 }
 
-func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID) {
+func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, logger *zerolog.Logger) {
     err := c.SendUDPSessionResponse(requestID, ResponseUnableToBindSocket)
     if err != nil {
-        c.logger.Err(err).Msgf("unable to send session registration error response (%d)", ResponseUnableToBindSocket)
+        logger.Err(err).Msgf("unable to send flow registration error response (%d)", ResponseUnableToBindSocket)
     }
 }
 
 // Handles incoming datagrams that need to be sent to a registered session.
-func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram) {
+func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) {
     s, err := c.sessionManager.GetSession(datagram.RequestID)
     if err != nil {
-        c.logger.Err(err).Msgf("unable to find session")
+        logger.Err(err).Msgf("unable to find flow")
         return
     }
     // We ignore the bytes written to the socket because any partial write must return an error.
     _, err = s.Write(datagram.Payload)
     if err != nil {
-        c.logger.Err(err).Msgf("unable to write payload for unavailable session")
+        logger.Err(err).Msgf("unable to write payload for the flow")
         return
     }
 }
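The registration handler above pairs `c.metrics.IncrementFlows()` with a deferred `DecrementFlows()` on the same path that unregisters the flow, so the gauge tracks concurrency while the total counter only ever grows. A self-contained sketch of that accounting, using a stand-in for the real `Metrics` interface:

```go
package main

import "fmt"

// Minimal stand-in for the v3.Metrics implementation in this diff.
type flowMetrics struct{ active, total int }

func (m *flowMetrics) IncrementFlows() { m.total++; m.active++ }
func (m *flowMetrics) DecrementFlows() { m.active-- }

// serveFlow mirrors handleSessionRegistrationDatagram: the increment happens
// only after registration succeeds, and the decrement is deferred so it runs
// however the serve loop exits (idle timeout, close, or error).
func serveFlow(m *flowMetrics, serve func()) {
	m.IncrementFlows()
	defer m.DecrementFlows() // runs after serve returns, on every exit path
	serve()
}

func main() {
	m := &flowMetrics{}
	serveFlow(m, func() { fmt.Println("active during serve:", m.active) }) // 1
	fmt.Println("active after serve:", m.active, "total:", m.total)        // 0 1
}
```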
diff --git a/quic/v3/muxer_test.go b/quic/v3/muxer_test.go
index b80ad172..7e695a56 100644
--- a/quic/v3/muxer_test.go
+++ b/quic/v3/muxer_test.go
@@ -72,7 +72,7 @@ func (m *mockEyeball) SendUDPSessionResponse(id v3.RequestID, resp v3.SessionReg
 func TestDatagramConn_New(t *testing.T) {
     log := zerolog.Nop()
-    conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&log, ingress.DialUDPAddrPort), 0, &log)
+    conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
     if conn == nil {
         t.Fatal("expected valid connection")
     }
@@ -81,7 +81,7 @@ func TestDatagramConn_New(t *testing.T) {
 func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) {
     log := zerolog.Nop()
     quic := newMockQuicConn()
-    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), 0, &log)
+    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
 
     payload := []byte{0xef, 0xef}
     conn.SendUDPSessionDatagram(payload)
@@ -94,7 +94,7 @@ func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) {
 func TestDatagramConn_SendUDPSessionResponse(t *testing.T) {
     log := zerolog.Nop()
     quic := newMockQuicConn()
-    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), 0, &log)
+    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
 
     conn.SendUDPSessionResponse(testRequestID, v3.ResponseDestinationUnreachable)
     resp := <-quic.recv
@@ -115,7 +115,7 @@ func TestDatagramConn_SendUDPSessionResponse(t *testing.T) {
 func TestDatagramConnServe_ApplicationClosed(t *testing.T) {
     log := zerolog.Nop()
     quic := newMockQuicConn()
-    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), 0, &log)
+    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
 
     ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
     defer cancel()
@@ -131,7 +131,7 @@ func TestDatagramConnServe_ConnectionClosed(t *testing.T) {
     ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
     defer cancel()
     quic.ctx = ctx
-    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), 0, &log)
+    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
 
     err := conn.Serve(context.Background())
     if !errors.Is(err, context.DeadlineExceeded) {
@@ -142,7 +142,7 @@ func TestDatagramConnServe_ConnectionClosed(t *testing.T) {
 func TestDatagramConnServe_ReceiveDatagramError(t *testing.T) {
     log := zerolog.Nop()
     quic := &mockQuicConnReadError{err: net.ErrClosed}
-    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), 0, &log)
+    conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
 
     err := conn.Serve(context.Background())
     if !errors.Is(err, net.ErrClosed) {
@@ -177,7 +177,7 @@ func TestDatagramConnServe_ErrorDatagramTypes(t *testing.T) {
             log := zerolog.New(logOutput)
             quic := newMockQuicConn()
             quic.send <- test.input
-            conn := v3.NewDatagramConn(quic, &mockSessionManager{}, 0, &log)
+            conn := v3.NewDatagramConn(quic, &mockSessionManager{}, 0, &noopMetrics{}, &log)
 
             ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
             defer cancel()
@@ -218,7 +218,7 @@ func TestDatagramConnServe_RegisterSession_SessionManagerError(t *testing.T) {
     quic := newMockQuicConn()
     expectedErr := errors.New("unable to register session")
     sessionManager := mockSessionManager{expectedRegErr: expectedErr}
-    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &log)
+    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
 
     // Setup the muxer
     ctx, cancel := context.WithCancelCause(context.Background())
@@ -253,7 +253,7 @@ func TestDatagramConnServe(t *testing.T) {
     quic := newMockQuicConn()
     session := newMockSession()
     sessionManager := mockSessionManager{session: &session}
-    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &log)
+    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
 
     // Setup the muxer
     ctx, cancel := context.WithCancelCause(context.Background())
@@ -298,7 +298,7 @@ func TestDatagramConnServe_RegisterTwice(t *testing.T) {
     quic := newMockQuicConn()
     session := newMockSession()
     sessionManager := mockSessionManager{session: &session}
-    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &log)
+    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
 
     // Setup the muxer
     ctx, cancel := context.WithCancelCause(context.Background())
@@ -360,9 +360,9 @@ func TestDatagramConnServe_MigrateConnection(t *testing.T) {
     quic := newMockQuicConn()
     session := newMockSession()
     sessionManager := mockSessionManager{session: &session}
-    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &log)
+    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
     quic2 := newMockQuicConn()
-    conn2 := v3.NewDatagramConn(quic2, &sessionManager, 1, &log)
+    conn2 := v3.NewDatagramConn(quic2, &sessionManager, 1, &noopMetrics{}, &log)
 
     // Setup the muxer
     ctx, cancel := context.WithCancelCause(context.Background())
@@ -443,7 +443,7 @@ func TestDatagramConnServe_Payload_GetSessionError(t *testing.T) {
     quic := newMockQuicConn()
     // mockSessionManager will return the ErrSessionNotFound for any session attempting to be queried by the muxer
     sessionManager := mockSessionManager{session: nil, expectedGetErr: v3.ErrSessionNotFound}
-    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &log)
+    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
 
     // Setup the muxer
     ctx, cancel := context.WithCancelCause(context.Background())
@@ -471,7 +471,7 @@ func TestDatagramConnServe_Payload(t *testing.T) {
     quic := newMockQuicConn()
     session := newMockSession()
     sessionManager := mockSessionManager{session: &session}
-    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &log)
+    conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
 
     // Setup the muxer
     ctx, cancel := context.WithCancelCause(context.Background())
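The tests above satisfy the new parameters with `&noopMetrics{}` purely through Go's implicit interface satisfaction. A compile-time assertion — not included in this diff, just a common guard — would catch the stub drifting from the interface when methods are added:

```go
package v3_test

import v3 "github.com/cloudflare/cloudflared/quic/v3"

// Fails to compile if noopMetrics stops implementing v3.Metrics,
// e.g. when a new method is added to the interface.
var _ v3.Metrics = noopMetrics{}
```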
diff --git a/quic/v3/session.go b/quic/v3/session.go
index 0146e90d..e54f53e9 100644
--- a/quic/v3/session.go
+++ b/quic/v3/session.go
@@ -21,10 +21,12 @@ const (
     // read 1500 bytes from the origin, we limit the amount of bytes to be proxied to less than
     // this value (maxDatagramPayloadLen).
     maxOriginUDPPacketSize = 1500
+
+    logFlowID = "flowID"
 )
 
 // SessionCloseErr indicates that the session's Close method was called.
-var SessionCloseErr error = errors.New("session was closed")
+var SessionCloseErr error = errors.New("flow was closed")
 
 // SessionIdleErr is returned when the session was closed because there was no communication
 // in either direction over the session for the timeout period.
@@ -33,7 +35,7 @@ type SessionIdleErr struct {
 }
 
 func (e SessionIdleErr) Error() string {
-    return fmt.Sprintf("session idle for %v", e.timeout)
+    return fmt.Sprintf("flow idle for %v", e.timeout)
 }
 
 func (e SessionIdleErr) Is(target error) bool {
@@ -63,10 +65,12 @@ type session struct {
     // activeAtChan is used to communicate the last read/write time
     activeAtChan chan time.Time
     closeChan    chan error
+    metrics      Metrics
     log          *zerolog.Logger
 }
 
-func NewSession(id RequestID, closeAfterIdle time.Duration, origin io.ReadWriteCloser, eyeball DatagramConn, log *zerolog.Logger) Session {
+func NewSession(id RequestID, closeAfterIdle time.Duration, origin io.ReadWriteCloser, eyeball DatagramConn, metrics Metrics, log *zerolog.Logger) Session {
+    logger := log.With().Str(logFlowID, id.String()).Logger()
     session := &session{
         id:             id,
         closeAfterIdle: closeAfterIdle,
@@ -76,7 +80,8 @@ func NewSession(id RequestID, closeAfterIdle time.Duration, origin io.ReadWriteC
         // drop instead of blocking because last active time only needs to be an approximation
         activeAtChan: make(chan time.Time, 1),
         closeChan:    make(chan error, 1),
-        log:          log,
+        metrics:      metrics,
+        log:          &logger,
     }
     session.eyeball.Store(&eyeball)
     return session
@@ -99,6 +104,7 @@ func (s *session) Migrate(eyeball DatagramConn) {
     }
     // The session is already running so we want to restart the idle timeout since no proxied packets have come down yet.
     s.markActive()
+    s.metrics.MigrateFlow()
 }
 
 func (s *session) Serve(ctx context.Context) error {
@@ -114,18 +120,19 @@ func (s *session) Serve(ctx context.Context) error {
         // Read from the origin UDP socket
         n, err := s.origin.Read(readBuffer[DatagramPayloadHeaderLen:])
         if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
-            s.log.Debug().Msg("Session (origin) connection closed")
+            s.log.Debug().Msg("Flow (origin) connection closed")
         }
         if err != nil {
             s.closeChan <- err
             return
         }
         if n < 0 {
-            s.log.Warn().Int("packetSize", n).Msg("Session (origin) packet read was negative and was dropped")
+            s.log.Warn().Int("packetSize", n).Msg("Flow (origin) packet read was negative and was dropped")
             continue
         }
         if n > maxDatagramPayloadLen {
-            s.log.Error().Int("packetSize", n).Msg("Session (origin) packet read was too large and was dropped")
+            s.metrics.PayloadTooLarge()
+            s.log.Error().Int("packetSize", n).Msg("Flow (origin) packet read was too large and was dropped")
             continue
         }
         // We need to synchronize on the eyeball in-case that the connection was migrated. This should be rarely a point
@@ -148,12 +155,12 @@ func (s *session) Serve(ctx context.Context) error {
 func (s *session) Write(payload []byte) (n int, err error) {
     n, err = s.origin.Write(payload)
     if err != nil {
-        s.log.Err(err).Msg("Failed to write payload to session (remote)")
+        s.log.Err(err).Msg("Failed to write payload to flow (remote)")
         return n, err
     }
     // Write must return a non-nil error if it returns n < len(p). https://pkg.go.dev/io#Writer
     if n < len(payload) {
-        s.log.Err(io.ErrShortWrite).Msg("Failed to write the full payload to session (remote)")
+        s.log.Err(io.ErrShortWrite).Msg("Failed to write the full payload to flow (remote)")
         return n, io.ErrShortWrite
     }
     // Mark the session as active since we proxied a packet to the origin.
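Worth noting: `SessionIdleErr` keeps its custom `Is` method, which is what lets the tests below match `errors.Is(err, v3.SessionIdleErr{})` regardless of the timeout value the error actually carries. A standalone illustration of that pattern:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Mirrors the SessionIdleErr pattern: a custom Is method makes errors.Is
// match by type with a zero-value target, ignoring the carried timeout.
type idleErr struct{ timeout time.Duration }

func (e idleErr) Error() string        { return fmt.Sprintf("flow idle for %v", e.timeout) }
func (e idleErr) Is(target error) bool { _, ok := target.(idleErr); return ok }

func main() {
	err := idleErr{5 * time.Second}
	fmt.Println(errors.Is(err, idleErr{})) // true, whatever the timeout was
}
```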
diff --git a/quic/v3/session_test.go b/quic/v3/session_test.go
index c14f2bb7..4379db90 100644
--- a/quic/v3/session_test.go
+++ b/quic/v3/session_test.go
@@ -18,7 +18,7 @@ var expectedContextCanceled = errors.New("expected context canceled")
 func TestSessionNew(t *testing.T) {
     log := zerolog.Nop()
-    session := v3.NewSession(testRequestID, 5*time.Second, nil, &noopEyeball{}, &log)
+    session := v3.NewSession(testRequestID, 5*time.Second, nil, &noopEyeball{}, &noopMetrics{}, &log)
     if testRequestID != session.ID() {
         t.Fatalf("session id doesn't match: %s != %s", testRequestID, session.ID())
     }
@@ -27,7 +27,7 @@ func TestSessionNew(t *testing.T) {
 func testSessionWrite(t *testing.T, payload []byte) {
     log := zerolog.Nop()
     origin := newTestOrigin(makePayload(1280))
-    session := v3.NewSession(testRequestID, 5*time.Second, &origin, &noopEyeball{}, &log)
+    session := v3.NewSession(testRequestID, 5*time.Second, &origin, &noopEyeball{}, &noopMetrics{}, &log)
     n, err := session.Write(payload)
     if err != nil {
         t.Fatal(err)
@@ -64,7 +64,7 @@ func testSessionServe_Origin(t *testing.T, payload []byte) {
     log := zerolog.Nop()
     eyeball := newMockEyeball()
     origin := newTestOrigin(payload)
-    session := v3.NewSession(testRequestID, 3*time.Second, &origin, &eyeball, &log)
+    session := v3.NewSession(testRequestID, 3*time.Second, &origin, &eyeball, &noopMetrics{}, &log)
     defer session.Close()
 
     ctx, cancel := context.WithCancelCause(context.Background())
@@ -103,7 +103,7 @@ func TestSessionServe_OriginTooLarge(t *testing.T) {
     eyeball := newMockEyeball()
     payload := makePayload(1281)
     origin := newTestOrigin(payload)
-    session := v3.NewSession(testRequestID, 2*time.Second, &origin, &eyeball, &log)
+    session := v3.NewSession(testRequestID, 2*time.Second, &origin, &eyeball, &noopMetrics{}, &log)
     defer session.Close()
 
     done := make(chan error)
@@ -127,7 +127,7 @@ func TestSessionServe_Migrate(t *testing.T) {
     log := zerolog.Nop()
     eyeball := newMockEyeball()
     pipe1, pipe2 := net.Pipe()
-    session := v3.NewSession(testRequestID, 2*time.Second, pipe2, &eyeball, &log)
+    session := v3.NewSession(testRequestID, 2*time.Second, pipe2, &eyeball, &noopMetrics{}, &log)
     defer session.Close()
 
     done := make(chan error)
@@ -165,7 +165,7 @@ func TestSessionClose_Multiple(t *testing.T) {
     log := zerolog.Nop()
     origin := newTestOrigin(makePayload(128))
-    session := v3.NewSession(testRequestID, 5*time.Second, &origin, &noopEyeball{}, &log)
+    session := v3.NewSession(testRequestID, 5*time.Second, &origin, &noopEyeball{}, &noopMetrics{}, &log)
     err := session.Close()
     if err != nil {
         t.Fatal(err)
@@ -184,7 +184,7 @@ func TestSessionServe_IdleTimeout(t *testing.T) {
     log := zerolog.Nop()
     origin := newTestIdleOrigin(10 * time.Second) // Make idle time longer than closeAfterIdle
     closeAfterIdle := 2 * time.Second
-    session := v3.NewSession(testRequestID, closeAfterIdle, &origin, &noopEyeball{}, &log)
+    session := v3.NewSession(testRequestID, closeAfterIdle, &origin, &noopEyeball{}, &noopMetrics{}, &log)
     err := session.Serve(context.Background())
     if !errors.Is(err, v3.SessionIdleErr{}) {
         t.Fatal(err)
@@ -206,7 +206,7 @@ func TestSessionServe_ParentContextCanceled(t *testing.T) {
     origin := newTestIdleOrigin(10 * time.Second)
     closeAfterIdle := 10 * time.Second
 
-    session := v3.NewSession(testRequestID, closeAfterIdle, &origin, &noopEyeball{}, &log)
+    session := v3.NewSession(testRequestID, closeAfterIdle, &origin, &noopEyeball{}, &noopMetrics{}, &log)
     ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
     defer cancel()
     err := session.Serve(ctx)
@@ -227,7 +227,7 @@ func TestSessionServe_ReadErrors(t *testing.T) {
     log := zerolog.Nop()
     origin := newTestErrOrigin(net.ErrClosed, nil)
-    session := v3.NewSession(testRequestID, 30*time.Second, &origin, &noopEyeball{}, &log)
+    session := v3.NewSession(testRequestID, 30*time.Second, &origin, &noopEyeball{}, &noopMetrics{}, &log)
     err := session.Serve(context.Background())
     if !errors.Is(err, net.ErrClosed) {
         t.Fatal(err)
diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go
index 8d68f6b7..d4a75e77 100644
--- a/supervisor/supervisor.go
+++ b/supervisor/supervisor.go
@@ -7,6 +7,7 @@ import (
     "strings"
     "time"
 
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/quic-go/quic-go"
     "github.com/rs/zerolog"
 
@@ -82,12 +83,14 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
     edgeAddrHandler := NewIPAddrFallback(config.MaxEdgeAddrRetries)
     edgeBindAddr := config.EdgeBindAddr
 
-    sessionManager := v3.NewSessionManager(config.Log, ingress.DialUDPAddrPort)
+    datagramMetrics := v3.NewMetrics(prometheus.DefaultRegisterer)
+    sessionManager := v3.NewSessionManager(datagramMetrics, config.Log, ingress.DialUDPAddrPort)
 
     edgeTunnelServer := EdgeTunnelServer{
         config:          config,
         orchestrator:    orchestrator,
         sessionManager:  sessionManager,
+        datagramMetrics: datagramMetrics,
         edgeAddrs:       edgeIPs,
         edgeAddrHandler: edgeAddrHandler,
         edgeBindAddr:    edgeBindAddr,
diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go
index 13644f58..6a456ca6 100644
--- a/supervisor/tunnel.go
+++ b/supervisor/tunnel.go
@@ -176,6 +176,7 @@ type EdgeTunnelServer struct {
     config          *TunnelConfig
     orchestrator    *orchestration.Orchestrator
     sessionManager  v3.SessionManager
+    datagramMetrics v3.Metrics
     edgeAddrHandler EdgeAddrHandler
     edgeAddrs       *edgediscovery.Edge
     edgeBindAddr    net.IP
@@ -607,6 +608,7 @@ func (e *EdgeTunnelServer) serveQUIC(
             conn,
             e.sessionManager,
             connIndex,
+            e.datagramMetrics,
             connLogger.Logger(),
         )
     } else {
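Since `NewMetrics` uses `MustRegister`, which panics on duplicate registration, the supervisor constructs the metrics once against `prometheus.DefaultRegisterer` and shares the handle across all edge connections. A rough sketch of how the resulting series surface on a metrics endpoint — the HTTP server and address here are hypothetical, since cloudflared runs its own metrics listener:

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	v3 "github.com/cloudflare/cloudflared/quic/v3"
)

// With namespace "cloudflared" and subsystem "udp", the exported series are:
//   cloudflared_udp_active_flows
//   cloudflared_udp_total_flows
//   cloudflared_udp_payload_too_large
//   cloudflared_udp_retry_flow_responses
//   cloudflared_udp_migrated_flows
func main() {
	// Must run exactly once per process; a second call would panic
	// inside MustRegister on the duplicate collectors.
	metrics := v3.NewMetrics(prometheus.DefaultRegisterer)
	_ = metrics // shared by the supervisor across all edge connections

	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe("127.0.0.1:9090", nil) // hypothetical address
}
```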