diff --git a/metrics/readiness.go b/metrics/readiness.go
index 030f71cb..8524bae8 100644
--- a/metrics/readiness.go
+++ b/metrics/readiness.go
@@ -4,46 +4,32 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
-	"sync"
 
 	conn "github.com/cloudflare/cloudflared/connection"
+	"github.com/cloudflare/cloudflared/tunnelstate"
 
 	"github.com/rs/zerolog"
 )
 
 // ReadyServer serves HTTP 200 if the tunnel can serve traffic. Intended for k8s readiness checks.
 type ReadyServer struct {
-	sync.RWMutex
-	isConnected map[int]bool
-	log         *zerolog.Logger
+	tracker *tunnelstate.ConnTracker
 }
 
 // NewReadyServer initializes a ReadyServer and starts listening for dis/connection events.
 func NewReadyServer(log *zerolog.Logger) *ReadyServer {
 	return &ReadyServer{
-		isConnected: make(map[int]bool, 0),
-		log:         log,
+		tracker: tunnelstate.NewConnTracker(log),
 	}
 }
 
 func (rs *ReadyServer) OnTunnelEvent(c conn.Event) {
-	switch c.EventType {
-	case conn.Connected:
-		rs.Lock()
-		rs.isConnected[int(c.Index)] = true
-		rs.Unlock()
-	case conn.Disconnected, conn.Reconnecting, conn.RegisteringTunnel, conn.Unregistering:
-		rs.Lock()
-		rs.isConnected[int(c.Index)] = false
-		rs.Unlock()
-	default:
-		rs.log.Error().Msgf("Unknown connection event case %v", c)
-	}
+	rs.tracker.OnTunnelEvent(c)
 }
 
 type body struct {
-	Status           int `json:"status"`
-	ReadyConnections int `json:"readyConnections"`
+	Status           int  `json:"status"`
+	ReadyConnections uint `json:"readyConnections"`
 }
 
 // ServeHTTP responds with HTTP 200 if the tunnel is connected to the edge.
@@ -63,15 +49,11 @@ func (rs *ReadyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 // This is the bulk of the logic for ServeHTTP, broken into its own pure function
 // to make unit testing easy.
-func (rs *ReadyServer) makeResponse() (statusCode, readyConnections int) {
-	statusCode = http.StatusServiceUnavailable
-	rs.RLock()
-	defer rs.RUnlock()
-	for _, connected := range rs.isConnected {
-		if connected {
-			statusCode = http.StatusOK
-			readyConnections++
-		}
+func (rs *ReadyServer) makeResponse() (statusCode int, readyConnections uint) {
+	readyConnections = rs.tracker.CountActiveConns()
+	if readyConnections > 0 {
+		return http.StatusOK, readyConnections
+	} else {
+		return http.StatusServiceUnavailable, readyConnections
 	}
-	return statusCode, readyConnections
 }
diff --git a/metrics/readiness_test.go b/metrics/readiness_test.go
index 5b4d8290..e6578962 100644
--- a/metrics/readiness_test.go
+++ b/metrics/readiness_test.go
@@ -7,6 +7,8 @@ import (
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 
+	"github.com/cloudflare/cloudflared/tunnelstate"
+
 	"github.com/cloudflare/cloudflared/connection"
 )
 
@@ -18,7 +20,7 @@ func TestReadyServer_makeResponse(t *testing.T) {
 		name                 string
 		fields               fields
 		wantOK               bool
-		wantReadyConnections int
+		wantReadyConnections uint
 	}{
 		{
 			name: "One connection online => HTTP 200",
@@ -49,7 +51,7 @@ func TestReadyServer_makeResponse(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			rs := &ReadyServer{
-				isConnected: tt.fields.isConnected,
+				tracker: tunnelstate.MockedConnTracker(tt.fields.isConnected),
 			}
 			gotStatusCode, gotReadyConnections := rs.makeResponse()
 			if tt.wantOK && gotStatusCode != http.StatusOK {
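For anyone who wants to poke at the refactored readiness endpoint, here is a minimal sketch (not part of the change) that drives `ReadyServer` through a fake connection event and an `httptest` request. The `Event` field names are taken from the usage in the diff above; everything else is standard library.

```go
package main

import (
	"fmt"
	"net/http/httptest"

	"github.com/rs/zerolog"

	"github.com/cloudflare/cloudflared/connection"
	"github.com/cloudflare/cloudflared/metrics"
)

func main() {
	log := zerolog.Nop()
	rs := metrics.NewReadyServer(&log)

	// Before any Connected event the tracker is empty, so this reports 503.
	rec := httptest.NewRecorder()
	rs.ServeHTTP(rec, httptest.NewRequest("GET", "/ready", nil))
	fmt.Println(rec.Code) // 503

	// Simulate connection 0 coming up; CountActiveConns becomes 1.
	rs.OnTunnelEvent(connection.Event{Index: 0, EventType: connection.Connected})

	rec = httptest.NewRecorder()
	rs.ServeHTTP(rec, httptest.NewRequest("GET", "/ready", nil))
	fmt.Println(rec.Code, rec.Body.String()) // 200, with readyConnections reported as 1
}
```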
"github.com/cloudflare/cloudflared/tunnelstate" +) + +type ConnAwareLogger struct { + tracker *tunnelstate.ConnTracker + logger *zerolog.Logger +} + +func NewConnAwareLogger(logger *zerolog.Logger, observer *connection.Observer) *ConnAwareLogger { + connAwareLogger := &ConnAwareLogger{ + tracker: tunnelstate.NewConnTracker(logger), + logger: logger, + } + + observer.RegisterSink(connAwareLogger.tracker) + + return connAwareLogger +} + +func (c *ConnAwareLogger) ReplaceLogger(logger *zerolog.Logger) *ConnAwareLogger { + return &ConnAwareLogger{ + tracker: c.tracker, + logger: logger, + } +} + +func (c *ConnAwareLogger) ConnAwareLogger() *zerolog.Event { + if c.tracker.CountActiveConns() == 0 { + return c.logger.Error() + } + return c.logger.Warn() +} + +func (c *ConnAwareLogger) Logger() *zerolog.Logger { + return c.logger +} diff --git a/origin/supervisor.go b/origin/supervisor.go index 4508f2ca..304fc3c2 100644 --- a/origin/supervisor.go +++ b/origin/supervisor.go @@ -46,7 +46,7 @@ type Supervisor struct { nextConnectedIndex int nextConnectedSignal chan struct{} - log *zerolog.Logger + log *ConnAwareLogger logTransport *zerolog.Logger reconnectCredentialManager *reconnectCredentialManager @@ -91,7 +91,7 @@ func NewSupervisor(config *TunnelConfig, reconnectCh chan ReconnectSignal, grace edgeIPs: edgeIPs, tunnelErrors: make(chan tunnelError), tunnelsConnecting: map[int]chan struct{}{}, - log: config.Log, + log: NewConnAwareLogger(config.Log, config.Observer), logTransport: config.LogTransport, reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections), useReconnectToken: useReconnectToken, @@ -123,7 +123,7 @@ func (s *Supervisor) Run( if timer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate); err == nil { refreshAuthBackoffTimer = timer } else { - s.log.Err(err). + s.log.Logger().Err(err). Dur("refreshAuthRetryDuration", refreshAuthRetryDuration). Msgf("supervisor: initial refreshAuth failed, retrying in %v", refreshAuthRetryDuration) refreshAuthBackoffTimer = time.After(refreshAuthRetryDuration) @@ -145,7 +145,7 @@ func (s *Supervisor) Run( case tunnelError := <-s.tunnelErrors: tunnelsActive-- if tunnelError.err != nil && !shuttingDown { - s.log.Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated") + s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated") tunnelsWaiting = append(tunnelsWaiting, tunnelError.index) s.waitForNextTunnel(tunnelError.index) @@ -170,7 +170,7 @@ func (s *Supervisor) Run( case <-refreshAuthBackoffTimer: newTimer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate) if err != nil { - s.log.Err(err).Msg("supervisor: Authentication failed") + s.log.Logger().Err(err).Msg("supervisor: Authentication failed") // Permanent failure. Leave the `select` without setting the // channel to be non-null, so we'll never hit this case of the `select` again. 
diff --git a/origin/supervisor.go b/origin/supervisor.go
index 4508f2ca..304fc3c2 100644
--- a/origin/supervisor.go
+++ b/origin/supervisor.go
@@ -46,7 +46,7 @@ type Supervisor struct {
 	nextConnectedIndex  int
 	nextConnectedSignal chan struct{}
 
-	log          *zerolog.Logger
+	log          *ConnAwareLogger
 	logTransport *zerolog.Logger
 
 	reconnectCredentialManager *reconnectCredentialManager
@@ -91,7 +91,7 @@ func NewSupervisor(config *TunnelConfig, reconnectCh chan ReconnectSignal, grace
 		edgeIPs:                    edgeIPs,
 		tunnelErrors:               make(chan tunnelError),
 		tunnelsConnecting:          map[int]chan struct{}{},
-		log:                        config.Log,
+		log:                        NewConnAwareLogger(config.Log, config.Observer),
 		logTransport:               config.LogTransport,
 		reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections),
 		useReconnectToken:          useReconnectToken,
@@ -123,7 +123,7 @@ func (s *Supervisor) Run(
 	if timer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate); err == nil {
 		refreshAuthBackoffTimer = timer
 	} else {
-		s.log.Err(err).
+		s.log.Logger().Err(err).
 			Dur("refreshAuthRetryDuration", refreshAuthRetryDuration).
 			Msgf("supervisor: initial refreshAuth failed, retrying in %v", refreshAuthRetryDuration)
 		refreshAuthBackoffTimer = time.After(refreshAuthRetryDuration)
@@ -145,7 +145,7 @@ func (s *Supervisor) Run(
 		case tunnelError := <-s.tunnelErrors:
 			tunnelsActive--
 			if tunnelError.err != nil && !shuttingDown {
-				s.log.Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
+				s.log.ConnAwareLogger().Err(tunnelError.err).Int(connection.LogFieldConnIndex, tunnelError.index).Msg("Connection terminated")
 				tunnelsWaiting = append(tunnelsWaiting, tunnelError.index)
 				s.waitForNextTunnel(tunnelError.index)
 
@@ -170,7 +170,7 @@ func (s *Supervisor) Run(
 		case <-refreshAuthBackoffTimer:
 			newTimer, err := s.reconnectCredentialManager.RefreshAuth(ctx, refreshAuthBackoff, s.authenticate)
 			if err != nil {
-				s.log.Err(err).Msg("supervisor: Authentication failed")
+				s.log.Logger().Err(err).Msg("supervisor: Authentication failed")
 				// Permanent failure. Leave the `select` without setting the
 				// channel to be non-null, so we'll never hit this case of the `select` again.
 				continue
@@ -195,7 +195,7 @@ func (s *Supervisor) initialize(
 ) error {
 	availableAddrs := s.edgeIPs.AvailableAddrs()
 	if s.config.HAConnections > availableAddrs {
-		s.log.Info().Msgf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, availableAddrs)
+		s.log.Logger().Info().Msgf("You requested %d HA connections but I can give you at most %d.", s.config.HAConnections, availableAddrs)
 		s.config.HAConnections = availableAddrs
 	}
 
@@ -244,6 +244,7 @@ func (s *Supervisor) startFirstTunnel(
 		s.reconnectCredentialManager,
 		s.config,
 		addr,
+		s.log,
 		firstConnIndex,
 		connectedSignal,
 		s.cloudflaredUUID,
@@ -277,6 +278,7 @@ func (s *Supervisor) startFirstTunnel(
 		s.reconnectCredentialManager,
 		s.config,
 		addr,
+		s.log,
 		firstConnIndex,
 		connectedSignal,
 		s.cloudflaredUUID,
@@ -310,6 +312,7 @@ func (s *Supervisor) startTunnel(
 		s.reconnectCredentialManager,
 		s.config,
 		addr,
+		s.log,
 		uint8(index),
 		connectedSignal,
 		s.cloudflaredUUID,
@@ -373,7 +376,7 @@ func (s *Supervisor) authenticate(ctx context.Context, numPreviousAttempts int)
 	if err != nil {
 		return nil, err
 	}
-	rpcClient := connection.NewTunnelServerClient(ctx, stream, s.log)
+	rpcClient := connection.NewTunnelServerClient(ctx, stream, s.log.Logger())
 	defer rpcClient.Close()
 
 	const arbitraryConnectionID = uint8(0)
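The practical effect on the supervisor's "Connection terminated" log can be sketched with the tracker alone. This standalone snippet (assuming only the `tunnelstate` API added in this change) mirrors what `ConnAwareLogger` decides for that call site:

```go
package main

import (
	"errors"
	"os"

	"github.com/rs/zerolog"

	"github.com/cloudflare/cloudflared/connection"
	"github.com/cloudflare/cloudflared/tunnelstate"
)

func main() {
	log := zerolog.New(os.Stderr)
	tracker := tunnelstate.NewConnTracker(&log)
	err := errors.New("dial failed")

	// No connections are active yet, so losing one is escalated to Error.
	if tracker.CountActiveConns() == 0 {
		log.Error().Err(err).Msg("Connection terminated")
	}

	// After another connection registers as Connected, the same failure is
	// only a Warn: the tunnel is still serving traffic.
	tracker.OnTunnelEvent(connection.Event{Index: 1, EventType: connection.Connected})
	if tracker.CountActiveConns() > 0 {
		log.Warn().Err(err).Msg("Connection terminated")
	}
}
```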
from server side") // Don't send registration error return from server to Sentry. They are // logged on server side if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 { - connLog.Error().Msg(activeIncidentsMsg(incidents)) + connLog.ConnAwareLogger().Msg(activeIncidentsMsg(incidents)) } return err.Cause, !err.Permanent case ReconnectSignal: - connLog.Info(). + connLog.Logger().Info(). Uint8(connection.LogFieldConnIndex, connIndex). Msgf("Restarting connection due to reconnect signal in %s", err.Delay) err.DelayBeforeReconnect() return err, true default: if err == context.Canceled { - connLog.Debug().Err(err).Msgf("Serve tunnel error") + connLog.Logger().Debug().Err(err).Msgf("Serve tunnel error") return err, false } - connLog.Err(err).Msgf("Serve tunnel error") + connLog.ConnAwareLogger().Err(err).Msgf("Serve tunnel error") _, permanent := err.(unrecoverableError) return err, !permanent } @@ -331,7 +333,7 @@ func ServeTunnel( func serveTunnel( ctx context.Context, - connLog *zerolog.Logger, + connLog *ConnAwareLogger, credentialManager *reconnectCredentialManager, config *TunnelConfig, addr *allregions.EdgeAddr, @@ -364,6 +366,7 @@ func serveTunnel( return ServeQUIC(ctx, addr.UDP, config, + connLog, connOptions, controlStream, connIndex, @@ -373,7 +376,7 @@ func serveTunnel( case connection.HTTP2, connection.HTTP2Warp: edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP) if err != nil { - connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge") + connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge") return err, true } @@ -395,7 +398,7 @@ func serveTunnel( default: edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, config.EdgeTLSConfigs[protocol], addr.TCP) if err != nil { - connLog.Err(err).Msg("Unable to establish connection with Cloudflare edge") + connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge") return err, true } @@ -427,7 +430,7 @@ func (r unrecoverableError) Error() string { func ServeH2mux( ctx context.Context, - connLog *zerolog.Logger, + connLog *ConnAwareLogger, credentialManager *reconnectCredentialManager, config *TunnelConfig, edgeConn net.Conn, @@ -437,7 +440,7 @@ func ServeH2mux( reconnectCh chan ReconnectSignal, gracefulShutdownC <-chan struct{}, ) error { - connLog.Debug().Msgf("Connecting via h2mux") + connLog.Logger().Debug().Msgf("Connecting via h2mux") // Returns error from parsing the origin URL or handshake errors handler, err, recoverable := connection.NewH2muxConnection( config.ConnectionConfig, @@ -474,7 +477,7 @@ func ServeH2mux( func ServeHTTP2( ctx context.Context, - connLog *zerolog.Logger, + connLog *ConnAwareLogger, config *TunnelConfig, tlsServerConn net.Conn, connOptions *tunnelpogs.ConnectionOptions, @@ -483,7 +486,7 @@ func ServeHTTP2( gracefulShutdownC <-chan struct{}, reconnectCh chan ReconnectSignal, ) error { - connLog.Debug().Msgf("Connecting via http2") + connLog.Logger().Debug().Msgf("Connecting via http2") h2conn := connection.NewHTTP2Connection( tlsServerConn, config.ConnectionConfig, @@ -515,6 +518,7 @@ func ServeQUIC( ctx context.Context, edgeAddr *net.UDPAddr, config *TunnelConfig, + connLogger *ConnAwareLogger, connOptions *tunnelpogs.ConnectionOptions, controlStreamHandler connection.ControlStreamHandler, connIndex uint8, @@ -528,7 +532,7 @@ func ServeQUIC( MaxIncomingStreams: connection.MaxConcurrentStreams, MaxIncomingUniStreams: 
 		MaxIncomingUniStreams: connection.MaxConcurrentStreams,
 		KeepAlive:             true,
-		Tracer:                quicpogs.NewClientTracer(config.Log, connIndex),
+		Tracer:                quicpogs.NewClientTracer(connLogger.Logger(), connIndex),
 	}
 	for {
 		select {
@@ -545,7 +549,7 @@
 			controlStreamHandler,
 			config.Observer)
 		if err != nil {
-			config.Log.Error().Msgf("Failed to create new quic connection, err: %v", err)
+			connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection")
 			return err, true
 		}
 
@@ -553,7 +557,7 @@
 		errGroup.Go(func() error {
 			err := quicConn.Serve(ctx)
 			if err != nil {
-				config.Log.Error().Msgf("Failed to serve quic connection, err: %v", err)
+				connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve quic connection")
 			}
 			return fmt.Errorf("Connection with edge closed")
 		})
diff --git a/tunnelstate/conntracker.go b/tunnelstate/conntracker.go
new file mode 100644
index 00000000..06fb176f
--- /dev/null
+++ b/tunnelstate/conntracker.go
@@ -0,0 +1,55 @@
+package tunnelstate
+
+import (
+	"sync"
+
+	"github.com/rs/zerolog"
+
+	"github.com/cloudflare/cloudflared/connection"
+)
+
+type ConnTracker struct {
+	sync.RWMutex
+	isConnected map[int]bool
+	log         *zerolog.Logger
+}
+
+func NewConnTracker(log *zerolog.Logger) *ConnTracker {
+	return &ConnTracker{
+		isConnected: make(map[int]bool, 0),
+		log:         log,
+	}
+}
+
+func MockedConnTracker(mocked map[int]bool) *ConnTracker {
+	return &ConnTracker{
+		isConnected: mocked,
+	}
+}
+
+func (ct *ConnTracker) OnTunnelEvent(c connection.Event) {
+	switch c.EventType {
+	case connection.Connected:
+		ct.Lock()
+		ct.isConnected[int(c.Index)] = true
+		ct.Unlock()
+	case connection.Disconnected, connection.Reconnecting, connection.RegisteringTunnel, connection.Unregistering:
+		ct.Lock()
+		ct.isConnected[int(c.Index)] = false
+		ct.Unlock()
+	default:
+		ct.log.Error().Msgf("Unknown connection event case %v", c)
+	}
+}
+
+func (ct *ConnTracker) CountActiveConns() uint {
+	ct.RLock()
+	defer ct.RUnlock()
+	active := uint(0)
+	for _, connected := range ct.isConnected {
+		if connected {
+			active++
+		}
+	}
+	return active
+}
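A quick sanity test for the new tracker, in the spirit of the existing `readiness_test.go`; this is a sketch of the expected bookkeeping, not a test file included in the change:

```go
package tunnelstate_test

import (
	"testing"

	"github.com/rs/zerolog"

	"github.com/cloudflare/cloudflared/connection"
	"github.com/cloudflare/cloudflared/tunnelstate"
)

func TestConnTrackerCountsDistinctIndices(t *testing.T) {
	log := zerolog.Nop()
	tracker := tunnelstate.NewConnTracker(&log)

	// Two connections come up, then the first drops: one remains active.
	tracker.OnTunnelEvent(connection.Event{Index: 0, EventType: connection.Connected})
	tracker.OnTunnelEvent(connection.Event{Index: 1, EventType: connection.Connected})
	tracker.OnTunnelEvent(connection.Event{Index: 0, EventType: connection.Disconnected})

	if got := tracker.CountActiveConns(); got != 1 {
		t.Fatalf("expected 1 active connection, got %d", got)
	}
}
```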