TUN-3811: Better error reporting on http2 connection termination. Registration errors from the control loop are now propagated out of the connection server code. Unified error handling between h2mux and http2 connections so we log and retry errors the same way, regardless of the underlying transport.

This commit is contained in:
Igor Postelnik 2021-01-27 07:19:37 -06:00
parent 6cdd20e820
commit a945518404
4 changed files with 126 additions and 96 deletions

View File

@ -41,13 +41,13 @@ func serverRegistrationErrorFromRPC(err error) ServerRegisterTunnelError {
} }
} }
type MuxerShutdownError struct{} type muxerShutdownError struct{}
func (e MuxerShutdownError) Error() string { func (e muxerShutdownError) Error() string {
return "muxer shutdown" return "muxer shutdown"
} }
var errMuxerStopped = MuxerShutdownError{} var errMuxerStopped = muxerShutdownError{}
func isHandshakeErrRecoverable(err error, connIndex uint8, observer *Observer) bool { func isHandshakeErrRecoverable(err error, connIndex uint8, observer *Observer) bool {
log := observer.log.With(). log := observer.log.With().

View File

@ -104,7 +104,15 @@ func (h *h2muxConnection) ServeNamedTunnel(ctx context.Context, namedTunnel *Nam
h.controlLoop(serveCtx, connectedFuse, true) h.controlLoop(serveCtx, connectedFuse, true)
return nil return nil
}) })
return errGroup.Wait()
err := errGroup.Wait()
if err == errMuxerStopped {
if h.stoppedGracefully {
return nil
}
h.observer.log.Info().Uint8(LogFieldConnIndex, h.connIndex).Msg("Unexpected muxer shutdown")
}
return err
} }
func (h *h2muxConnection) ServeClassicTunnel(ctx context.Context, classicTunnel *ClassicTunnelConfig, credentialManager CredentialManager, registrationOptions *tunnelpogs.RegistrationOptions, connectedFuse ConnectedFuse) error { func (h *h2muxConnection) ServeClassicTunnel(ctx context.Context, classicTunnel *ClassicTunnelConfig, credentialManager CredentialManager, registrationOptions *tunnelpogs.RegistrationOptions, connectedFuse ConnectedFuse) error {
@ -136,11 +144,15 @@ func (h *h2muxConnection) ServeClassicTunnel(ctx context.Context, classicTunnel
h.controlLoop(serveCtx, connectedFuse, false) h.controlLoop(serveCtx, connectedFuse, false)
return nil return nil
}) })
return errGroup.Wait()
}
func (h *h2muxConnection) StoppedGracefully() bool { err := errGroup.Wait()
return h.stoppedGracefully if err == errMuxerStopped {
if h.stoppedGracefully {
return nil
}
h.observer.log.Info().Uint8(LogFieldConnIndex, h.connIndex).Msg("Unexpected muxer shutdown")
}
return err
} }
func (h *h2muxConnection) serveMuxer(ctx context.Context) error { func (h *h2muxConnection) serveMuxer(ctx context.Context) error {

View File

@ -34,12 +34,14 @@ type http2Connection struct {
observer *Observer observer *Observer
connIndexStr string connIndexStr string
connIndex uint8 connIndex uint8
wg *sync.WaitGroup
// newRPCClientFunc allows us to mock RPCs during testing // newRPCClientFunc allows us to mock RPCs during testing
newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
activeRequestsWG sync.WaitGroup
connectedFuse ConnectedFuse connectedFuse ConnectedFuse
gracefulShutdownC chan struct{} gracefulShutdownC chan struct{}
stoppedGracefully bool stoppedGracefully bool
controlStreamErr error // result of running control stream handler
} }
func NewHTTP2Connection( func NewHTTP2Connection(
@ -63,7 +65,6 @@ func NewHTTP2Connection(
observer: observer, observer: observer,
connIndexStr: uint8ToString(connIndex), connIndexStr: uint8ToString(connIndex),
connIndex: connIndex, connIndex: connIndex,
wg: &sync.WaitGroup{},
newRPCClientFunc: newRegistrationRPCClient, newRPCClientFunc: newRegistrationRPCClient,
connectedFuse: connectedFuse, connectedFuse: connectedFuse,
gracefulShutdownC: gracefulShutdownC, gracefulShutdownC: gracefulShutdownC,
@ -80,15 +81,20 @@ func (c *http2Connection) Serve(ctx context.Context) error {
Handler: c, Handler: c,
}) })
if !c.stoppedGracefully { switch {
case c.stoppedGracefully:
return nil
case c.controlStreamErr != nil:
return c.controlStreamErr
default:
c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Lost connection with the edge")
return errEdgeConnectionClosed return errEdgeConnectionClosed
} }
return nil
} }
func (c *http2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (c *http2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c.wg.Add(1) c.activeRequestsWG.Add(1)
defer c.wg.Done() defer c.activeRequestsWG.Done()
respWriter := &http2RespWriter{ respWriter := &http2RespWriter{
r: r.Body, r: r.Body,
@ -105,6 +111,7 @@ func (c *http2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if isControlStreamUpgrade(r) { if isControlStreamUpgrade(r) {
respWriter.shouldFlush = true respWriter.shouldFlush = true
err = c.serveControlStream(r.Context(), respWriter) err = c.serveControlStream(r.Context(), respWriter)
c.controlStreamErr = err
} else if isWebsocketUpgrade(r) { } else if isWebsocketUpgrade(r) {
respWriter.shouldFlush = true respWriter.shouldFlush = true
stripWebsocketUpgradeHeader(r) stripWebsocketUpgradeHeader(r)
@ -118,10 +125,6 @@ func (c *http2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
func (c *http2Connection) StoppedGracefully() bool {
return c.stoppedGracefully
}
func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *http2RespWriter) error { func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *http2RespWriter) error {
rpcClient := c.newRPCClientFunc(ctx, respWriter, c.observer.log) rpcClient := c.newRPCClientFunc(ctx, respWriter, c.observer.log)
defer rpcClient.Close() defer rpcClient.Close()
@ -146,7 +149,7 @@ func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *ht
func (c *http2Connection) close() { func (c *http2Connection) close() {
// Wait for all serve HTTP handlers to return // Wait for all serve HTTP handlers to return
c.wg.Wait() c.activeRequestsWG.Wait()
c.conn.Close() c.conn.Close()
} }

View File

@ -214,6 +214,7 @@ func waitForBackoff(
config.Observer.SendReconnect(connIndex) config.Observer.SendReconnect(connIndex)
log.Info(). log.Info().
Err(err). Err(err).
Uint8(connection.LogFieldConnIndex, connIndex).
Msgf("Retrying connection in %s seconds", duration) Msgf("Retrying connection in %s seconds", duration)
protobackoff.Backoff(ctx) protobackoff.Backoff(ctx)
@ -238,9 +239,11 @@ func waitForBackoff(
return nil return nil
} }
// ServeTunnel runs a single tunnel connection, returns nil on graceful shutdown,
// on error returns a flag indicating if error can be retried
func ServeTunnel( func ServeTunnel(
ctx context.Context, ctx context.Context,
log *zerolog.Logger, connLong *zerolog.Logger,
credentialManager *reconnectCredentialManager, credentialManager *reconnectCredentialManager,
config *TunnelConfig, config *TunnelConfig,
addr *net.TCPAddr, addr *net.TCPAddr,
@ -275,11 +278,12 @@ func ServeTunnel(
fuse: fuse, fuse: fuse,
backoff: backoff, backoff: backoff,
} }
if protocol == connection.HTTP2 { if protocol == connection.HTTP2 {
connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.retries)) connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.retries))
return ServeHTTP2( err = ServeHTTP2(
ctx, ctx,
log, connLong,
config, config,
edgeConn, edgeConn,
connOptions, connOptions,
@ -288,24 +292,64 @@ func ServeTunnel(
reconnectCh, reconnectCh,
gracefulShutdownC, gracefulShutdownC,
) )
} else {
err = ServeH2mux(
ctx,
connLong,
credentialManager,
config,
edgeConn,
connIndex,
connectedFuse,
cloudflaredUUID,
reconnectCh,
gracefulShutdownC,
)
} }
return ServeH2mux(
ctx, if err != nil {
log, switch err := err.(type) {
credentialManager, case connection.DupConnRegisterTunnelError:
config, // don't retry this connection anymore, let supervisor pick a new address
edgeConn, return err, false
connIndex, case connection.ServerRegisterTunnelError:
connectedFuse, connLong.Err(err).Msg("Register tunnel error from server side")
cloudflaredUUID, // Don't send registration error return from server to Sentry. They are
reconnectCh, // logged on server side
gracefulShutdownC, if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
) connLong.Error().Msg(activeIncidentsMsg(incidents))
}
return err.Cause, !err.Permanent
case ReconnectSignal:
connLong.Info().
Uint8(connection.LogFieldConnIndex, connIndex).
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
err.DelayBeforeReconnect()
return err, true
default:
if err == context.Canceled {
connLong.Debug().Err(err).Msgf("Serve tunnel error")
return err, false
}
connLong.Err(err).Msgf("Serve tunnel error")
_, permanent := err.(unrecoverableError)
return err, !permanent
}
}
return nil, false
}
type unrecoverableError struct {
err error
}
func (r unrecoverableError) Error() string {
return r.err.Error()
} }
func ServeH2mux( func ServeH2mux(
ctx context.Context, ctx context.Context,
log *zerolog.Logger, connLog *zerolog.Logger,
credentialManager *reconnectCredentialManager, credentialManager *reconnectCredentialManager,
config *TunnelConfig, config *TunnelConfig,
edgeConn net.Conn, edgeConn net.Conn,
@ -314,8 +358,8 @@ func ServeH2mux(
cloudflaredUUID uuid.UUID, cloudflaredUUID uuid.UUID,
reconnectCh chan ReconnectSignal, reconnectCh chan ReconnectSignal,
gracefulShutdownC chan struct{}, gracefulShutdownC chan struct{},
) (err error, recoverable bool) { ) error {
config.Log.Debug().Msgf("Connecting via h2mux") connLog.Debug().Msgf("Connecting via h2mux")
// Returns error from parsing the origin URL or handshake errors // Returns error from parsing the origin URL or handshake errors
handler, err, recoverable := connection.NewH2muxConnection( handler, err, recoverable := connection.NewH2muxConnection(
config.ConnectionConfig, config.ConnectionConfig,
@ -326,12 +370,15 @@ func ServeH2mux(
gracefulShutdownC, gracefulShutdownC,
) )
if err != nil { if err != nil {
return err, recoverable if !recoverable {
return unrecoverableError{err}
}
return err
} }
errGroup, serveCtx := errgroup.WithContext(ctx) errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() (err error) { errGroup.Go(func() error {
if config.NamedTunnel != nil { if config.NamedTunnel != nil {
connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(connectedFuse.backoff.retries)) connOptions := config.ConnectionOptions(edgeConn.LocalAddr().String(), uint8(connectedFuse.backoff.retries))
return handler.ServeNamedTunnel(serveCtx, config.NamedTunnel, connOptions, connectedFuse) return handler.ServeNamedTunnel(serveCtx, config.NamedTunnel, connOptions, connectedFuse)
@ -340,49 +387,16 @@ func ServeH2mux(
return handler.ServeClassicTunnel(serveCtx, config.ClassicTunnel, credentialManager, registrationOptions, connectedFuse) return handler.ServeClassicTunnel(serveCtx, config.ClassicTunnel, credentialManager, registrationOptions, connectedFuse)
}) })
errGroup.Go(listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)) errGroup.Go(func() error {
return listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
})
err = errGroup.Wait() return errGroup.Wait()
if err != nil {
switch err := err.(type) {
case connection.DupConnRegisterTunnelError:
// don't retry this connection anymore, let supervisor pick a new address
return err, false
case connection.ServerRegisterTunnelError:
log.Err(err).Msg("Register tunnel error from server side")
// Don't send registration error return from server to Sentry. They are
// logged on server side
if incidents := config.IncidentLookup.ActiveIncidents(); len(incidents) > 0 {
log.Error().Msg(activeIncidentsMsg(incidents))
}
return err.Cause, !err.Permanent
case connection.MuxerShutdownError:
if handler.StoppedGracefully() {
return nil, false
}
log.Info().Msg("Unexpected muxer shutdown")
return err, true
case ReconnectSignal:
log.Info().
Uint8(connection.LogFieldConnIndex, connIndex).
Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
err.DelayBeforeReconnect()
return err, true
default:
if err == context.Canceled {
log.Debug().Err(err).Msgf("Serve tunnel error")
return err, false
}
log.Err(err).Msgf("Serve tunnel error")
return err, true
}
}
return nil, true
} }
func ServeHTTP2( func ServeHTTP2(
ctx context.Context, ctx context.Context,
log *zerolog.Logger, connLog *zerolog.Logger,
config *TunnelConfig, config *TunnelConfig,
tlsServerConn net.Conn, tlsServerConn net.Conn,
connOptions *tunnelpogs.ConnectionOptions, connOptions *tunnelpogs.ConnectionOptions,
@ -390,8 +404,8 @@ func ServeHTTP2(
connectedFuse connection.ConnectedFuse, connectedFuse connection.ConnectedFuse,
reconnectCh chan ReconnectSignal, reconnectCh chan ReconnectSignal,
gracefulShutdownC chan struct{}, gracefulShutdownC chan struct{},
) (err error, recoverable bool) { ) error {
log.Debug().Msgf("Connecting via http2") connLog.Debug().Msgf("Connecting via http2")
h2conn := connection.NewHTTP2Connection( h2conn := connection.NewHTTP2Connection(
tlsServerConn, tlsServerConn,
config.ConnectionConfig, config.ConnectionConfig,
@ -408,25 +422,26 @@ func ServeHTTP2(
return h2conn.Serve(serveCtx) return h2conn.Serve(serveCtx)
}) })
errGroup.Go(listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)) errGroup.Go(func() error {
err := listenReconnect(serveCtx, reconnectCh, gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
_ = tlsServerConn.Close()
}
return err
})
err = errGroup.Wait() return errGroup.Wait()
if err != nil {
return err, true
}
return nil, false
} }
func listenReconnect(ctx context.Context, reconnectCh <-chan ReconnectSignal, gracefulShutdownCh chan struct{}) func() error { func listenReconnect(ctx context.Context, reconnectCh <-chan ReconnectSignal, gracefulShutdownCh chan struct{}) error {
return func() error { select {
select { case reconnect := <-reconnectCh:
case reconnect := <-reconnectCh: return reconnect
return reconnect case <-gracefulShutdownCh:
case <-gracefulShutdownCh: return nil
return nil case <-ctx.Done():
case <-ctx.Done(): return nil
return nil
}
} }
} }