diff --git a/carrier/carrier.go b/carrier/carrier.go index 13e02c83..1bdfeb95 100644 --- a/carrier/carrier.go +++ b/carrier/carrier.go @@ -79,7 +79,11 @@ func Serve(remoteConn Connection, listener net.Listener, shutdownC <-chan struct for { conn, err := listener.Accept() if err != nil { - errChan <- err + // don't block if parent goroutine quit early + select { + case errChan <- err: + default: + } return } go serveConnection(remoteConn, conn, options) diff --git a/h2mux/h2mux.go b/h2mux/h2mux.go index 70da0447..6f7d407d 100644 --- a/h2mux/h2mux.go +++ b/h2mux/h2mux.go @@ -206,9 +206,9 @@ func Handshake( initialStreamWindow: m.config.DefaultWindowSize, streamWindowMax: m.config.MaxWindowSize, streamWriteBufferMaxLen: m.config.StreamWriteBufferMaxLen, - r: m.r, - metricsUpdater: m.muxMetricsUpdater, - bytesRead: inBoundCounter, + r: m.r, + metricsUpdater: m.muxMetricsUpdater, + bytesRead: inBoundCounter, } m.muxWriter = &MuxWriter{ f: m.f, @@ -327,7 +327,11 @@ func (m *Muxer) Serve(ctx context.Context) error { m.explicitShutdown.Fuse(false) m.r.Close() m.abort() - ch <- err + // don't block if parent goroutine quit early + select { + case ch <- err: + default: + } }() select { case err := <-ch: @@ -344,7 +348,11 @@ func (m *Muxer) Serve(ctx context.Context) error { m.explicitShutdown.Fuse(false) m.w.Close() m.abort() - ch <- err + // don't block if parent goroutine quit early + select { + case ch <- err: + default: + } }() select { case err := <-ch: @@ -358,7 +366,11 @@ func (m *Muxer) Serve(ctx context.Context) error { ch := make(chan error) go func() { err := m.muxMetricsUpdater.run(m.config.Logger) - ch <- err + // don't block if parent goroutine quit early + select { + case ch <- err: + default: + } }() select { case err := <-ch: diff --git a/origin/tunnel.go b/origin/tunnel.go index a373ee22..42db204b 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -322,7 +322,10 @@ func ServeTunnel( select { case <-serveCtx.Done(): // UnregisterTunnel blocks until the RPC call returns - err := UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger) + var err error + if connectedFuse.Value() { + err = UnregisterTunnel(handler.muxer, config.GracePeriod, config.TransportLogger) + } handler.muxer.Shutdown() return err case <-updateMetricsTickC: @@ -519,6 +522,8 @@ func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log // RPC stream open error return err } + defer tunnelServer.Close() + // gracePeriod is encoded in int64 using capnproto return tunnelServer.UnregisterTunnel(ctx, gracePeriod.Nanoseconds()) } diff --git a/tunnelrpc/pogs/tunnelrpc.go b/tunnelrpc/pogs/tunnelrpc.go index 3b8092ed..279c0d09 100644 --- a/tunnelrpc/pogs/tunnelrpc.go +++ b/tunnelrpc/pogs/tunnelrpc.go @@ -521,6 +521,7 @@ type TunnelServer_PogsClient struct { } func (c TunnelServer_PogsClient) Close() error { + c.Client.Close() return c.Conn.Close() } diff --git a/watcher/file.go b/watcher/file.go index 59f2a943..369abe67 100644 --- a/watcher/file.go +++ b/watcher/file.go @@ -30,7 +30,11 @@ func (f *File) Add(filepath string) error { // Shutdown stop the file watching run loop func (f *File) Shutdown() { - f.shutdown <- struct{}{} + // don't block if Start quit early + select { + case f.shutdown <- struct{}{}: + default: + } } // Start is a runloop to watch for files changes from the file paths added from Add()