diff --git a/connection/control.go b/connection/control.go index d590b078..23ddedb1 100644 --- a/connection/control.go +++ b/connection/control.go @@ -82,7 +82,7 @@ func (c *controlStream) ServeControlStream( tunnelConfigGetter TunnelConfigJSONGetter, ) error { registrationClient := c.registerClientFunc(ctx, rw, c.registerTimeout) - + c.observer.logConnecting(c.connIndex, c.edgeAddress, c.protocol) registrationDetails, err := registrationClient.RegisterConnection( ctx, c.tunnelProperties.Credentials.Auth(), diff --git a/connection/errors.go b/connection/errors.go index 1bb34d6d..2cc60566 100644 --- a/connection/errors.go +++ b/connection/errors.go @@ -1,7 +1,6 @@ package connection import ( - "github.com/cloudflare/cloudflared/edgediscovery" tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -53,26 +52,26 @@ func serverRegistrationErrorFromRPC(err error) ServerRegisterTunnelError { } } -type muxerShutdownError struct{} +type ControlStreamError struct{} -func (e muxerShutdownError) Error() string { - return "muxer shutdown" +var _ error = &ControlStreamError{} + +func (e *ControlStreamError) Error() string { + return "control stream encountered a failure while serving" } -var errMuxerStopped = muxerShutdownError{} +type StreamListenerError struct{} -func isHandshakeErrRecoverable(err error, connIndex uint8, observer *Observer) bool { - log := observer.log.With(). - Uint8(LogFieldConnIndex, connIndex). - Err(err). - Logger() +var _ error = &StreamListenerError{} - switch err.(type) { - case edgediscovery.DialError: - log.Error().Msg("Connection unable to dial edge") - default: - log.Error().Msg("Connection failed") - return false - } - return true +func (e *StreamListenerError) Error() string { + return "accept stream listener encountered a failure while serving" +} + +type DatagramManagerError struct{} + +var _ error = &DatagramManagerError{} + +func (e *DatagramManagerError) Error() string { + return "datagram manager encountered a failure while serving" } diff --git a/connection/observer.go b/connection/observer.go index 817e6d2e..7e63cb3d 100644 --- a/connection/observer.go +++ b/connection/observer.go @@ -46,6 +46,15 @@ func (o *Observer) RegisterSink(sink EventSink) { o.addSinkChan <- sink } +func (o *Observer) logConnecting(connIndex uint8, address net.IP, protocol Protocol) { + o.log.Debug(). + Int(management.EventTypeKey, int(management.Cloudflared)). + Uint8(LogFieldConnIndex, connIndex). + IPAddr(LogFieldIPAddress, address). + Str(LogFieldProtocol, protocol.String()). + Msg("Registering tunnel connection") +} + func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) { o.log.Info(). Int(management.EventTypeKey, int(management.Cloudflared)). diff --git a/connection/quic_connection.go b/connection/quic_connection.go index 74e1f4a4..1ca5647e 100644 --- a/connection/quic_connection.go +++ b/connection/quic_connection.go @@ -3,6 +3,7 @@ package connection import ( "bufio" "context" + "errors" "fmt" "io" "net" @@ -12,7 +13,6 @@ import ( "sync/atomic" "time" - "github.com/pkg/errors" "github.com/quic-go/quic-go" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" @@ -65,7 +65,7 @@ func NewTunnelConnection( streamWriteTimeout time.Duration, gracePeriod time.Duration, logger *zerolog.Logger, -) (TunnelConnection, error) { +) TunnelConnection { return &quicConnection{ conn: conn, logger: logger, @@ -77,10 +77,11 @@ func NewTunnelConnection( rpcTimeout: rpcTimeout, streamWriteTimeout: streamWriteTimeout, gracePeriod: gracePeriod, - }, nil + } } // Serve starts a QUIC connection that begins accepting streams. +// Returning a nil error means cloudflared will exit for good and will not attempt to reconnect. func (q *quicConnection) Serve(ctx context.Context) error { // The edge assumes the first stream is used for the control plane controlStream, err := q.conn.OpenStream() @@ -88,16 +89,16 @@ func (q *quicConnection) Serve(ctx context.Context) error { return fmt.Errorf("failed to open a registration control stream: %w", err) } - // If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits - // as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this - // connection). // If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the - // other goroutine as fast as possible. - ctx, cancel := context.WithCancel(ctx) + // other goroutines. We enforce returning a not-nil error for each function started in the errgroup by logging + // the error returned and returning a custom error type instead. errGroup, ctx := errgroup.WithContext(ctx) - // In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control - // stream is already fully registered before the other goroutines can proceed. + // Close the quic connection if any of the following routines return from the errgroup (regardless of their error) + // because they are no longer processing requests for the connection. + defer q.Close() + + // Start the control stream routine errGroup.Go(func() error { // err is equal to nil if we exit due to unregistration. If that happens we want to wait the full // amount of the grace period, allowing requests to finish before we cancel the context, which will @@ -114,16 +115,26 @@ func (q *quicConnection) Serve(ctx context.Context) error { } } } - cancel() - return err + if err != nil { + q.logger.Error().Err(err).Msg("failed to serve the control stream") + } + return &ControlStreamError{} }) + // Start the accept stream loop routine errGroup.Go(func() error { - defer cancel() - return q.acceptStream(ctx) + err := q.acceptStream(ctx) + if err != nil { + q.logger.Error().Err(err).Msg("failed to accept incoming stream requests") + } + return &StreamListenerError{} }) + // Start the datagram handler routine errGroup.Go(func() error { - defer cancel() - return q.datagramHandler.Serve(ctx) + err := q.datagramHandler.Serve(ctx) + if err != nil { + q.logger.Error().Err(err).Msg("failed to run the datagram handler") + } + return &DatagramManagerError{} }) return errGroup.Wait() @@ -140,7 +151,6 @@ func (q *quicConnection) Close() { } func (q *quicConnection) acceptStream(ctx context.Context) error { - defer q.Close() for { quicStream, err := q.conn.AcceptStream(ctx) if err != nil { @@ -230,7 +240,7 @@ func (q *quicConnection) dispatchRequest(ctx context.Context, stream *rpcquic.Re ConnIndex: q.connIndex, }), rwa.connectResponseSent default: - return errors.Errorf("unsupported error type: %s", request.Type), false + return fmt.Errorf("unsupported error type: %s", request.Type), false } } diff --git a/connection/quic_connection_test.go b/connection/quic_connection_test.go index 8765fd29..fc831532 100644 --- a/connection/quic_connection_test.go +++ b/connection/quic_connection_test.go @@ -847,7 +847,7 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8) &log, } - tunnelConn, err := NewTunnelConnection( + tunnelConn := NewTunnelConnection( ctx, conn, index, @@ -860,7 +860,6 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8) 0*time.Second, &log, ) - require.NoError(t, err) return tunnelConn, datagramConn } diff --git a/connection/quic_datagram_v2.go b/connection/quic_datagram_v2.go index aebead70..e39f06e6 100644 --- a/connection/quic_datagram_v2.go +++ b/connection/quic_datagram_v2.go @@ -98,24 +98,17 @@ func NewDatagramV2Connection(ctx context.Context, } func (d *datagramV2Connection) Serve(ctx context.Context) error { - // If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits - // as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this - // connection). - // If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the - // other goroutine as fast as possible. - ctx, cancel := context.WithCancel(ctx) + // If either goroutine from the errgroup returns at all (error or nil), we rely on its cancellation to make sure + // the other goroutines as well. errGroup, ctx := errgroup.WithContext(ctx) errGroup.Go(func() error { - defer cancel() return d.sessionManager.Serve(ctx) }) errGroup.Go(func() error { - defer cancel() return d.datagramMuxer.ServeReceive(ctx) }) errGroup.Go(func() error { - defer cancel() return d.packetRouter.Serve(ctx) }) diff --git a/quic/v3/muxer.go b/quic/v3/muxer.go index 4f5605bf..6a614814 100644 --- a/quic/v3/muxer.go +++ b/quic/v3/muxer.go @@ -175,7 +175,7 @@ func (c *datagramConn) Serve(ctx context.Context) error { // Monitor the context of cloudflared case <-ctx.Done(): return ctx.Err() - // Monitor the context of the underlying connection + // Monitor the context of the underlying quic connection case <-connCtx.Done(): return connCtx.Err() // Monitor for any hard errors from reading the connection diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index cb25d68a..5bd49749 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -132,6 +132,7 @@ func (s *Supervisor) Run( if err == errEarlyShutdown { return nil } + s.log.Logger().Error().Err(err).Msg("initial tunnel connection failed") return err } var tunnelsWaiting []int @@ -154,6 +155,7 @@ func (s *Supervisor) Run( // (note that this may also be caused by context cancellation) case tunnelError := <-s.tunnelErrors: tunnelsActive-- + s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated") if tunnelError.err != nil && !shuttingDown { switch tunnelError.err.(type) { case ReconnectSignal: @@ -166,7 +168,6 @@ func (s *Supervisor) Run( if _, retry := s.tunnelsProtocolFallback[tunnelError.index].GetMaxBackoffDuration(ctx); !retry { continue } - s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated") tunnelsWaiting = append(tunnelsWaiting, tunnelError.index) s.waitForNextTunnel(tunnelError.index) @@ -285,7 +286,10 @@ func (s *Supervisor) startFirstTunnel( *quic.IdleTimeoutError, *quic.ApplicationError, edgediscovery.DialError, - *connection.EdgeQuicDialError: + *connection.EdgeQuicDialError, + *connection.ControlStreamError, + *connection.StreamListenerError, + *connection.DatagramManagerError: // Try again for these types of errors default: // Uncaught errors should bail startup @@ -301,13 +305,9 @@ func (s *Supervisor) startTunnel( index int, connectedSignal *signal.Signal, ) { - var err error - defer func() { - s.tunnelErrors <- tunnelError{index: index, err: err} - }() - // nolint: gosec - err = s.edgeTunnelServer.Serve(ctx, uint8(index), s.tunnelsProtocolFallback[index], connectedSignal) + err := s.edgeTunnelServer.Serve(ctx, uint8(index), s.tunnelsProtocolFallback[index], connectedSignal) + s.tunnelErrors <- tunnelError{index: index, err: err} } func (s *Supervisor) newConnectedTunnelSignal(index int) *signal.Signal { diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index b73eecb9..97013f5a 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -556,6 +556,7 @@ func (e *EdgeTunnelServer) serveQUIC( pqMode := connOptions.FeatureSnapshot.PostQuantum curvePref, err := curvePreference(pqMode, fips.IsFipsEnabled(), tlsConfig.CurvePreferences) if err != nil { + connLogger.ConnAwareLogger().Err(err).Msgf("failed to get curve preferences") return err, true } @@ -627,7 +628,7 @@ func (e *EdgeTunnelServer) serveQUIC( } // Wrap the [quic.Connection] as a TunnelConnection - tunnelConn, err := connection.NewTunnelConnection( + tunnelConn := connection.NewTunnelConnection( ctx, conn, connIndex, @@ -640,17 +641,13 @@ func (e *EdgeTunnelServer) serveQUIC( e.config.GracePeriod, connLogger.Logger(), ) - if err != nil { - connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new tunnel connection") - return err, true - } // Serve the TunnelConnection errGroup, serveCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { err := tunnelConn.Serve(serveCtx) if err != nil { - connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve tunnel connection") + connLogger.ConnAwareLogger().Err(err).Msg("failed to serve tunnel connection") } return err })