diff --git a/cmd/cloudflared/tunnel/cmd.go b/cmd/cloudflared/tunnel/cmd.go index 3dc9c8a7..6d3deab7 100644 --- a/cmd/cloudflared/tunnel/cmd.go +++ b/cmd/cloudflared/tunnel/cmd.go @@ -612,6 +612,12 @@ func tunnelFlags(shouldHide bool) []cli.Flag { Value: 5, Hidden: true, }), + altsrc.NewIntFlag(&cli.IntFlag{ + Name: "max-edge-addr-retries", + Usage: "Maximum number of times to retry on edge addrs before falling back to a lower protocol", + Value: 8, + Hidden: true, + }), // Note TUN-3758 , we use Int because UInt is not supported with altsrc altsrc.NewIntFlag(&cli.IntFlag{ Name: "retries", diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index 69d272b0..71e8883d 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -376,15 +376,16 @@ func prepareTunnelConfig( Observer: observer, ReportedVersion: info.Version(), // Note TUN-3758 , we use Int because UInt is not supported with altsrc - Retries: uint(c.Int("retries")), - RunFromTerminal: isRunningFromTerminal(), - NamedTunnel: namedTunnel, - ClassicTunnel: classicTunnel, - MuxerConfig: muxerConfig, - ProtocolSelector: protocolSelector, - EdgeTLSConfigs: edgeTLSConfigs, - NeedPQ: needPQ, - PQKexIdx: pqKexIdx, + Retries: uint(c.Int("retries")), + RunFromTerminal: isRunningFromTerminal(), + NamedTunnel: namedTunnel, + ClassicTunnel: classicTunnel, + MuxerConfig: muxerConfig, + ProtocolSelector: protocolSelector, + EdgeTLSConfigs: edgeTLSConfigs, + NeedPQ: needPQ, + PQKexIdx: pqKexIdx, + MaxEdgeAddrRetries: uint8(c.Int("max-edge-addr-retries")), } packetConfig, err := newPacketConfig(c, log) if err != nil { diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index a7a747ca..92d3dc2f 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -14,7 +14,6 @@ import ( "github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/edgediscovery" - 
"github.com/cloudflare/cloudflared/edgediscovery/allregions" "github.com/cloudflare/cloudflared/h2mux" "github.com/cloudflare/cloudflared/orchestration" "github.com/cloudflare/cloudflared/retry" @@ -94,14 +93,7 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato tracker := tunnelstate.NewConnTracker(config.Log) log := NewConnAwareLogger(config.Log, tracker, config.Observer) - var edgeAddrHandler EdgeAddrHandler - if isStaticEdge { // static edge addresses - edgeAddrHandler = &IPAddrFallback{} - } else if config.EdgeIPVersion == allregions.IPv6Only || config.EdgeIPVersion == allregions.Auto { - edgeAddrHandler = &IPAddrFallback{} - } else { // IPv4Only - edgeAddrHandler = &DefaultAddrFallback{} - } + edgeAddrHandler := NewIPAddrFallback(config.MaxEdgeAddrRetries) edgeTunnelServer := EdgeTunnelServer{ config: config, diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 46e315e4..bba62332 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -41,25 +41,26 @@ const ( ) type TunnelConfig struct { - GracePeriod time.Duration - ReplaceExisting bool - OSArch string - ClientID string - CloseConnOnce *sync.Once // Used to close connectedSignal no more than once - EdgeAddrs []string - Region string - EdgeIPVersion allregions.ConfigIPVersion - HAConnections int - IncidentLookup IncidentLookup - IsAutoupdated bool - LBPool string - Tags []tunnelpogs.Tag - Log *zerolog.Logger - LogTransport *zerolog.Logger - Observer *connection.Observer - ReportedVersion string - Retries uint - RunFromTerminal bool + GracePeriod time.Duration + ReplaceExisting bool + OSArch string + ClientID string + CloseConnOnce *sync.Once // Used to close connectedSignal no more than once + EdgeAddrs []string + Region string + EdgeIPVersion allregions.ConfigIPVersion + HAConnections int + IncidentLookup IncidentLookup + IsAutoupdated bool + LBPool string + Tags []tunnelpogs.Tag + Log *zerolog.Logger + LogTransport *zerolog.Logger + Observer 
*connection.Observer + ReportedVersion string + Retries uint + MaxEdgeAddrRetries uint8 + RunFromTerminal bool NeedPQ bool @@ -133,6 +134,24 @@ func StartTunnelDaemon( return s.Run(ctx, connectedSignal) } +type ConnectivityError struct { + reachedMaxRetries bool +} + +func NewConnectivityError(hasReachedMaxRetries bool) *ConnectivityError { + return &ConnectivityError{ + reachedMaxRetries: hasReachedMaxRetries, + } +} + +func (e *ConnectivityError) Error() string { + return fmt.Sprintf("connectivity error - reached max retries: %t", e.HasReachedMaxRetries()) +} + +func (e *ConnectivityError) HasReachedMaxRetries() bool { + return e.reachedMaxRetries +} + // EdgeAddrHandler provides a mechanism switch between behaviors in ServeTunnel // for handling the errors when attempting to make edge connections. type EdgeAddrHandler interface { @@ -140,56 +159,47 @@ type EdgeAddrHandler interface { // the edge address should be replaced with a new one. Also, will return if the // error should be recognized as a connectivity error, or otherwise, a general // application error. - ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool) + ShouldGetNewAddress(connIndex uint8, err error) (needsNewAddress bool, connectivityError error) } -// DefaultAddrFallback will always return false for isConnectivityError since this -// handler is a way to provide the legacy behavior in the new edge discovery algorithm. 
-type DefaultAddrFallback struct { - edgeErrors int -} - -func (f DefaultAddrFallback) ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool) { - switch err.(type) { - case nil: // maintain current IP address - // DupConnRegisterTunnelError should indicate to get a new address immediately - case connection.DupConnRegisterTunnelError: - return true, false - // Try the next address if it was a quic.IdleTimeoutError - case *quic.IdleTimeoutError, - edgediscovery.DialError, - *connection.EdgeQuicDialError: - // Wait for two failures before falling back to a new address - f.edgeErrors++ - if f.edgeErrors >= 2 { - f.edgeErrors = 0 - return true, false - } - default: // maintain current IP address +func NewIPAddrFallback(maxRetries uint8) *ipAddrFallback { + return &ipAddrFallback{ + retriesByConnIndex: make(map[uint8]uint8), + maxRetries: maxRetries, } - return false, false } -// IPAddrFallback will have more conditions to fall back to a new address for certain +// ipAddrFallback will have more conditions to fall back to a new address for certain // edge connection errors. This means that this handler will return true for isConnectivityError // for more cases like duplicate connection register and edge quic dial errors. 
-type IPAddrFallback struct{} +type ipAddrFallback struct { + m sync.Mutex + retriesByConnIndex map[uint8]uint8 + maxRetries uint8 +} -func (f IPAddrFallback) ShouldGetNewAddress(err error) (needsNewAddress bool, isConnectivityError bool) { +func (f *ipAddrFallback) ShouldGetNewAddress(connIndex uint8, err error) (needsNewAddress bool, connectivityError error) { + f.m.Lock() + defer f.m.Unlock() switch err.(type) { case nil: // maintain current IP address // Try the next address if it was a quic.IdleTimeoutError // DupConnRegisterTunnelError needs to also receive a new ip address case connection.DupConnRegisterTunnelError, *quic.IdleTimeoutError: - return true, false + return true, nil // Network problems should be retried with new address immediately and report // as connectivity error case edgediscovery.DialError, *connection.EdgeQuicDialError: - return true, true + if f.retriesByConnIndex[connIndex] >= f.maxRetries { + f.retriesByConnIndex[connIndex] = 0 + return true, NewConnectivityError(true) + } + f.retriesByConnIndex[connIndex]++ + return true, NewConnectivityError(false) default: // maintain current IP address } - return false, false + return false, nil } type EdgeTunnelServer struct { @@ -238,11 +248,12 @@ func (e *EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, protocolF Uint8(connection.LogFieldConnIndex, connIndex). Logger() connLog := e.connAwareLogger.ReplaceLogger(&logger) + // Each connection to keep its own copy of protocol, because individual connections might fallback // to another protocol when a particular metal doesn't support new protocol // Each connection can also have it's own IP version because individual connections might fallback // to another IP version. 
- err, recoverable := e.serveTunnel( + err, shouldFallbackProtocol := e.serveTunnel( ctx, connLog, addr, @@ -252,34 +263,39 @@ func (e *EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, protocolF protocolFallback.protocol, ) - // If the connection is recoverable, we want to maintain the same IP - // but backoff a reconnect with some duration. - if recoverable { - duration, ok := protocolFallback.GetMaxBackoffDuration(ctx) - if !ok { - return err - } - - e.config.Observer.SendReconnect(connIndex) - connLog.Logger().Info().Msgf("Retrying connection in up to %s", duration) - } - // Check if the connection error was from an IP issue with the host or // establishing a connection to the edge and if so, rotate the IP address. - yes, hasConnectivityError := e.edgeAddrHandler.ShouldGetNewAddress(err) - if yes { - if _, err := e.edgeAddrs.GetDifferentAddr(int(connIndex), hasConnectivityError); err != nil { + shouldRotateEdgeIP, cErr := e.edgeAddrHandler.ShouldGetNewAddress(connIndex, err) + if shouldRotateEdgeIP { + // rotate IP, but forcing internal state to assign a new IP to connection index. + if _, err := e.edgeAddrs.GetDifferentAddr(int(connIndex), true); err != nil { return err } + + // In addition, if it is a connectivity error, and we have exhausted the configurable maximum edge IPs to rotate, + // then just fallback protocol on next iteration run. + connectivityErr, ok := cErr.(*ConnectivityError) + if ok { + shouldFallbackProtocol = connectivityErr.HasReachedMaxRetries() + } } + // set connection as re-connecting and log the next retrying backoff + duration, ok := protocolFallback.GetMaxBackoffDuration(ctx) + if !ok { + return err + } + e.config.Observer.SendReconnect(connIndex) + connLog.Logger().Info().Msgf("Retrying connection in up to %s", duration) + select { case <-ctx.Done(): return ctx.Err() case <-e.gracefulShutdownC: return nil case <-protocolFallback.BackoffTimer(): - if !recoverable { + // should we fallback protocol? If not, just return. 
Otherwise, set new protocol for next method call. + if !shouldFallbackProtocol { return err } @@ -426,7 +442,7 @@ func (e *EdgeTunnelServer) serveTunnel( } return err.Cause, !err.Permanent case *connection.EdgeQuicDialError: - return err, true + return err, false case ReconnectSignal: connLog.Logger().Info(). IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).