diff --git a/CHANGES.md b/CHANGES.md index ba3cac48..f01f62c6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,7 @@ +## 2024.9.1 +### Bug Fixes +- We fixed a bug related to `--grace-period`. Tunnels that use QUIC as transport weren't abiding by this waiting period before forcefully closing the connections to the edge. From now on, both QUIC and HTTP2 tunnels will wait for either the grace period to end (defaults to 30 seconds) or until the last in-flight request is handled. Users that wish to maintain the previous behavior should set `--grace-period` to 0 if `--protocol` is set to `quic`. This will force `cloudflared` to shutdown as soon as either SIGTERM or SIGINT is received. + ## 2024.2.1 ### Notices - Starting from this version, tunnel diagnostics will be enabled by default. This will allow the engineering team to remotely get diagnostics from cloudflared during debug activities. Users still have the capability to opt-out of this feature by defining `--management-diagnostics=false` (or env `TUNNEL_MANAGEMENT_DIAGNOSTICS`). diff --git a/component-tests/test_termination.py b/component-tests/test_termination.py index 26f4fea4..128d95d6 100644 --- a/component-tests/test_termination.py +++ b/component-tests/test_termination.py @@ -45,9 +45,10 @@ class TestTermination: with connected: connected.wait(self.timeout) # Send signal after the SSE connection is established - self.terminate_by_signal(cloudflared, signal) - self.wait_eyeball_thread( - in_flight_req, self.grace_period + self.timeout) + with self.within_grace_period(): + self.terminate_by_signal(cloudflared, signal) + self.wait_eyeball_thread( + in_flight_req, self.grace_period + self.timeout) # test cloudflared terminates before grace period expires when all eyeball # connections are drained @@ -66,7 +67,7 @@ class TestTermination: with connected: connected.wait(self.timeout) - with self.within_grace_period(): + with self.within_grace_period(has_connection=False): # Send signal after the SSE connection is established self.terminate_by_signal(cloudflared, signal) self.wait_eyeball_thread(in_flight_req, self.grace_period) @@ -78,7 +79,7 @@ class TestTermination: with start_cloudflared( tmp_path, config, cfd_pre_args=["tunnel", "--ha-connections", "1"], new_process=True, capture_output=False) as cloudflared: wait_tunnel_ready(tunnel_url=config.get_url()) - with self.within_grace_period(): + with self.within_grace_period(has_connection=False): self.terminate_by_signal(cloudflared, signal) def terminate_by_signal(self, cloudflared, sig): @@ -92,13 +93,21 @@ class TestTermination: # Using this context asserts logic within the context is executed within grace period @contextmanager - def within_grace_period(self): + def within_grace_period(self, has_connection=True): try: start = time.time() yield finally: + + # If the request takes longer than the grace period then we need to wait at most the grace period. + # If the request fell within the grace period cloudflared can close earlier, but to ensure that it doesn't + # close immediately we add a minimum boundary. If cloudflared shutdown in less than 1s it's likely that + # it shutdown as soon as it received SIGINT. The only way cloudflared can close immediately is if it has no + # in-flight requests + minimum = 1 if has_connection else 0 duration = time.time() - start - assert duration < self.grace_period + # Here we truncate to ensure that we don't fail on minute differences like 10.1 instead of 10 + assert minimum <= int(duration) <= self.grace_period def stream_request(self, config, connected, early_terminate): expected_terminate_message = "502 Bad Gateway" diff --git a/connection/control.go b/connection/control.go index e0bfeae9..94e0d66b 100644 --- a/connection/control.go +++ b/connection/control.go @@ -6,6 +6,8 @@ import ( "net" "time" + "github.com/pkg/errors" + "github.com/cloudflare/cloudflared/management" "github.com/cloudflare/cloudflared/tunnelrpc" tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" @@ -116,27 +118,32 @@ func (c *controlStream) ServeControlStream( } } - c.waitForUnregister(ctx, registrationClient) - return nil + return c.waitForUnregister(ctx, registrationClient) } -func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) { +func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) error { // wait for connection termination or start of graceful shutdown defer registrationClient.Close() + var shutdownError error select { case <-ctx.Done(): + shutdownError = ctx.Err() break case <-c.gracefulShutdownC: c.stoppedGracefully = true } c.observer.sendUnregisteringEvent(c.connIndex) - registrationClient.GracefulShutdown(ctx, c.gracePeriod) + err := registrationClient.GracefulShutdown(ctx, c.gracePeriod) + if err != nil { + return errors.Wrap(err, "Error shutting down control stream") + } c.observer.log.Info(). Int(management.EventTypeKey, int(management.Cloudflared)). Uint8(LogFieldConnIndex, c.connIndex). IPAddr(LogFieldIPAddress, c.edgeAddress). Msg("Unregistered tunnel connection") + return shutdownError } func (c *controlStream) IsStopped() bool { diff --git a/connection/http2_test.go b/connection/http2_test.go index a0ec8b45..92665688 100644 --- a/connection/http2_test.go +++ b/connection/http2_test.go @@ -192,8 +192,9 @@ func (mc mockNamedTunnelRPCClient) RegisterConnection( }, nil } -func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) { +func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error { close(mc.unregistered) + return nil } func (mockNamedTunnelRPCClient) Close() {} diff --git a/connection/quic.go b/connection/quic.go index e1048e3a..b24a6cce 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -83,6 +83,7 @@ type QUICConnection struct { rpcTimeout time.Duration streamWriteTimeout time.Duration + gracePeriod time.Duration } // NewQUICConnection returns a new instance of QUICConnection. @@ -100,6 +101,7 @@ func NewQUICConnection( packetRouterConfig *ingress.GlobalRouterConfig, rpcTimeout time.Duration, streamWriteTimeout time.Duration, + gracePeriod time.Duration, ) (*QUICConnection, error) { udpConn, err := createUDPConnForConnIndex(connIndex, localAddr, logger) if err != nil { @@ -136,6 +138,7 @@ func NewQUICConnection( connIndex: connIndex, rpcTimeout: rpcTimeout, streamWriteTimeout: streamWriteTimeout, + gracePeriod: gracePeriod, }, nil } @@ -158,8 +161,17 @@ func (q *QUICConnection) Serve(ctx context.Context) error { // In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control // stream is already fully registered before the other goroutines can proceed. errGroup.Go(func() error { - defer cancel() - return q.serveControlStream(ctx, controlStream) + // err is equal to nil if we exit due to unregistration. If that happens we want to wait the full + // amount of the grace period, allowing requests to finish before we cancel the context, which will + // make cloudflared exit. + if err := q.serveControlStream(ctx, controlStream); err == nil { + select { + case <-ctx.Done(): + case <-time.Tick(q.gracePeriod): + } + } + cancel() + return err }) errGroup.Go(func() error { defer cancel() diff --git a/connection/quic_test.go b/connection/quic_test.go index 302cb7f9..0a54e345 100644 --- a/connection/quic_test.go +++ b/connection/quic_test.go @@ -855,6 +855,7 @@ func testQUICConnection(udpListenerAddr net.Addr, t *testing.T, index uint8) *QU nil, 15*time.Second, 0*time.Second, + 0*time.Second, ) require.NoError(t, err) return qc diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 85637798..03d7f930 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -604,6 +604,7 @@ func (e *EdgeTunnelServer) serveQUIC( e.config.PacketConfig, e.config.RPCTimeout, e.config.WriteStreamTimeout, + e.config.GracePeriod, ) if err != nil { connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection") diff --git a/tunnelrpc/registration_client.go b/tunnelrpc/registration_client.go index f41819f3..7d945f58 100644 --- a/tunnelrpc/registration_client.go +++ b/tunnelrpc/registration_client.go @@ -23,7 +23,7 @@ type RegistrationClient interface { edgeAddress net.IP, ) (*pogs.ConnectionDetails, error) SendLocalConfiguration(ctx context.Context, config []byte) error - GracefulShutdown(ctx context.Context, gracePeriod time.Duration) + GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error Close() } @@ -79,7 +79,7 @@ func (r *registrationClient) SendLocalConfiguration(ctx context.Context, config return err } -func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) { +func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error { ctx, cancel := context.WithTimeout(ctx, gracePeriod) defer cancel() defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc() @@ -88,7 +88,9 @@ func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod t err := r.client.UnregisterConnection(ctx) if err != nil { metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc() + return err } + return nil } func (r *registrationClient) Close() {