TUN-5118: Quic connection now detects duplicate connections similar to http2

This commit is contained in:
Sudarsan Reddy 2021-09-21 07:11:36 +01:00
parent e2b18364f4
commit fd14bf440b
3 changed files with 124 additions and 84 deletions

View File

@ -57,7 +57,11 @@ func NewQUICConnection(
return nil, errors.Wrap(err, "failed to open a registration stream") return nil, errors.Wrap(err, "failed to open a registration stream")
} }
go controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions) err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions)
if err != nil {
// Not wrapping error here to be consistent with the http2 message.
return nil, err
}
return &QUICConnection{ return &QUICConnection{
session: session, session: session,
@ -74,6 +78,10 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
for { for {
stream, err := q.session.AcceptStream(ctx) stream, err := q.session.AcceptStream(ctx)
if err != nil { if err != nil {
// context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
if errors.Is(err, context.Canceled) {
return nil
}
return errors.Wrap(err, "failed to accept QUIC stream") return errors.Wrap(err, "failed to accept QUIC stream")
} }
go func() { go func() {

View File

@ -16,7 +16,6 @@ import (
"os" "os"
"sync" "sync"
"testing" "testing"
"time"
"github.com/gobwas/ws/wsutil" "github.com/gobwas/ws/wsutil"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
@ -27,6 +26,7 @@ import (
quicpogs "github.com/cloudflare/cloudflared/quic" quicpogs "github.com/cloudflare/cloudflared/quic"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs" "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
) )
// TestQUICServer tests if a quic server accepts and responds to a quic client with the acceptance protocol. // TestQUICServer tests if a quic server accepts and responds to a quic client with the acceptance protocol.
@ -158,21 +158,7 @@ func TestQUICServer(t *testing.T) {
) )
}() }()
rpcClientFactory := mockRPCClientFactory{ controlStream := fakeControlStream{}
registered: make(chan struct{}),
unregistered: make(chan struct{}),
}
obs := NewObserver(&log, &log, false)
controlStream := NewControlStream(
obs,
mockConnectedFuse{},
&NamedTunnelConfig{},
1,
rpcClientFactory.newMockRPCClient,
nil,
1*time.Second,
)
qC, err := NewQUICConnection( qC, err := NewQUICConnection(
ctx, ctx,
@ -188,17 +174,20 @@ func TestQUICServer(t *testing.T) {
go qC.Serve(ctx) go qC.Serve(ctx)
wg.Wait() wg.Wait()
select {
case <-rpcClientFactory.registered:
break //ok
case <-time.Tick(time.Second):
t.Fatal("timeout out waiting for registration")
}
cancel() cancel()
}) })
} }
}
type fakeControlStream struct {
ControlStreamHandler
}
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error {
return nil
}
func (fakeControlStream) IsStopped() bool {
return true
} }
func quicServer( func quicServer(

View File

@ -30,6 +30,8 @@ const (
dialTimeout = 15 * time.Second dialTimeout = 15 * time.Second
FeatureSerializedHeaders = "serialized_headers" FeatureSerializedHeaders = "serialized_headers"
FeatureQuickReconnects = "quick_reconnects" FeatureQuickReconnects = "quick_reconnects"
quicHandshakeIdleTimeout = 5 * time.Second
quicMaxIdleTimeout = 15 * time.Second
) )
type rpcName string type rpcName string
@ -271,68 +273,21 @@ func ServeTunnel(
}() }()
defer config.Observer.SendDisconnect(connIndex) defer config.Observer.SendDisconnect(connIndex)
err, recoverable = serveTunnel(
connectedFuse := &connectedFuse{ ctx,
fuse: fuse, connLog,
backoff: backoff, credentialManager,
} config,
addr,
controlStream := connection.NewControlStream(
config.Observer,
connectedFuse,
config.NamedTunnel,
connIndex, connIndex,
nil, fuse,
backoff,
cloudflaredUUID,
reconnectCh,
protocol,
gracefulShutdownC, gracefulShutdownC,
config.ConnectionConfig.GracePeriod,
) )
if protocol == connection.QUIC {
connOptions := config.ConnectionOptions(addr.UDP.String(), uint8(backoff.Retries()))
return ServeQUIC(ctx,
addr.UDP,
config,
connOptions,
controlStream,
connectedFuse,
reconnectCh,
gracefulShutdownC)
}
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
if err != nil {
connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge")
return err, true
}
if protocol == connection.HTTP2 {
connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries()))
err = ServeHTTP2(
ctx,
connLog,
config,
edgeConn,
connOptions,
controlStream,
connIndex,
gracefulShutdownC,
reconnectCh,
)
} else {
err = ServeH2mux(
ctx,
connLog,
credentialManager,
config,
edgeConn,
connIndex,
connectedFuse,
cloudflaredUUID,
reconnectCh,
gracefulShutdownC,
)
}
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case connection.DupConnRegisterTunnelError: case connection.DupConnRegisterTunnelError:
@ -366,6 +321,94 @@ func ServeTunnel(
return nil, false return nil, false
} }
func serveTunnel(
ctx context.Context,
connLog *zerolog.Logger,
credentialManager *reconnectCredentialManager,
config *TunnelConfig,
addr *allregions.EdgeAddr,
connIndex uint8,
fuse *h2mux.BooleanFuse,
backoff *protocolFallback,
cloudflaredUUID uuid.UUID,
reconnectCh chan ReconnectSignal,
protocol connection.Protocol,
gracefulShutdownC <-chan struct{},
) (err error, recoverable bool) {
connectedFuse := &connectedFuse{
fuse: fuse,
backoff: backoff,
}
controlStream := connection.NewControlStream(
config.Observer,
connectedFuse,
config.NamedTunnel,
connIndex,
nil,
gracefulShutdownC,
config.ConnectionConfig.GracePeriod,
)
switch protocol {
case connection.QUIC:
connOptions := config.ConnectionOptions(addr.UDP.String(), uint8(backoff.Retries()))
return ServeQUIC(ctx,
addr.UDP,
config,
connOptions,
controlStream,
connectedFuse,
reconnectCh,
gracefulShutdownC)
case connection.HTTP2:
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
if err != nil {
connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge")
return err, true
}
connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries()))
if err := ServeHTTP2(
ctx,
connLog,
config,
edgeConn,
connOptions,
controlStream,
connIndex,
gracefulShutdownC,
reconnectCh,
); err != nil {
return err, false
}
default:
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP)
if err != nil {
connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge")
return err, true
}
if err := ServeH2mux(
ctx,
connLog,
credentialManager,
config,
edgeConn,
connIndex,
connectedFuse,
cloudflaredUUID,
reconnectCh,
gracefulShutdownC,
); err != nil {
return err, false
}
}
return
}
type unrecoverableError struct { type unrecoverableError struct {
err error err error
} }
@ -472,7 +515,8 @@ func ServeQUIC(
) (err error, recoverable bool) { ) (err error, recoverable bool) {
tlsConfig := config.EdgeTLSConfigs[connection.QUIC] tlsConfig := config.EdgeTLSConfigs[connection.QUIC]
quicConfig := &quic.Config{ quicConfig := &quic.Config{
HandshakeIdleTimeout: time.Second * 10, HandshakeIdleTimeout: quicHandshakeIdleTimeout,
MaxIdleTimeout: quicMaxIdleTimeout,
KeepAlive: true, KeepAlive: true,
} }
for { for {
@ -511,7 +555,6 @@ func ServeQUIC(
if err == nil { if err == nil {
return nil, false return nil, false
} }
config.Log.Info().Msg("Reconnecting with the same udp conn")
} }
} }
} }