From cf562ef8c8af155d2d765126d711271cba0a988a Mon Sep 17 00:00:00 2001 From: Igor Postelnik Date: Thu, 4 Feb 2021 15:09:17 -0600 Subject: [PATCH] TUN-3635: Send event when unregistering tunnel for gracful shutdown so /ready endpoint reports down status befoe connections finish handling pending requests. --- cmd/cloudflared/ui/launch_ui.go | 13 ++++++------- connection/connection_test.go | 1 - connection/event.go | 2 ++ connection/h2mux_test.go | 3 ++- connection/http2.go | 1 + connection/http2_test.go | 9 ++++++++- connection/observer.go | 4 ++++ connection/observer_test.go | 18 ++++++++++++++++++ connection/rpc.go | 2 ++ metrics/readiness.go | 2 +- metrics/readiness_test.go | 16 ++++++++++++++++ 11 files changed, 60 insertions(+), 11 deletions(-) diff --git a/cmd/cloudflared/ui/launch_ui.go b/cmd/cloudflared/ui/launch_ui.go index 1eb1c534..6ca0deb0 100644 --- a/cmd/cloudflared/ui/launch_ui.go +++ b/cmd/cloudflared/ui/launch_ui.go @@ -15,7 +15,6 @@ import ( type connState struct { location string - state connection.Status } type uiModel struct { @@ -32,6 +31,7 @@ type palette struct { defaultText string disconnected string reconnecting string + unregistered string } func NewUIModel(version, hostname, metricsURL string, ing *ingress.Ingress, haConnections int) *uiModel { @@ -67,6 +67,7 @@ func (data *uiModel) Launch( defaultText: "white", disconnected: "red", reconnecting: "orange", + unregistered: "orange", } app := tview.NewApplication() @@ -128,7 +129,7 @@ func (data *uiModel) Launch( switch event.EventType { case connection.Connected: data.setConnTableCell(event, connTable, palette) - case connection.Disconnected, connection.Reconnecting: + case connection.Disconnected, connection.Reconnecting, connection.Unregistering: data.changeConnStatus(event, connTable, log, palette) case connection.SetURL: tunnelHostText.SetText(event.URL) @@ -167,10 +168,7 @@ func (data *uiModel) changeConnStatus(event connection.Event, table *tview.Table locationState := event.Location - if event.EventType == connection.Disconnected { - connState.state = connection.Disconnected - } else if event.EventType == connection.Reconnecting { - connState.state = connection.Reconnecting + if event.EventType == connection.Reconnecting { locationState = "Reconnecting..." } @@ -196,7 +194,6 @@ func (data *uiModel) setConnTableCell(event connection.Event, table *tview.Table connectionNum := index + 1 // Update slice to keep track of connection location and state in UI table - data.connections[index].state = connection.Connected data.connections[index].location = event.Location // Update text in table cell to show disconnected state @@ -218,6 +215,8 @@ func newCellText(palette palette, connectionNum int, location string, connectedS dotColor = palette.disconnected case connection.Reconnecting: dotColor = palette.reconnecting + case connection.Unregistering: + dotColor = palette.unregistered } return fmt.Sprintf(connFmtString, dotColor, palette.defaultText, connectionNum, location) diff --git a/connection/connection_test.go b/connection/connection_test.go index df14fa91..01f03999 100644 --- a/connection/connection_test.go +++ b/connection/connection_test.go @@ -27,7 +27,6 @@ var ( Scheme: "https", Host: "connectiontest.argotunnel.com", } - testObserver = NewObserver(&log, &log, false) testLargeResp = make([]byte, largeFileSize) ) diff --git a/connection/event.go b/connection/event.go index 64218f91..6afde5bb 100644 --- a/connection/event.go +++ b/connection/event.go @@ -22,4 +22,6 @@ const ( SetURL // RegisteringTunnel means the non-named tunnel is registering its connection. RegisteringTunnel + // We're unregistering tunnel from the edge in preparation for a disconnect + Unregistering ) diff --git a/connection/h2mux_test.go b/connection/h2mux_test.go index 13314ba9..3b82bfbd 100644 --- a/connection/h2mux_test.go +++ b/connection/h2mux_test.go @@ -33,7 +33,7 @@ func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) { edgeMuxChan := make(chan *h2mux.Muxer) go func() { edgeMuxConfig := h2mux.MuxerConfig{ - Log: testObserver.log, + Log: &log, Handler: h2mux.MuxedStreamFunc(func(stream *h2mux.MuxedStream) error { // we only expect RPC traffic in client->edge direction, provide minimal support for mocking require.True(t, stream.IsRPCStream()) @@ -47,6 +47,7 @@ func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) { edgeMuxChan <- edgeMux }() var connIndex = uint8(0) + testObserver := NewObserver(&log, &log, false) h2muxConn, err, _ := NewH2muxConnection(testConfig, testMuxerConfig, originConn, connIndex, testObserver, nil) require.NoError(t, err) return h2muxConn, <-edgeMuxChan diff --git a/connection/http2.go b/connection/http2.go index ab303294..dbadd555 100644 --- a/connection/http2.go +++ b/connection/http2.go @@ -142,6 +142,7 @@ func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *ht c.stoppedGracefully = true } + c.observer.sendUnregisteringEvent(c.connIndex) rpcClient.GracefulShutdown(ctx, c.config.GracePeriod) c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection") return nil diff --git a/connection/http2_test.go b/connection/http2_test.go index 13f9da00..8124ff86 100644 --- a/connection/http2_test.go +++ b/connection/http2_test.go @@ -35,7 +35,7 @@ func newTestHTTP2Connection() (*http2Connection, net.Conn) { testConfig, &NamedTunnelConfig{}, &pogs.ConnectionOptions{}, - testObserver, + NewObserver(&log, &log, false), connIndex, mockConnectedFuse{}, nil, @@ -256,7 +256,9 @@ func TestGracefulShutdownHTTP2(t *testing.T) { registered: make(chan struct{}), unregistered: make(chan struct{}), } + events := &eventCollectorSink{} http2Conn.newRPCClientFunc = rpcClientFactory.newMockRPCClient + http2Conn.observer.RegisterSink(events) shutdownC := make(chan struct{}) http2Conn.gracefulShutdownC = shutdownC @@ -301,6 +303,11 @@ func TestGracefulShutdownHTTP2(t *testing.T) { cancel() wg.Wait() + + events.assertSawEvent(t, Event{ + Index: http2Conn.connIndex, + EventType: Unregistering, + }) } func benchmarkServeHTTP(b *testing.B, test testRequest) { diff --git a/connection/observer.go b/connection/observer.go index 2882fe41..17047051 100644 --- a/connection/observer.go +++ b/connection/observer.go @@ -117,6 +117,10 @@ func (o *Observer) SendReconnect(connIndex uint8) { o.sendEvent(Event{Index: connIndex, EventType: Reconnecting}) } +func (o *Observer) sendUnregisteringEvent(connIndex uint8) { + o.sendEvent(Event{Index: connIndex, EventType: Unregistering}) +} + func (o *Observer) SendDisconnect(connIndex uint8) { o.sendEvent(Event{Index: connIndex, EventType: Disconnected}) } diff --git a/connection/observer_test.go b/connection/observer_test.go index b0387ca8..6a6b521e 100644 --- a/connection/observer_test.go +++ b/connection/observer_test.go @@ -66,3 +66,21 @@ func TestObserverEventsDontBlock(t *testing.T) { mu.Unlock() } } + + +type eventCollectorSink struct { + observedEvents []Event + mu sync.Mutex +} + +func (s *eventCollectorSink) OnTunnelEvent(event Event) { + s.mu.Lock() + defer s.mu.Unlock() + s.observedEvents = append(s.observedEvents, event) +} + +func (s *eventCollectorSink) assertSawEvent(t *testing.T, event Event) { + s.mu.Lock() + defer s.mu.Unlock() + assert.Contains(t, s.observedEvents, event) +} \ No newline at end of file diff --git a/connection/rpc.go b/connection/rpc.go index 74a78339..a1006334 100644 --- a/connection/rpc.go +++ b/connection/rpc.go @@ -291,6 +291,8 @@ func (h *h2muxConnection) registerNamedTunnel( } func (h *h2muxConnection) unregister(isNamedTunnel bool) { + h.observer.sendUnregisteringEvent(h.connIndex) + unregisterCtx, cancel := context.WithTimeout(context.Background(), h.config.GracePeriod) defer cancel() diff --git a/metrics/readiness.go b/metrics/readiness.go index 4cae7592..62d9f8f4 100644 --- a/metrics/readiness.go +++ b/metrics/readiness.go @@ -32,7 +32,7 @@ func (rs *ReadyServer) OnTunnelEvent(c conn.Event) { rs.Lock() rs.isConnected[int(c.Index)] = true rs.Unlock() - case conn.Disconnected, conn.Reconnecting, conn.RegisteringTunnel: + case conn.Disconnected, conn.Reconnecting, conn.RegisteringTunnel, conn.Unregistering: rs.Lock() rs.isConnected[int(c.Index)] = false rs.Unlock() diff --git a/metrics/readiness_test.go b/metrics/readiness_test.go index 013dcf96..5b4d8290 100644 --- a/metrics/readiness_test.go +++ b/metrics/readiness_test.go @@ -107,6 +107,22 @@ func TestReadinessEventHandling(t *testing.T) { assert.NotEqualValues(t, http.StatusOK, code) assert.Zero(t, ready) + // other connected then unregistered => not ok + rs.OnTunnelEvent(connection.Event{ + Index: 1, + EventType: connection.Connected, + }) + code, ready = rs.makeResponse() + assert.EqualValues(t, http.StatusOK, code) + assert.EqualValues(t, 1, ready) + rs.OnTunnelEvent(connection.Event{ + Index: 1, + EventType: connection.Unregistering, + }) + code, ready = rs.makeResponse() + assert.NotEqualValues(t, http.StatusOK, code) + assert.Zero(t, ready) + // other disconnected => not ok rs.OnTunnelEvent(connection.Event{ Index: 1,