From fd14bf440b070a915366262f4c6a28d00c8e5740 Mon Sep 17 00:00:00 2001 From: Sudarsan Reddy Date: Tue, 21 Sep 2021 07:11:36 +0100 Subject: [PATCH] TUN-5118: Quic connection now detects duplicate connections similar to http2 --- connection/quic.go | 10 ++- connection/quic_test.go | 35 +++------ origin/tunnel.go | 163 +++++++++++++++++++++++++--------------- 3 files changed, 124 insertions(+), 84 deletions(-) diff --git a/connection/quic.go b/connection/quic.go index e22def0a..a3e349c5 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -57,7 +57,11 @@ func NewQUICConnection( return nil, errors.Wrap(err, "failed to open a registration stream") } - go controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions) + err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions) + if err != nil { + // Not wrapping error here to be consistent with the http2 message. + return nil, err + } return &QUICConnection{ session: session, @@ -74,6 +78,10 @@ func (q *QUICConnection) Serve(ctx context.Context) error { for { stream, err := q.session.AcceptStream(ctx) if err != nil { + // context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional. + if errors.Is(err, context.Canceled) { + return nil + } return errors.Wrap(err, "failed to accept QUIC stream") } go func() { diff --git a/connection/quic_test.go b/connection/quic_test.go index 56046a35..d29f7cdc 100644 --- a/connection/quic_test.go +++ b/connection/quic_test.go @@ -16,7 +16,6 @@ import ( "os" "sync" "testing" - "time" "github.com/gobwas/ws/wsutil" "github.com/lucas-clemente/quic-go" @@ -27,6 +26,7 @@ import ( quicpogs "github.com/cloudflare/cloudflared/quic" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" + tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) // TestQUICServer tests if a quic server accepts and responds to a quic client with the acceptance protocol. @@ -158,21 +158,7 @@ func TestQUICServer(t *testing.T) { ) }() - rpcClientFactory := mockRPCClientFactory{ - registered: make(chan struct{}), - unregistered: make(chan struct{}), - } - - obs := NewObserver(&log, &log, false) - controlStream := NewControlStream( - obs, - mockConnectedFuse{}, - &NamedTunnelConfig{}, - 1, - rpcClientFactory.newMockRPCClient, - nil, - 1*time.Second, - ) + controlStream := fakeControlStream{} qC, err := NewQUICConnection( ctx, @@ -188,17 +174,20 @@ func TestQUICServer(t *testing.T) { go qC.Serve(ctx) wg.Wait() - - select { - case <-rpcClientFactory.registered: - break //ok - case <-time.Tick(time.Second): - t.Fatal("timeout out waiting for registration") - } cancel() }) } +} +type fakeControlStream struct { + ControlStreamHandler +} + +func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error { + return nil +} +func (fakeControlStream) IsStopped() bool { + return true } func quicServer( diff --git a/origin/tunnel.go b/origin/tunnel.go index 4b56a464..798c821c 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -30,6 +30,8 @@ const ( dialTimeout = 15 * time.Second FeatureSerializedHeaders = "serialized_headers" FeatureQuickReconnects = "quick_reconnects" + quicHandshakeIdleTimeout = 5 * time.Second + quicMaxIdleTimeout = 15 * time.Second ) type rpcName string @@ -271,68 +273,21 @@ func ServeTunnel( }() defer config.Observer.SendDisconnect(connIndex) - - connectedFuse := &connectedFuse{ - fuse: fuse, - backoff: backoff, - } - - controlStream := connection.NewControlStream( - config.Observer, - connectedFuse, - config.NamedTunnel, + err, recoverable = serveTunnel( + ctx, + connLog, + credentialManager, + config, + addr, connIndex, - nil, + fuse, + backoff, + cloudflaredUUID, + reconnectCh, + protocol, gracefulShutdownC, - config.ConnectionConfig.GracePeriod, ) - if protocol == connection.QUIC { - connOptions := config.ConnectionOptions(addr.UDP.String(), uint8(backoff.Retries())) - return ServeQUIC(ctx, - addr.UDP, - config, - connOptions, - controlStream, - connectedFuse, - reconnectCh, - gracefulShutdownC) - } - - edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP) - if err != nil { - connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge") - return err, true - } - - if protocol == connection.HTTP2 { - connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries())) - err = ServeHTTP2( - ctx, - connLog, - config, - edgeConn, - connOptions, - controlStream, - connIndex, - gracefulShutdownC, - reconnectCh, - ) - } else { - err = ServeH2mux( - ctx, - connLog, - credentialManager, - config, - edgeConn, - connIndex, - connectedFuse, - cloudflaredUUID, - reconnectCh, - gracefulShutdownC, - ) - } - if err != nil { switch err := err.(type) { case connection.DupConnRegisterTunnelError: @@ -366,6 +321,94 @@ func ServeTunnel( return nil, false } +func serveTunnel( + ctx context.Context, + connLog *zerolog.Logger, + credentialManager *reconnectCredentialManager, + config *TunnelConfig, + addr *allregions.EdgeAddr, + connIndex uint8, + fuse *h2mux.BooleanFuse, + backoff *protocolFallback, + cloudflaredUUID uuid.UUID, + reconnectCh chan ReconnectSignal, + protocol connection.Protocol, + gracefulShutdownC <-chan struct{}, +) (err error, recoverable bool) { + + connectedFuse := &connectedFuse{ + fuse: fuse, + backoff: backoff, + } + controlStream := connection.NewControlStream( + config.Observer, + connectedFuse, + config.NamedTunnel, + connIndex, + nil, + gracefulShutdownC, + config.ConnectionConfig.GracePeriod, + ) + + switch protocol { + case connection.QUIC: + connOptions := config.ConnectionOptions(addr.UDP.String(), uint8(backoff.Retries())) + return ServeQUIC(ctx, + addr.UDP, + config, + connOptions, + controlStream, + connectedFuse, + reconnectCh, + gracefulShutdownC) + + case connection.HTTP2: + edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP) + if err != nil { + connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge") + return err, true + } + + connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries())) + if err := ServeHTTP2( + ctx, + connLog, + config, + edgeConn, + connOptions, + controlStream, + connIndex, + gracefulShutdownC, + reconnectCh, + ); err != nil { + return err, false + } + + default: + edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP) + if err != nil { + connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge") + return err, true + } + + if err := ServeH2mux( + ctx, + connLog, + credentialManager, + config, + edgeConn, + connIndex, + connectedFuse, + cloudflaredUUID, + reconnectCh, + gracefulShutdownC, + ); err != nil { + return err, false + } + } + return +} + type unrecoverableError struct { err error } @@ -472,7 +515,8 @@ func ServeQUIC( ) (err error, recoverable bool) { tlsConfig := config.EdgeTLSConfigs[connection.QUIC] quicConfig := &quic.Config{ - HandshakeIdleTimeout: time.Second * 10, + HandshakeIdleTimeout: quicHandshakeIdleTimeout, + MaxIdleTimeout: quicMaxIdleTimeout, KeepAlive: true, } for { @@ -511,7 +555,6 @@ func ServeQUIC( if err == nil { return nil, false } - config.Log.Info().Msg("Reconnecting with the same udp conn") } } }