TUN-5142: Add asynchronous servecontrolstream for QUIC

ServeControlStream accidentally became non-blocking in the last quic
change causing stream to not be returned until a SIGTERM was received.
This change makes ServeControlStream be non-blocking for QUIC streams.
This commit is contained in:
Sudarsan Reddy 2021-09-24 10:31:28 +01:00
parent 6238fd9022
commit 27e1277a3b
4 changed files with 15 additions and 5 deletions

View File

@ -29,7 +29,7 @@ type controlStream struct {
// ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown. // ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
type ControlStreamHandler interface { type ControlStreamHandler interface {
ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, shouldWaitForUnregister bool) error
IsStopped() bool IsStopped() bool
} }
@ -61,6 +61,7 @@ func (c *controlStream) ServeControlStream(
ctx context.Context, ctx context.Context,
rw io.ReadWriteCloser, rw io.ReadWriteCloser,
connOptions *tunnelpogs.ConnectionOptions, connOptions *tunnelpogs.ConnectionOptions,
shouldWaitForUnregister bool,
) error { ) error {
rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log) rpcClient := c.newRPCClientFunc(ctx, rw, c.observer.log)
defer rpcClient.Close() defer rpcClient.Close()
@ -70,6 +71,16 @@ func (c *controlStream) ServeControlStream(
} }
c.connectedFuse.Connected() c.connectedFuse.Connected()
if shouldWaitForUnregister {
c.waitForUnregister(ctx, rpcClient)
} else {
go c.waitForUnregister(ctx, rpcClient)
}
return nil
}
func (c *controlStream) waitForUnregister(ctx context.Context, rpcClient NamedTunnelRPCClient) {
// wait for connection termination or start of graceful shutdown // wait for connection termination or start of graceful shutdown
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -81,7 +92,6 @@ func (c *controlStream) ServeControlStream(
c.observer.sendUnregisteringEvent(c.connIndex) c.observer.sendUnregisteringEvent(c.connIndex)
rpcClient.GracefulShutdown(ctx, c.gracePeriod) rpcClient.GracefulShutdown(ctx, c.gracePeriod)
c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection") c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
return nil
} }
func (c *controlStream) IsStopped() bool { func (c *controlStream) IsStopped() bool {

View File

@ -109,7 +109,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch connType { switch connType {
case TypeControlStream: case TypeControlStream:
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions); err != nil { if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, true); err != nil {
c.controlStreamErr = err c.controlStreamErr = err
c.log.Error().Err(err) c.log.Error().Err(err)
respWriter.WriteErrorResponse() respWriter.WriteErrorResponse()

View File

@ -57,7 +57,7 @@ func NewQUICConnection(
return nil, errors.Wrap(err, "failed to open a registration stream") return nil, errors.Wrap(err, "failed to open a registration stream")
} }
err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions) err = controlStreamHandler.ServeControlStream(ctx, registrationStream, connOptions, false)
if err != nil { if err != nil {
// Not wrapping error here to be consistent with the http2 message. // Not wrapping error here to be consistent with the http2 message.
return nil, err return nil, err

View File

@ -183,7 +183,7 @@ type fakeControlStream struct {
ControlStreamHandler ControlStreamHandler
} }
func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions) error { func (fakeControlStream) ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, shouldWaitForUnregister bool) error {
return nil return nil
} }
func (fakeControlStream) IsStopped() bool { func (fakeControlStream) IsStopped() bool {