From 1ee540a166db395c7b067e44608a76154374b1fb Mon Sep 17 00:00:00 2001 From: Nuno Diegues Date: Mon, 8 Nov 2021 15:43:36 +0000 Subject: [PATCH] TUN-5368: Log connection issues with LogLevel that depends on tunnel state Connections from cloudflared to Cloudflare edge are long lived and may break over time. That is expected for many reasons (ranging from network conditions to operations within Cloudflare edge). Hence, logging that as Error feels too strong and leads to users being concerned that something is failing when it is actually expected. With this change, we wrap logging about connection issues to be aware of the tunnel state: - if the tunnel has no connections active, we log as error - otherwise we log as warning --- metrics/readiness.go | 42 ++++++++-------------------- metrics/readiness_test.go | 6 ++-- origin/conn_aware_logger.go | 42 ++++++++++++++++++++++++++++ origin/supervisor.go | 17 +++++++----- origin/tunnel.go | 46 +++++++++++++++++-------------- tunnelstate/conntracker.go | 55 +++++++++++++++++++++++++++++++++++++ 6 files changed, 148 insertions(+), 60 deletions(-) create mode 100644 origin/conn_aware_logger.go create mode 100644 tunnelstate/conntracker.go diff --git a/metrics/readiness.go b/metrics/readiness.go index 030f71cb..8524bae8 100644 --- a/metrics/readiness.go +++ b/metrics/readiness.go @@ -4,46 +4,32 @@ import ( "encoding/json" "fmt" "net/http" - "sync" conn "github.com/cloudflare/cloudflared/connection" + "github.com/cloudflare/cloudflared/tunnelstate" "github.com/rs/zerolog" ) // ReadyServer serves HTTP 200 if the tunnel can serve traffic. Intended for k8s readiness checks. type ReadyServer struct { - sync.RWMutex - isConnected map[int]bool - log *zerolog.Logger + tracker *tunnelstate.ConnTracker } // NewReadyServer initializes a ReadyServer and starts listening for dis/connection events. func NewReadyServer(log *zerolog.Logger) *ReadyServer { return &ReadyServer{ - isConnected: make(map[int]bool, 0), - log: log, + tracker: tunnelstate.NewConnTracker(log), } } func (rs *ReadyServer) OnTunnelEvent(c conn.Event) { - switch c.EventType { - case conn.Connected: - rs.Lock() - rs.isConnected[int(c.Index)] = true - rs.Unlock() - case conn.Disconnected, conn.Reconnecting, conn.RegisteringTunnel, conn.Unregistering: - rs.Lock() - rs.isConnected[int(c.Index)] = false - rs.Unlock() - default: - rs.log.Error().Msgf("Unknown connection event case %v", c) - } + rs.tracker.OnTunnelEvent(c) } type body struct { - Status int `json:"status"` - ReadyConnections int `json:"readyConnections"` + Status int `json:"status"` + ReadyConnections uint `json:"readyConnections"` } // ServeHTTP responds with HTTP 200 if the tunnel is connected to the edge. @@ -63,15 +49,11 @@ func (rs *ReadyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { // This is the bulk of the logic for ServeHTTP, broken into its own pure function // to make unit testing easy. -func (rs *ReadyServer) makeResponse() (statusCode, readyConnections int) { - statusCode = http.StatusServiceUnavailable - rs.RLock() - defer rs.RUnlock() - for _, connected := range rs.isConnected { - if connected { - statusCode = http.StatusOK - readyConnections++ - } +func (rs *ReadyServer) makeResponse() (statusCode int, readyConnections uint) { + readyConnections = rs.tracker.CountActiveConns() + if readyConnections > 0 { + return http.StatusOK, readyConnections + } else { + return http.StatusServiceUnavailable, readyConnections } - return statusCode, readyConnections } diff --git a/metrics/readiness_test.go b/metrics/readiness_test.go index 5b4d8290..e6578962 100644 --- a/metrics/readiness_test.go +++ b/metrics/readiness_test.go @@ -7,6 +7,8 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + "github.com/cloudflare/cloudflared/tunnelstate" + "github.com/cloudflare/cloudflared/connection" ) @@ -18,7 +20,7 @@ func TestReadyServer_makeResponse(t *testing.T) { name string fields fields wantOK bool - wantReadyConnections int + wantReadyConnections uint }{ { name: "One connection online => HTTP 200", @@ -49,7 +51,7 @@ func TestReadyServer_makeResponse(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { rs := &ReadyServer{ - isConnected: tt.fields.isConnected, + tracker: tunnelstate.MockedConnTracker(tt.fields.isConnected), } gotStatusCode, gotReadyConnections := rs.makeResponse() if tt.wantOK && gotStatusCode != http.StatusOK { diff --git a/origin/conn_aware_logger.go b/origin/conn_aware_logger.go new file mode 100644 index 00000000..b8021121 --- /dev/null +++ b/origin/conn_aware_logger.go @@ -0,0 +1,42 @@ +package origin + +import ( + "github.com/rs/zerolog" + + "github.com/cloudflare/cloudflared/connection" + "github.com/cloudflare/cloudflared/tunnelstate" +) + +type ConnAwareLogger struct { + tracker *tunnelstate.ConnTracker + logger *zerolog.Logger +} + +func NewConnAwareLogger(logger *zerolog.Logger, observer *connection.Observer) *ConnAwareLogger { + connAwareLogger := &ConnAwareLogger{ + tracker: tunnelstate.NewConnTracker(logger), + logger: logger, + } + + observer.RegisterSink(connAwareLogger.tracker) + + return connAwareLogger +} + +func (c *ConnAwareLogger) ReplaceLogger(logger *zerolog.Logger) *ConnAwareLogger { + return &ConnAwareLogger{ + tracker: c.tracker, + logger: logger, + } +} + +func (c *ConnAwareLogger) ConnAwareLogger() *zerolog.Event { + if c.tracker.CountActiveConns() == 0 { + return c.logger.Error() + } + return c.logger.Warn() +} + +func (c *ConnAwareLogger) Logger() *zerolog.Logger { + return c.logger +} diff --git a/origin/supervisor.go b/origin/supervisor.go index 4508f2ca..304fc3c2 100644 --- a/origin/supervisor.go +++ b/origin/supervisor.go @@ -46,7 +46,7 @@ type Supervisor struct { nextConnectedIndex int nextConnectedSignal chan struct{} - log *zerolog.Logger + log *ConnAwareLogger logTransport *zerolog.Logger reconnectCredentialManager *reconnectCredentialManager @@ -91,7 +91,7 @@ func NewSupervisor(config *TunnelConfig, reconnectCh chan ReconnectSignal, grace edgeIPs: edgeIPs, tunnelErrors: make(chan tunnelError), tunnelsConnecting: map[int]chan struct{}{}, - log: config.Log, + log: NewConnAwareLogger(config.Log, config.Observer), logTransport: config.LogTransport, reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections), useReconnectToken: useReconnectToken, @@ -123,7 +123,7 @@ func (s *Supervisor) Run( if timer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate); err == nil { refreshAuthBackoffTimer = timer } else { - s.log.Err(err). + s.log.Logger().Err(err). Dur("refreshAuthRetryDuration", refreshAuthRetryDuration). Msgf("supervisor: initial refreshAuth failed, retrying in %v", refreshAuthRetryDuration) refreshAuthBackoffTimer = time.After(refreshAuthRetryDuration) @@ -145,7 +145,7 @@ func (s *Supervisor) Run( case tunnelError := <-s.tunnelErrors: tunnelsActive-- if tunnelError.err != nil && !shuttingDown { - s.log.Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated") + s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated") tunnelsWaiting = append(tunnelsWaiting, tunnelError.index) s.waitForNextTunnel(tunnelError.index) @@ -170,7 +170,7 @@ func (s *Supervisor) Run( case <-refreshAuthBackoffTimer: newTimer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate) if err != nil { - s.log.Err(err).Msg("supervisor: Authentication failed") + s.log.Logger().Err(err).Msg("supervisor: Authentication failed") // Permanent failure. Leave the `select` without setting the // channel to be non-null, so we'll never hit this case of the `select` again. continue @@ -195,7 +195,7 @@ func (s *Supervisor) initialize( ) error { availableAddrs := s.edgeIPs.AvailableAddrs() if s.config.HAConnections > availableAddrs { - s.log.Info().Msgf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, availableAddrs) + s.log.Logger().Info().Msgf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, availableAddrs) s.config.HAConnections = availableAddrs } @@ -244,6 +244,7 @@ func (s *Supervisor) startFirstTunnel( s.reconnectCredentialManager, s.config, addr, + s.log, firstConnIndex, connectedSignal, s.cloudflaredUUID, @@ -277,6 +278,7 @@ func (s *Supervisor) startFirstTunnel( s.reconnectCredentialManager, s.config, addr, + s.log, firstConnIndex, connectedSignal, s.cloudflaredUUID, @@ -310,6 +312,7 @@ func (s *Supervisor) startTunnel( s.reconnectCredentialManager, s.config, addr, + s.log, uint8(index), connectedSignal, s.cloudflaredUUID, @@ -373,7 +376,7 @@ func (s *Supervisor) authenticate(ctx context.Context, numPreviousAttempts int) if err != nil { return nil, err } - rpcClient := connection.NewTunnelServerClient(ctx, stream, s.log) + rpcClient := connection.NewTunnelServerClient(ctx, stream, s.log.Logger()) defer rpcClient.Close() const arbitraryConnectionID = uint8(0) diff --git a/origin/tunnel.go b/origin/tunnel.go index 3727ce50..247f0f88 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -131,6 +131,7 @@ func ServeTunnelLoop( credentialManager *reconnectCredentialManager, config *TunnelConfig, addr *allregions.EdgeAddr, + connAwareLogger *ConnAwareLogger, connIndex uint8, connectedSignal *signal.Signal, cloudflaredUUID uuid.UUID, @@ -140,7 +141,8 @@ func ServeTunnelLoop( haConnections.Inc() defer haConnections.Dec() - connLog := config.Log.With().Uint8(connection.LogFieldConnIndex, connIndex).Logger() + logger := config.Log.With().Uint8(connection.LogFieldConnIndex, connIndex).Logger() + connLog := connAwareLogger.ReplaceLogger(&logger) protocolFallback := &protocolFallback{ retry.BackoffHandler{MaxRetries: config.Retries}, @@ -160,7 +162,7 @@ func ServeTunnelLoop( for { err, recoverable := ServeTunnel( ctx, - &connLog, + connLog, credentialManager, config, addr, @@ -182,7 +184,7 @@ func ServeTunnelLoop( if !ok { return err } - connLog.Info().Msgf("Retrying connection in up to %s seconds", duration) + connLog.Logger().Info().Msgf("Retrying connection in up to %s seconds", duration) select { case <-ctx.Done(): @@ -192,7 +194,7 @@ func ServeTunnelLoop( case <-protocolFallback.BackoffTimer(): var idleTimeoutError *quic.IdleTimeoutError if !selectNextProtocol( - &connLog, + connLog.Logger(), protocolFallback, config.ProtocolSelector, errors.As(err, &idleTimeoutError), @@ -255,7 +257,7 @@ func selectNextProtocol( // on error returns a flag indicating if error can be retried func ServeTunnel( ctx context.Context, - connLog *zerolog.Logger, + connLog *ConnAwareLogger, credentialManager *reconnectCredentialManager, config *TunnelConfig, addr *allregions.EdgeAddr, @@ -299,29 +301,29 @@ func ServeTunnel( if err != nil { switch err := err.(type) { case connection.DupConnRegisterTunnelError: - connLog.Err(err).Msg("Unable to establish connection.") + connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection.") // don't retry this connection anymore, let supervisor pick a new address return err, false case connection.ServerRegisterTunnelError: - connLog.Err(err).Msg("Register tunnel error from server side") + connLog.ConnAwareLogger().Err(err).Msg("Register tunnel error from server side") // Don't send registration error return from server to Sentry. They are // logged on server side if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 { - connLog.Error().Msg(activeIncidentsMsg(incidents)) + connLog.ConnAwareLogger().Msg(activeIncidentsMsg(incidents)) } return err.Cause, !err.Permanent case ReconnectSignal: - connLog.Info(). + connLog.Logger().Info(). Uint8(connection.LogFieldConnIndex, connIndex). Msgf("Restarting connection due to reconnect signal in %s", err.Delay) err.DelayBeforeReconnect() return err, true default: if err == context.Canceled { - connLog.Debug().Err(err).Msgf("Serve tunnel error") + connLog.Logger().Debug().Err(err).Msgf("Serve tunnel error") return err, false } - connLog.Err(err).Msgf("Serve tunnel error") + connLog.ConnAwareLogger().Err(err).Msgf("Serve tunnel error") _, permanent := err.(unrecoverableError) return err, !permanent } @@ -331,7 +333,7 @@ func ServeTunnel( func serveTunnel( ctx context.Context, - connLog *zerolog.Logger, + connLog *ConnAwareLogger, credentialManager *reconnectCredentialManager, config *TunnelConfig, addr *allregions.EdgeAddr, @@ -364,6 +366,7 @@ func serveTunnel( return ServeQUIC(ctx, addr.UDP, config, + connLog, connOptions, controlStream, connIndex, @@ -373,7 +376,7 @@ func serveTunnel( case connection.HTTP2, connection.HTTP2Warp: edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP) if err != nil { - connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge") + connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge") return err, true } @@ -395,7 +398,7 @@ func serveTunnel( default: edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP) if err != nil { - connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge") + connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge") return err, true } @@ -427,7 +430,7 @@ func (r unrecoverableError) Error() string { func ServeH2mux( ctx context.Context, - connLog *zerolog.Logger, + connLog *ConnAwareLogger, credentialManager *reconnectCredentialManager, config *TunnelConfig, edgeConn net.Conn, @@ -437,7 +440,7 @@ func ServeH2mux( reconnectCh chan ReconnectSignal, gracefulShutdownC <-chan struct{}, ) error { - connLog.Debug().Msgf("Connecting via h2mux") + connLog.Logger().Debug().Msgf("Connecting via h2mux") // Returns error from parsing the origin URL or handshake errors handler, err, recoverable := connection.NewH2muxConnection( config.ConnectionConfig, @@ -474,7 +477,7 @@ func ServeH2mux( func ServeHTTP2( ctx context.Context, - connLog *zerolog.Logger, + connLog *ConnAwareLogger, config *TunnelConfig, tlsServerConn net.Conn, connOptions *tunnelpogs.ConnectionOptions, @@ -483,7 +486,7 @@ func ServeHTTP2( gracefulShutdownC <-chan struct{}, reconnectCh chan ReconnectSignal, ) error { - connLog.Debug().Msgf("Connecting via http2") + connLog.Logger().Debug().Msgf("Connecting via http2") h2conn := connection.NewHTTP2Connection( tlsServerConn, config.ConnectionConfig, @@ -515,6 +518,7 @@ func ServeQUIC( ctx context.Context, edgeAddr *net.UDPAddr, config *TunnelConfig, + connLogger *ConnAwareLogger, connOptions *tunnelpogs.ConnectionOptions, controlStreamHandler connection.ControlStreamHandler, connIndex uint8, @@ -528,7 +532,7 @@ func ServeQUIC( MaxIncomingStreams: connection.MaxConcurrentStreams, MaxIncomingUniStreams: connection.MaxConcurrentStreams, KeepAlive: true, - Tracer: quicpogs.NewClientTracer(config.Log, connIndex), + Tracer: quicpogs.NewClientTracer(connLogger.Logger(), connIndex), } for { select { @@ -545,7 +549,7 @@ func ServeQUIC( controlStreamHandler, config.Observer) if err != nil { - config.Log.Error().Msgf("Failed to create new quic connection, err: %v", err) + connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection") return err, true } @@ -553,7 +557,7 @@ func ServeQUIC( errGroup.Go(func() error { err := quicConn.Serve(ctx) if err != nil { - config.Log.Error().Msgf("Failed to serve quic connection, err: %v", err) + connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection") } return fmt.Errorf("Connection with edge closed") }) diff --git a/tunnelstate/conntracker.go b/tunnelstate/conntracker.go new file mode 100644 index 00000000..06fb176f --- /dev/null +++ b/tunnelstate/conntracker.go @@ -0,0 +1,55 @@ +package tunnelstate + +import ( + "sync" + + "github.com/rs/zerolog" + + "github.com/cloudflare/cloudflared/connection" +) + +type ConnTracker struct { + sync.RWMutex + isConnected map[int]bool + log *zerolog.Logger +} + +func NewConnTracker(log *zerolog.Logger) *ConnTracker { + return &ConnTracker{ + isConnected: make(map[int]bool, 0), + log: log, + } +} + +func MockedConnTracker(mocked map[int]bool) *ConnTracker { + return &ConnTracker{ + isConnected: mocked, + } +} + +func (ct *ConnTracker) OnTunnelEvent(c connection.Event) { + switch c.EventType { + case connection.Connected: + ct.Lock() + ct.isConnected[int(c.Index)] = true + ct.Unlock() + case connection.Disconnected, connection.Reconnecting, connection.RegisteringTunnel, connection.Unregistering: + ct.Lock() + ct.isConnected[int(c.Index)] = false + ct.Unlock() + default: + ct.log.Error().Msgf("Unknown connection event case %v", c) + } +} + +func (ct *ConnTracker) CountActiveConns() uint { + ct.RLock() + defer ct.RUnlock() + active := uint(0) + for _, connected := range ct.isConnected { + if connected { + active++ + } + } + return active +}