cloudflared-mirror/supervisor/tunnel.go

659 lines
19 KiB
Go

package supervisor
import (
"context"
"crypto/tls"
"fmt"
"net"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/quic-go/quic-go"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/edgediscovery/allregions"
"github.com/cloudflare/cloudflared/features"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/management"
"github.com/cloudflare/cloudflared/orchestration"
quicpogs "github.com/cloudflare/cloudflared/quic"
"github.com/cloudflare/cloudflared/retry"
"github.com/cloudflare/cloudflared/signal"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/cloudflare/cloudflared/tunnelstate"
)
const (
dialTimeout = 15 * time.Second
)
type TunnelConfig struct {
GracePeriod time.Duration
ReplaceExisting bool
OSArch string
ClientID string
CloseConnOnce *sync.Once // Used to close connectedSignal no more than once
EdgeAddrs []string
Region string
EdgeIPVersion allregions.ConfigIPVersion
EdgeBindAddr net.IP
HAConnections int
IsAutoupdated bool
LBPool string
Tags []pogs.Tag
Log *zerolog.Logger
LogTransport *zerolog.Logger
Observer *connection.Observer
ReportedVersion string
Retries uint
MaxEdgeAddrRetries uint8
RunFromTerminal bool
NeedPQ bool
NamedTunnel *connection.TunnelProperties
ProtocolSelector connection.ProtocolSelector
EdgeTLSConfigs map[connection.Protocol]*tls.Config
PacketConfig *ingress.GlobalRouterConfig
RPCTimeout time.Duration
WriteStreamTimeout time.Duration
DisableQUICPathMTUDiscovery bool
QUICConnectionLevelFlowControlLimit uint64
QUICStreamLevelFlowControlLimit uint64
FeatureSelector *features.FeatureSelector
}
func (c *TunnelConfig) connectionOptions(originLocalAddr string, numPreviousAttempts uint8) *pogs.ConnectionOptions {
// attempt to parse out origin IP, but don't fail since it's informational field
host, _, _ := net.SplitHostPort(originLocalAddr)
originIP := net.ParseIP(host)
return &pogs.ConnectionOptions{
Client: c.NamedTunnel.Client,
OriginLocalIP: originIP,
ReplaceExisting: c.ReplaceExisting,
CompressionQuality: 0,
NumPreviousAttempts: numPreviousAttempts,
}
}
func (c *TunnelConfig) SupportedFeatures() []string {
supported := []string{features.FeatureSerializedHeaders}
if c.NamedTunnel == nil {
supported = append(supported, features.FeatureQuickReconnects)
}
return supported
}
func StartTunnelDaemon(
ctx context.Context,
config *TunnelConfig,
orchestrator *orchestration.Orchestrator,
connectedSignal *signal.Signal,
reconnectCh chan ReconnectSignal,
graceShutdownC <-chan struct{},
) error {
s, err := NewSupervisor(config, orchestrator, reconnectCh, graceShutdownC)
if err != nil {
return err
}
return s.Run(ctx, connectedSignal)
}
type ConnectivityError struct {
reachedMaxRetries bool
}
func NewConnectivityError(hasReachedMaxRetries bool) *ConnectivityError {
return &ConnectivityError{
reachedMaxRetries: hasReachedMaxRetries,
}
}
func (e *ConnectivityError) Error() string {
return fmt.Sprintf("connectivity error - reached max retries: %t", e.HasReachedMaxRetries())
}
func (e *ConnectivityError) HasReachedMaxRetries() bool {
return e.reachedMaxRetries
}
// EdgeAddrHandler provides a mechanism switch between behaviors in ServeTunnel
// for handling the errors when attempting to make edge connections.
type EdgeAddrHandler interface {
// ShouldGetNewAddress will check the edge connection error and determine if
// the edge address should be replaced with a new one. Also, will return if the
// error should be recognized as a connectivity error, or otherwise, a general
// application error.
ShouldGetNewAddress(connIndex uint8, err error) (needsNewAddress bool, connectivityError error)
}
func NewIPAddrFallback(maxRetries uint8) *ipAddrFallback {
return &ipAddrFallback{
retriesByConnIndex: make(map[uint8]uint8),
maxRetries: maxRetries,
}
}
// ipAddrFallback will have more conditions to fall back to a new address for certain
// edge connection errors. This means that this handler will return true for isConnectivityError
// for more cases like duplicate connection register and edge quic dial errors.
type ipAddrFallback struct {
m sync.Mutex
retriesByConnIndex map[uint8]uint8
maxRetries uint8
}
func (f *ipAddrFallback) ShouldGetNewAddress(connIndex uint8, err error) (needsNewAddress bool, connectivityError error) {
f.m.Lock()
defer f.m.Unlock()
switch err.(type) {
case nil: // maintain current IP address
// Try the next address if it was a quic.IdleTimeoutError
// DupConnRegisterTunnelError needs to also receive a new ip address
case connection.DupConnRegisterTunnelError,
*quic.IdleTimeoutError:
return true, nil
// Network problems should be retried with new address immediately and report
// as connectivity error
case edgediscovery.DialError, *connection.EdgeQuicDialError:
if f.retriesByConnIndex[connIndex] >= f.maxRetries {
f.retriesByConnIndex[connIndex] = 0
return true, NewConnectivityError(true)
}
f.retriesByConnIndex[connIndex]++
return true, NewConnectivityError(false)
default: // maintain current IP address
}
return false, nil
}
type EdgeTunnelServer struct {
config *TunnelConfig
orchestrator *orchestration.Orchestrator
edgeAddrHandler EdgeAddrHandler
edgeAddrs *edgediscovery.Edge
edgeBindAddr net.IP
reconnectCh chan ReconnectSignal
gracefulShutdownC <-chan struct{}
tracker *tunnelstate.ConnTracker
connAwareLogger *ConnAwareLogger
}
type TunnelServer interface {
Serve(ctx context.Context, connIndex uint8, protocolFallback *protocolFallback, connectedSignal *signal.Signal) error
}
func (e *EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, protocolFallback *protocolFallback, connectedSignal *signal.Signal) error {
haConnections.Inc()
defer haConnections.Dec()
connectedFuse := newBooleanFuse()
go func() {
if connectedFuse.Await() {
connectedSignal.Notify()
}
}()
// Ensure the above goroutine will terminate if we return without connecting
defer connectedFuse.Fuse(false)
// Fetch IP address to associated connection index
addr, err := e.edgeAddrs.GetAddr(int(connIndex))
switch err.(type) {
case nil: // no error
case edgediscovery.ErrNoAddressesLeft:
return err
default:
return err
}
logger := e.config.Log.With().
Int(management.EventTypeKey, int(management.Cloudflared)).
IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
Uint8(connection.LogFieldConnIndex, connIndex).
Logger()
connLog := e.connAwareLogger.ReplaceLogger(&logger)
// Each connection to keep its own copy of protocol, because individual connections might fallback
// to another protocol when a particular metal doesn't support new protocol
// Each connection can also have it's own IP version because individual connections might fallback
// to another IP version.
err, shouldFallbackProtocol := e.serveTunnel(
ctx,
connLog,
addr,
connIndex,
connectedFuse,
protocolFallback,
protocolFallback.protocol,
)
// Check if the connection error was from an IP issue with the host or
// establishing a connection to the edge and if so, rotate the IP address.
shouldRotateEdgeIP, cErr := e.edgeAddrHandler.ShouldGetNewAddress(connIndex, err)
if shouldRotateEdgeIP {
// rotate IP, but forcing internal state to assign a new IP to connection index.
if _, err := e.edgeAddrs.GetDifferentAddr(int(connIndex), true); err != nil {
return err
}
// In addition, if it is a connectivity error, and we have exhausted the configurable maximum edge IPs to rotate,
// then just fallback protocol on next iteration run.
connectivityErr, ok := cErr.(*ConnectivityError)
if ok {
shouldFallbackProtocol = connectivityErr.HasReachedMaxRetries()
}
}
// set connection has re-connecting and log the next retrying backoff
duration, ok := protocolFallback.GetMaxBackoffDuration(ctx)
if !ok {
return err
}
e.config.Observer.SendReconnect(connIndex)
connLog.Logger().Info().Msgf("Retrying connection in up to %s", duration)
select {
case <-ctx.Done():
return ctx.Err()
case <-e.gracefulShutdownC:
return nil
case <-protocolFallback.BackoffTimer():
// should we fallback protocol? If not, just return. Otherwise, set new protocol for next method call.
if !shouldFallbackProtocol {
return err
}
// If a single connection has connected with the current protocol, we know we know we don't have to fallback
// to a different protocol.
if e.tracker.HasConnectedWith(e.config.ProtocolSelector.Current()) {
return err
}
if !selectNextProtocol(
connLog.Logger(),
protocolFallback,
e.config.ProtocolSelector,
err,
) {
return err
}
}
return err
}
// protocolFallback is a wrapper around backoffHandler that will try fallback option when backoff reaches
// max retries
type protocolFallback struct {
retry.BackoffHandler
protocol connection.Protocol
inFallback bool
}
func (pf *protocolFallback) reset() {
pf.ResetNow()
pf.inFallback = false
}
func (pf *protocolFallback) fallback(fallback connection.Protocol) {
pf.ResetNow()
pf.protocol = fallback
pf.inFallback = true
}
// selectNextProtocol picks connection protocol for the next retry iteration,
// returns true if it was able to pick the protocol, false if we are out of options and should stop retrying
func selectNextProtocol(
connLog *zerolog.Logger,
protocolBackoff *protocolFallback,
selector connection.ProtocolSelector,
cause error,
) bool {
isQuicBroken := isQuicBroken(cause)
_, hasFallback := selector.Fallback()
if protocolBackoff.ReachedMaxRetries() || (hasFallback && isQuicBroken) {
if isQuicBroken {
connLog.Warn().Msg("If this log occurs persistently, and cloudflared is unable to connect to " +
"Cloudflare Network with `quic` protocol, then most likely your machine/network is getting its egress " +
"UDP to port 7844 (or others) blocked or dropped. Make sure to allow egress connectivity as per " +
"https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/configuration/ports-and-ips/\n" +
"If you are using private routing to this Tunnel, then ICMP, UDP (and Private DNS Resolution) will not work " +
"unless your cloudflared can connect with Cloudflare Network with `quic`.")
}
fallback, hasFallback := selector.Fallback()
if !hasFallback {
return false
}
// Already using fallback protocol, no point to retry
if protocolBackoff.protocol == fallback {
return false
}
connLog.Info().Msgf("Switching to fallback protocol %s", fallback)
protocolBackoff.fallback(fallback)
} else if !protocolBackoff.inFallback {
current := selector.Current()
if protocolBackoff.protocol != current {
protocolBackoff.protocol = current
connLog.Info().Msgf("Changing protocol to %s", current)
}
}
return true
}
func isQuicBroken(cause error) bool {
var idleTimeoutError *quic.IdleTimeoutError
if errors.As(cause, &idleTimeoutError) {
return true
}
var transportError *quic.TransportError
if errors.As(cause, &transportError) && strings.Contains(cause.Error(), "operation not permitted") {
return true
}
return false
}
// ServeTunnel runs a single tunnel connection, returns nil on graceful shutdown,
// on error returns a flag indicating if error can be retried
func (e *EdgeTunnelServer) serveTunnel(
ctx context.Context,
connLog *ConnAwareLogger,
addr *allregions.EdgeAddr,
connIndex uint8,
fuse *booleanFuse,
backoff *protocolFallback,
protocol connection.Protocol,
) (err error, recoverable bool) {
// Treat panics as recoverable errors
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("ServeTunnel: %v", r)
}
err = errors.Wrapf(err, "stack trace: %s", string(debug.Stack()))
recoverable = true
}
}()
defer e.config.Observer.SendDisconnect(connIndex)
err, recoverable = e.serveConnection(
ctx,
connLog,
addr,
connIndex,
fuse,
backoff,
protocol,
)
if err != nil {
switch err := err.(type) {
case connection.DupConnRegisterTunnelError:
connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection.")
// don't retry this connection anymore, let supervisor pick a new address
return err, false
case connection.ServerRegisterTunnelError:
connLog.ConnAwareLogger().Err(err).Msg("Register tunnel error from server side")
// Don't send registration error return from server to Sentry. They are
// logged on server side
return err.Cause, !err.Permanent
case *connection.EdgeQuicDialError:
return err, false
case ReconnectSignal:
connLog.Logger().Info().
IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
Uint8(connection.LogFieldConnIndex, connIndex).
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
err.DelayBeforeReconnect()
return err, true
default:
if err == context.Canceled {
connLog.Logger().Debug().Err(err).Msgf("Serve tunnel error")
return err, false
}
connLog.ConnAwareLogger().Err(err).Msgf("Serve tunnel error")
_, permanent := err.(unrecoverableError)
return err, !permanent
}
}
return nil, false
}
func (e *EdgeTunnelServer) serveConnection(
ctx context.Context,
connLog *ConnAwareLogger,
addr *allregions.EdgeAddr,
connIndex uint8,
fuse *booleanFuse,
backoff *protocolFallback,
protocol connection.Protocol,
) (err error, recoverable bool) {
connectedFuse := &connectedFuse{
fuse: fuse,
backoff: backoff,
}
controlStream := connection.NewControlStream(
e.config.Observer,
connectedFuse,
e.config.NamedTunnel,
connIndex,
addr.UDP.IP,
nil,
e.config.RPCTimeout,
e.gracefulShutdownC,
e.config.GracePeriod,
protocol,
)
switch protocol {
case connection.QUIC:
connOptions := e.config.connectionOptions(addr.UDP.String(), uint8(backoff.Retries()))
return e.serveQUIC(ctx,
addr.UDP,
connLog,
connOptions,
controlStream,
connIndex)
case connection.HTTP2:
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, e.config.EdgeTLSConfigs[protocol], addr.TCP, e.edgeBindAddr)
if err != nil {
connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge")
return err, true
}
connOptions := e.config.connectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries()))
if err := e.serveHTTP2(
ctx,
connLog,
edgeConn,
connOptions,
controlStream,
connIndex,
); err != nil {
return err, false
}
default:
return fmt.Errorf("invalid protocol selected: %s", protocol), false
}
return
}
type unrecoverableError struct {
err error
}
func (r unrecoverableError) Error() string {
return r.err.Error()
}
func (e *EdgeTunnelServer) serveHTTP2(
ctx context.Context,
connLog *ConnAwareLogger,
tlsServerConn net.Conn,
connOptions *pogs.ConnectionOptions,
controlStreamHandler connection.ControlStreamHandler,
connIndex uint8,
) error {
pqMode := e.config.FeatureSelector.PostQuantumMode()
if pqMode == features.PostQuantumStrict {
return unrecoverableError{errors.New("HTTP/2 transport does not support post-quantum")}
}
connLog.Logger().Debug().Msgf("Connecting via http2")
h2conn := connection.NewHTTP2Connection(
tlsServerConn,
e.orchestrator,
connOptions,
e.config.Observer,
connIndex,
controlStreamHandler,
e.config.Log,
)
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return h2conn.Serve(serveCtx)
})
errGroup.Go(func() error {
err := listenReconnect(serveCtx, e.reconnectCh, e.gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
// errgroup will return context canceled for the h2conn.Serve
connLog.Logger().Debug().Msg("Forcefully breaking http2 connection")
}
return err
})
return errGroup.Wait()
}
func (e *EdgeTunnelServer) serveQUIC(
ctx context.Context,
edgeAddr *net.UDPAddr,
connLogger *ConnAwareLogger,
connOptions *pogs.ConnectionOptions,
controlStreamHandler connection.ControlStreamHandler,
connIndex uint8,
) (err error, recoverable bool) {
tlsConfig := e.config.EdgeTLSConfigs[connection.QUIC]
pqMode := e.config.FeatureSelector.PostQuantumMode()
if pqMode == features.PostQuantumStrict || pqMode == features.PostQuantumPrefer {
connOptions.Client.Features = features.Dedup(append(connOptions.Client.Features, features.FeaturePostQuantum))
}
curvePref, err := curvePreference(pqMode, tlsConfig.CurvePreferences)
if err != nil {
return err, true
}
tlsConfig.CurvePreferences = curvePref
// quic-go 0.44 increases the initial packet size to 1280 by default. That breaks anyone running tunnel through WARP
// because WARP MTU is 1280.
var initialPacketSize uint16 = 1252
if edgeAddr.IP.To4() == nil {
initialPacketSize = 1232
}
quicConfig := &quic.Config{
HandshakeIdleTimeout: quicpogs.HandshakeIdleTimeout,
MaxIdleTimeout: quicpogs.MaxIdleTimeout,
KeepAlivePeriod: quicpogs.MaxIdlePingPeriod,
MaxIncomingStreams: quicpogs.MaxIncomingStreams,
MaxIncomingUniStreams: quicpogs.MaxIncomingStreams,
EnableDatagrams: true,
Tracer: quicpogs.NewClientTracer(connLogger.Logger(), connIndex),
DisablePathMTUDiscovery: e.config.DisableQUICPathMTUDiscovery,
MaxConnectionReceiveWindow: e.config.QUICConnectionLevelFlowControlLimit,
MaxStreamReceiveWindow: e.config.QUICStreamLevelFlowControlLimit,
InitialPacketSize: initialPacketSize,
}
quicConn, err := connection.NewQUICConnection(
ctx,
quicConfig,
edgeAddr,
e.edgeBindAddr,
connIndex,
tlsConfig,
e.orchestrator,
connOptions,
controlStreamHandler,
connLogger.Logger(),
e.config.PacketConfig,
e.config.RPCTimeout,
e.config.WriteStreamTimeout,
e.config.GracePeriod,
)
if err != nil {
connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection")
return err, true
}
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
err := quicConn.Serve(serveCtx)
if err != nil {
connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection")
}
return err
})
errGroup.Go(func() error {
err := listenReconnect(serveCtx, e.reconnectCh, e.gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
// errgroup will return context canceled for the quicConn.Serve
connLogger.Logger().Debug().Msg("Forcefully breaking quic connection")
}
return err
})
return errGroup.Wait(), false
}
func listenReconnect(ctx context.Context, reconnectCh <-chan ReconnectSignal, gracefulShutdownCh <-chan struct{}) error {
select {
case reconnect := <-reconnectCh:
return reconnect
case <-gracefulShutdownCh:
return nil
case <-ctx.Done():
return nil
}
}
type connectedFuse struct {
fuse *booleanFuse
backoff *protocolFallback
}
func (cf *connectedFuse) Connected() {
cf.fuse.Fuse(true)
cf.backoff.reset()
}
func (cf *connectedFuse) IsConnected() bool {
return cf.fuse.Value()
}