TUN-2955: Fix connection and goroutine leaks when tunnel conection is terminated on error. Only unregister tunnels that had connected successfully. Close edge connection used to unregister the tunnel. Use buffered channels for error channels where receiver may quit early on context cancellation.

This commit is contained in:
Igor Postelnik 2020-05-05 17:56:39 -05:00 committed by Chung Ting Huang
parent c3fa4552aa
commit fbe2989f61
5 changed files with 35 additions and 9 deletions

View File

@ -79,7 +79,11 @@ func Serve(remoteConn Connection, listener net.Listener, shutdownC <-chan struct
for { for {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
errChan <- err // don't block if parent goroutine quit early
select {
case errChan <- err:
default:
}
return return
} }
go serveConnection(remoteConn, conn, options) go serveConnection(remoteConn, conn, options)

View File

@ -327,7 +327,11 @@ func (m *Muxer) Serve(ctx context.Context) error {
m.explicitShutdown.Fuse(false) m.explicitShutdown.Fuse(false)
m.r.Close() m.r.Close()
m.abort() m.abort()
ch <- err // don't block if parent goroutine quit early
select {
case ch <- err:
default:
}
}() }()
select { select {
case err := <-ch: case err := <-ch:
@ -344,7 +348,11 @@ func (m *Muxer) Serve(ctx context.Context) error {
m.explicitShutdown.Fuse(false) m.explicitShutdown.Fuse(false)
m.w.Close() m.w.Close()
m.abort() m.abort()
ch <- err // don't block if parent goroutine quit early
select {
case ch <- err:
default:
}
}() }()
select { select {
case err := <-ch: case err := <-ch:
@ -358,7 +366,11 @@ func (m *Muxer) Serve(ctx context.Context) error {
ch := make(chan error) ch := make(chan error)
go func() { go func() {
err := m.muxMetricsUpdater.run(m.config.Logger) err := m.muxMetricsUpdater.run(m.config.Logger)
ch <- err // don't block if parent goroutine quit early
select {
case ch <- err:
default:
}
}() }()
select { select {
case err := <-ch: case err := <-ch:

View File

@ -322,7 +322,10 @@ func ServeTunnel(
select { select {
case <-serveCtx.Done(): case <-serveCtx.Done():
// UnregisterTunnel blocks until the RPC call returns // UnregisterTunnel blocks until the RPC call returns
err := UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger) var err error
if connectedFuse.Value() {
err = UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger)
}
handler.muxer.Shutdown() handler.muxer.Shutdown()
return err return err
case <-updateMetricsTickC: case <-updateMetricsTickC:
@ -519,6 +522,8 @@ func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log
// RPC stream open error // RPC stream open error
return err return err
} }
defer tunnelServer.Close()
// gracePeriod is encoded in int64 using capnproto // gracePeriod is encoded in int64 using capnproto
return tunnelServer.UnregisterTunnel(ctx, gracePeriod.Nanoseconds()) return tunnelServer.UnregisterTunnel(ctx, gracePeriod.Nanoseconds())
} }

View File

@ -521,6 +521,7 @@ type TunnelServer_PogsClient struct {
} }
func (c TunnelServer_PogsClient) Close() error { func (c TunnelServer_PogsClient) Close() error {
c.Client.Close()
return c.Conn.Close() return c.Conn.Close()
} }

View File

@ -30,7 +30,11 @@ func (f *File) Add(filepath string) error {
// Shutdown stop the file watching run loop // Shutdown stop the file watching run loop
func (f *File) Shutdown() { func (f *File) Shutdown() {
f.shutdown <- struct{}{} // don't block if Start quit early
select {
case f.shutdown <- struct{}{}:
default:
}
} }
// Start is a runloop to watch for files changes from the file paths added from Add() // Start is a runloop to watch for files changes from the file paths added from Add()