From fbe2989f61fcdcecc4062b6c10cdabb571a6d6d7 Mon Sep 17 00:00:00 2001 From: Igor Postelnik Date: Tue, 5 May 2020 17:56:39 -0500 Subject: [PATCH] TUN-2955: Fix connection and goroutine leaks when tunnel conection is terminated on error. Only unregister tunnels that had connected successfully. Close edge connection used to unregister the tunnel. Use buffered channels for error channels where receiver may quit early on context cancellation. --- carrier/carrier.go | 6 +++++- h2mux/h2mux.go | 24 ++++++++++++++++++------ origin/tunnel.go | 7 ++++++- tunnelrpc/pogs/tunnelrpc.go | 1 + watcher/file.go | 6 +++++- 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/carrier/carrier.go b/carrier/carrier.go index 13e02c83..1bdfeb95 100644 --- a/carrier/carrier.go +++ b/carrier/carrier.go @@ -79,7 +79,11 @@ func Serve(remoteConn Connection, listener net.Listener, shutdownC <-chan struct for { conn, err := listener.Accept() if err != nil { - errChan <- err + // don't block if parent goroutine quit early + select { + case errChan <- err: + default: + } return } go serveConnection(remoteConn, conn, options) diff --git a/h2mux/h2mux.go b/h2mux/h2mux.go index 70da0447..6f7d407d 100644 --- a/h2mux/h2mux.go +++ b/h2mux/h2mux.go @@ -206,9 +206,9 @@ func Handshake( initialStreamWindow: m.config.DefaultWindowSize, streamWindowMax: m.config.MaxWindowSize, streamWriteBufferMaxLen: m.config.StreamWriteBufferMaxLen, - r: m.r, - metricsUpdater: m.muxMetricsUpdater, - bytesRead: inBoundCounter, + r: m.r, + metricsUpdater: m.muxMetricsUpdater, + bytesRead: inBoundCounter, } m.muxWriter = &MuxWriter{ f: m.f, @@ -327,7 +327,11 @@ func (m *Muxer) Serve(ctx context.Context) error { m.explicitShutdown.Fuse(false) m.r.Close() m.abort() - ch <- err + // don't block if parent goroutine quit early + select { + case ch <- err: + default: + } }() select { case err := <-ch: @@ -344,7 +348,11 @@ func (m *Muxer) Serve(ctx context.Context) error { m.explicitShutdown.Fuse(false) m.w.Close() m.abort() - ch <- err + // don't block if parent goroutine quit early + select { + case ch <- err: + default: + } }() select { case err := <-ch: @@ -358,7 +366,11 @@ func (m *Muxer) Serve(ctx context.Context) error { ch := make(chan error) go func() { err := m.muxMetricsUpdater.run(m.config.Logger) - ch <- err + // don't block if parent goroutine quit early + select { + case ch <- err: + default: + } }() select { case err := <-ch: diff --git a/origin/tunnel.go b/origin/tunnel.go index a373ee22..42db204b 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -322,7 +322,10 @@ func ServeTunnel( select { case <-serveCtx.Done(): // UnregisterTunnel blocks until the RPC call returns - err := UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger) + var err error + if connectedFuse.Value() { + err = UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger) + } handler.muxer.Shutdown() return err case <-updateMetricsTickC: @@ -519,6 +522,8 @@ func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log // RPC stream open error return err } + defer tunnelServer.Close() + // gracePeriod is encoded in int64 using capnproto return tunnelServer.UnregisterTunnel(ctx, gracePeriod.Nanoseconds()) } diff --git a/tunnelrpc/pogs/tunnelrpc.go b/tunnelrpc/pogs/tunnelrpc.go index 3b8092ed..279c0d09 100644 --- a/tunnelrpc/pogs/tunnelrpc.go +++ b/tunnelrpc/pogs/tunnelrpc.go @@ -521,6 +521,7 @@ type TunnelServer_PogsClient struct { } func (c TunnelServer_PogsClient) Close() error { + c.Client.Close() return c.Conn.Close() } diff --git a/watcher/file.go b/watcher/file.go index 59f2a943..369abe67 100644 --- a/watcher/file.go +++ b/watcher/file.go @@ -30,7 +30,11 @@ func (f *File) Add(filepath string) error { // Shutdown stop the file watching run loop func (f *File) Shutdown() { - f.shutdown <- struct{}{} + // don't block if Start quit early + select { + case f.shutdown <- struct{}{}: + default: + } } // Start is a runloop to watch for files changes from the file paths added from Add()