From db0562c7b8aee8c7432a22286add968480e31525 Mon Sep 17 00:00:00 2001 From: Igor Postelnik Date: Wed, 20 Jan 2021 11:52:35 -0600 Subject: [PATCH] Fixed connection error handling by removing duplicated errors, standardizing on non-pointer error types --- connection/errors.go | 47 ++++++++++++++------------------------ connection/h2mux.go | 2 +- connection/rpc.go | 6 ++--- h2mux/muxmetrics.go | 2 +- origin/external_control.go | 4 ++-- origin/tunnel.go | 45 ++++++++---------------------------- 6 files changed, 34 insertions(+), 72 deletions(-) diff --git a/connection/errors.go b/connection/errors.go index 935698bf..9d095c93 100644 --- a/connection/errors.go +++ b/connection/errors.go @@ -4,64 +4,51 @@ import ( "github.com/cloudflare/cloudflared/edgediscovery" "github.com/cloudflare/cloudflared/h2mux" tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" - "github.com/prometheus/client_golang/prometheus" ) const ( DuplicateConnectionError = "EDUPCONN" ) -// RegisterTunnel error from client -type clientRegisterTunnelError struct { - cause error -} - -func newRPCError(cause error, counter *prometheus.CounterVec, name rpcName) clientRegisterTunnelError { - counter.WithLabelValues(cause.Error(), string(name)).Inc() - return clientRegisterTunnelError{cause: cause} -} - -func (e clientRegisterTunnelError) Error() string { - return e.cause.Error() -} - type DupConnRegisterTunnelError struct{} -var errDuplicationConnection = &DupConnRegisterTunnelError{} +var errDuplicationConnection = DupConnRegisterTunnelError{} func (e DupConnRegisterTunnelError) Error() string { return "already connected to this server, trying another address" } // RegisterTunnel error from server -type serverRegisterTunnelError struct { - cause error - permanent bool +type ServerRegisterTunnelError struct { + Cause error + Permanent bool } -func (e serverRegisterTunnelError) Error() string { - return e.cause.Error() +func (e ServerRegisterTunnelError) Error() string { + return e.Cause.Error() } -func serverRegistrationErrorFromRPC(err error) *serverRegisterTunnelError { +func serverRegistrationErrorFromRPC(err error) ServerRegisterTunnelError { if retryable, ok := err.(*tunnelpogs.RetryableError); ok { - return &serverRegisterTunnelError{ - cause: retryable.Unwrap(), - permanent: false, + return ServerRegisterTunnelError{ + Cause: retryable.Unwrap(), + Permanent: false, } } - return &serverRegisterTunnelError{ - cause: err, - permanent: true, + return ServerRegisterTunnelError{ + Cause: err, + Permanent: true, } } -type muxerShutdownError struct{} +type MuxerShutdownError struct{} -func (e muxerShutdownError) Error() string { +func (e MuxerShutdownError) Error() string { return "muxer shutdown" } +var errMuxerStopped = MuxerShutdownError{} + func isHandshakeErrRecoverable(err error, connIndex uint8, observer *Observer) bool { log := observer.log.With(). Uint8(LogFieldConnIndex, connIndex). diff --git a/connection/h2mux.go b/connection/h2mux.go index e1181440..b627c305 100644 --- a/connection/h2mux.go +++ b/connection/h2mux.go @@ -143,7 +143,7 @@ func (h *h2muxConnection) serveMuxer(ctx context.Context) error { // here to notify other routines to stop err := h.muxer.Serve(ctx) if err == nil { - return muxerShutdownError{} + return errMuxerStopped } return err } diff --git a/connection/rpc.go b/connection/rpc.go index 6aa5418d..514d69db 100644 --- a/connection/rpc.go +++ b/connection/rpc.go @@ -209,9 +209,9 @@ func (h *h2muxConnection) processRegisterTunnelError(err tunnelpogs.TunnelRegist return errDuplicationConnection } h.observer.metrics.regFail.WithLabelValues("server_error", string(name)).Inc() - return serverRegisterTunnelError{ - cause: err, - permanent: err.IsPermanent(), + return ServerRegisterTunnelError{ + Cause: err, + Permanent: err.IsPermanent(), } } diff --git a/h2mux/muxmetrics.go b/h2mux/muxmetrics.go index db0355d5..3423bde0 100644 --- a/h2mux/muxmetrics.go +++ b/h2mux/muxmetrics.go @@ -142,7 +142,7 @@ func (updater *muxMetricsUpdaterImpl) run(log *zerolog.Logger) error { for { select { case <-updater.abortChan: - log.Info().Msgf("mux - metrics: Stopping mux metrics updater") + log.Debug().Msgf("mux - metrics: Stopping mux metrics updater") return nil case roundTripMeasurement := <-updater.updateRTTChan: go updater.rttData.update(roundTripMeasurement) diff --git a/origin/external_control.go b/origin/external_control.go index d59759ed..cd9ef364 100644 --- a/origin/external_control.go +++ b/origin/external_control.go @@ -10,11 +10,11 @@ type ReconnectSignal struct { } // Error allows us to use ReconnectSignal as a special error to force connection abort -func (r *ReconnectSignal) Error() string { +func (r ReconnectSignal) Error() string { return "reconnect signal" } -func (r *ReconnectSignal) DelayBeforeReconnect() { +func (r ReconnectSignal) DelayBeforeReconnect() { if r.Delay > 0 { time.Sleep(r.Delay) } diff --git a/origin/tunnel.go b/origin/tunnel.go index eb299112..f681dac2 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -63,31 +63,6 @@ type TunnelConfig struct { EdgeTLSConfigs map[connection.Protocol]*tls.Config } -type muxerShutdownError struct{} - -func (e muxerShutdownError) Error() string { - return "muxer shutdown" -} - -// RegisterTunnel error from server -type serverRegisterTunnelError struct { - cause error - permanent bool -} - -func (e serverRegisterTunnelError) Error() string { - return e.cause.Error() -} - -// RegisterTunnel error from client -type clientRegisterTunnelError struct { - cause error -} - -func (e clientRegisterTunnelError) Error() string { - return e.cause.Error() -} - func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string, uuid uuid.UUID) *tunnelpogs.RegistrationOptions { policy := tunnelrpc.ExistingTunnelPolicy_balance if c.HAConnections <= 1 && c.LBPool == "" { @@ -348,24 +323,24 @@ func ServeH2mux( err = errGroup.Wait() if err != nil { switch err := err.(type) { - case *connection.DupConnRegisterTunnelError: - // don't retry this connection anymore, let supervisor pick new a address + case connection.DupConnRegisterTunnelError: + // don't retry this connection anymore, let supervisor pick a new address return err, false - case *serverRegisterTunnelError: + case connection.ServerRegisterTunnelError: log.Err(err).Msg("Register tunnel error from server side") // Don't send registration error return from server to Sentry. They are // logged on server side if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 { log.Error().Msg(activeIncidentsMsg(incidents)) } - return err.cause, !err.permanent - case *clientRegisterTunnelError: - log.Err(err).Msg("Register tunnel error on client side") + return err.Cause, !err.Permanent + case connection.MuxerShutdownError: + if handler.StoppedGracefully() { + return nil, false + } + log.Info().Msg("Unexpected muxer shutdown") return err, true - case *muxerShutdownError: - log.Info().Msg("Muxer shutdown") - return err, true - case *ReconnectSignal: + case ReconnectSignal: log.Info(). Uint8(connection.LogFieldConnIndex, connIndex). Msgf("Restarting connection due to reconnect signal in %s", err.Delay)