From 628545d229cac91ff8a802dde23324f3beae8076 Mon Sep 17 00:00:00 2001 From: Nuno Diegues Date: Tue, 4 Jan 2022 19:00:44 +0000 Subject: [PATCH] TUN-5600: Close QUIC transports as soon as possible while respecting graceful shutdown This does a few fixes to make sure that the QUICConnection returns from Serve when the context is cancelled. QUIC transport now behaves like other transports: closes as soon as there is no traffic, or at most by grace-period. Note that we do not wait for UDP traffic since that's connectionless by design. --- connection/quic.go | 33 ++++++++++++++++++++++----------- connection/quic_test.go | 2 +- datagramsession/manager.go | 9 +++++++-- origin/tunnel.go | 6 +++--- 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/connection/quic.go b/connection/quic.go index 579f5196..b586be26 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -34,10 +34,11 @@ const ( // QUICConnection represents the type that facilitates Proxying via QUIC streams. type QUICConnection struct { - session quic.Session - logger *zerolog.Logger - httpProxy OriginProxy - sessionManager datagramsession.Manager + session quic.Session + logger *zerolog.Logger + httpProxy OriginProxy + sessionManager datagramsession.Manager + controlStreamHandler ControlStreamHandler } // NewQUICConnection returns a new instance of QUICConnection. @@ -49,7 +50,7 @@ func NewQUICConnection( httpProxy OriginProxy, connOptions *tunnelpogs.ConnectionOptions, controlStreamHandler ControlStreamHandler, - observer *Observer, + logger *zerolog.Logger, ) (*QUICConnection, error) { session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig) if err != nil { @@ -72,34 +73,44 @@ func NewQUICConnection( return nil, err } - sessionManager := datagramsession.NewManager(datagramMuxer, observer.log) + sessionManager := datagramsession.NewManager(datagramMuxer, logger) return &QUICConnection{ - session: session, - httpProxy: httpProxy, - logger: observer.log, - sessionManager: sessionManager, + session: session, + httpProxy: httpProxy, + logger: logger, + sessionManager: sessionManager, + controlStreamHandler: controlStreamHandler, }, nil } // Serve starts a QUIC session that begins accepting streams. func (q *QUICConnection) Serve(ctx context.Context) error { + // If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits + // as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this + // connection). + // If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the + // other goroutine as fast as possible. + ctx, cancel := context.WithCancel(ctx) errGroup, ctx := errgroup.WithContext(ctx) errGroup.Go(func() error { + defer cancel() return q.acceptStream(ctx) }) errGroup.Go(func() error { + defer cancel() return q.sessionManager.Serve(ctx) }) return errGroup.Wait() } func (q *QUICConnection) acceptStream(ctx context.Context) error { + defer q.Close() for { stream, err := q.session.AcceptStream(ctx) if err != nil { // context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional. - if errors.Is(err, context.Canceled) { + if errors.Is(err, context.Canceled) || q.controlStreamHandler.IsStopped() { return nil } return fmt.Errorf("failed to accept QUIC stream: %w", err) diff --git a/connection/quic_test.go b/connection/quic_test.go index 02b616c9..4c2ab20e 100644 --- a/connection/quic_test.go +++ b/connection/quic_test.go @@ -661,7 +661,7 @@ func testQUICConnection(ctx context.Context, udpListenerAddr net.Addr, t *testin originProxy, &tunnelpogs.ConnectionOptions{}, fakeControlStream{}, - NewObserver(&log, &log, false), + &log, ) require.NoError(t, err) return qc diff --git a/datagramsession/manager.go b/datagramsession/manager.go index f934c15f..caf9d4fd 100644 --- a/datagramsession/manager.go +++ b/datagramsession/manager.go @@ -5,6 +5,7 @@ import ( "io" "github.com/google/uuid" + "github.com/lucas-clemente/quic-go" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -50,7 +51,11 @@ func (m *manager) Serve(ctx context.Context) error { for { sessionID, payload, err := m.transport.ReceiveFrom() if err != nil { - return err + if aerr, ok := err.(*quic.ApplicationError); ok && uint64(aerr.ErrorCode) == uint64(quic.NoError) { + return nil + } else { + return err + } } datagram := &newDatagram{ sessionID: sessionID, @@ -69,7 +74,7 @@ func (m *manager) Serve(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return nil case datagram := <-m.datagramChan: m.sendToSession(datagram) case registration := <-m.registrationChan: diff --git a/origin/tunnel.go b/origin/tunnel.go index 183060b9..d13eec8b 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -548,7 +548,7 @@ func ServeQUIC( config.ConnectionConfig.OriginProxy, connOptions, controlStreamHandler, - config.Observer) + connLogger.Logger()) if err != nil { connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection") return err, true @@ -556,11 +556,11 @@ func ServeQUIC( errGroup, serveCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { - err := quicConn.Serve(ctx) + err := quicConn.Serve(serveCtx) if err != nil { connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection") } - return fmt.Errorf("Connection with edge closed") + return err }) errGroup.Go(func() error {