TUN-3635: Send event when unregistering tunnel for gracful shutdown so /ready endpoint reports down status befoe connections finish handling pending requests.

This commit is contained in:
Igor Postelnik 2021-02-04 15:09:17 -06:00
parent 820e0dfe51
commit cf562ef8c8
11 changed files with 60 additions and 11 deletions

View File

@ -15,7 +15,6 @@ import (
type connState struct { type connState struct {
location string location string
state connection.Status
} }
type uiModel struct { type uiModel struct {
@ -32,6 +31,7 @@ type palette struct {
defaultText string defaultText string
disconnected string disconnected string
reconnecting string reconnecting string
unregistered string
} }
func NewUIModel(version, hostname, metricsURL string, ing *ingress.Ingress, haConnections int) *uiModel { func NewUIModel(version, hostname, metricsURL string, ing *ingress.Ingress, haConnections int) *uiModel {
@ -67,6 +67,7 @@ func (data *uiModel) Launch(
defaultText: "white", defaultText: "white",
disconnected: "red", disconnected: "red",
reconnecting: "orange", reconnecting: "orange",
unregistered: "orange",
} }
app := tview.NewApplication() app := tview.NewApplication()
@ -128,7 +129,7 @@ func (data *uiModel) Launch(
switch event.EventType { switch event.EventType {
case connection.Connected: case connection.Connected:
data.setConnTableCell(event, connTable, palette) data.setConnTableCell(event, connTable, palette)
case connection.Disconnected, connection.Reconnecting: case connection.Disconnected, connection.Reconnecting, connection.Unregistering:
data.changeConnStatus(event, connTable, log, palette) data.changeConnStatus(event, connTable, log, palette)
case connection.SetURL: case connection.SetURL:
tunnelHostText.SetText(event.URL) tunnelHostText.SetText(event.URL)
@ -167,10 +168,7 @@ func (data *uiModel) changeConnStatus(event connection.Event, table *tview.Table
locationState := event.Location locationState := event.Location
if event.EventType == connection.Disconnected { if event.EventType == connection.Reconnecting {
connState.state = connection.Disconnected
} else if event.EventType == connection.Reconnecting {
connState.state = connection.Reconnecting
locationState = "Reconnecting..." locationState = "Reconnecting..."
} }
@ -196,7 +194,6 @@ func (data *uiModel) setConnTableCell(event connection.Event, table *tview.Table
connectionNum := index + 1 connectionNum := index + 1
// Update slice to keep track of connection location and state in UI table // Update slice to keep track of connection location and state in UI table
data.connections[index].state = connection.Connected
data.connections[index].location = event.Location data.connections[index].location = event.Location
// Update text in table cell to show disconnected state // Update text in table cell to show disconnected state
@ -218,6 +215,8 @@ func newCellText(palette palette, connectionNum int, location string, connectedS
dotColor = palette.disconnected dotColor = palette.disconnected
case connection.Reconnecting: case connection.Reconnecting:
dotColor = palette.reconnecting dotColor = palette.reconnecting
case connection.Unregistering:
dotColor = palette.unregistered
} }
return fmt.Sprintf(connFmtString, dotColor, palette.defaultText, connectionNum, location) return fmt.Sprintf(connFmtString, dotColor, palette.defaultText, connectionNum, location)

View File

@ -27,7 +27,6 @@ var (
Scheme: "https", Scheme: "https",
Host: "connectiontest.argotunnel.com", Host: "connectiontest.argotunnel.com",
} }
testObserver = NewObserver(&log, &log, false)
testLargeResp = make([]byte, largeFileSize) testLargeResp = make([]byte, largeFileSize)
) )

View File

@ -22,4 +22,6 @@ const (
SetURL SetURL
// RegisteringTunnel means the non-named tunnel is registering its connection. // RegisteringTunnel means the non-named tunnel is registering its connection.
RegisteringTunnel RegisteringTunnel
// We're unregistering tunnel from the edge in preparation for a disconnect
Unregistering
) )

View File

@ -33,7 +33,7 @@ func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
edgeMuxChan := make(chan *h2mux.Muxer) edgeMuxChan := make(chan *h2mux.Muxer)
go func() { go func() {
edgeMuxConfig := h2mux.MuxerConfig{ edgeMuxConfig := h2mux.MuxerConfig{
Log: testObserver.log, Log: &log,
Handler: h2mux.MuxedStreamFunc(func(stream *h2mux.MuxedStream) error { Handler: h2mux.MuxedStreamFunc(func(stream *h2mux.MuxedStream) error {
// we only expect RPC traffic in client->edge direction, provide minimal support for mocking // we only expect RPC traffic in client->edge direction, provide minimal support for mocking
require.True(t, stream.IsRPCStream()) require.True(t, stream.IsRPCStream())
@ -47,6 +47,7 @@ func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
edgeMuxChan <- edgeMux edgeMuxChan <- edgeMux
}() }()
var connIndex = uint8(0) var connIndex = uint8(0)
testObserver := NewObserver(&log, &log, false)
h2muxConn, err, _ := NewH2muxConnection(testConfig, testMuxerConfig, originConn, connIndex, testObserver, nil) h2muxConn, err, _ := NewH2muxConnection(testConfig, testMuxerConfig, originConn, connIndex, testObserver, nil)
require.NoError(t, err) require.NoError(t, err)
return h2muxConn, <-edgeMuxChan return h2muxConn, <-edgeMuxChan

View File

@ -142,6 +142,7 @@ func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *ht
c.stoppedGracefully = true c.stoppedGracefully = true
} }
c.observer.sendUnregisteringEvent(c.connIndex)
rpcClient.GracefulShutdown(ctx, c.config.GracePeriod) rpcClient.GracefulShutdown(ctx, c.config.GracePeriod)
c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection") c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
return nil return nil

View File

@ -35,7 +35,7 @@ func newTestHTTP2Connection() (*http2Connection, net.Conn) {
testConfig, testConfig,
&NamedTunnelConfig{}, &NamedTunnelConfig{},
&pogs.ConnectionOptions{}, &pogs.ConnectionOptions{},
testObserver, NewObserver(&log, &log, false),
connIndex, connIndex,
mockConnectedFuse{}, mockConnectedFuse{},
nil, nil,
@ -256,7 +256,9 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
registered: make(chan struct{}), registered: make(chan struct{}),
unregistered: make(chan struct{}), unregistered: make(chan struct{}),
} }
events := &eventCollectorSink{}
http2Conn.newRPCClientFunc = rpcClientFactory.newMockRPCClient http2Conn.newRPCClientFunc = rpcClientFactory.newMockRPCClient
http2Conn.observer.RegisterSink(events)
shutdownC := make(chan struct{}) shutdownC := make(chan struct{})
http2Conn.gracefulShutdownC = shutdownC http2Conn.gracefulShutdownC = shutdownC
@ -301,6 +303,11 @@ func TestGracefulShutdownHTTP2(t *testing.T) {
cancel() cancel()
wg.Wait() wg.Wait()
events.assertSawEvent(t, Event{
Index: http2Conn.connIndex,
EventType: Unregistering,
})
} }
func benchmarkServeHTTP(b *testing.B, test testRequest) { func benchmarkServeHTTP(b *testing.B, test testRequest) {

View File

@ -117,6 +117,10 @@ func (o *Observer) SendReconnect(connIndex uint8) {
o.sendEvent(Event{Index: connIndex, EventType: Reconnecting}) o.sendEvent(Event{Index: connIndex, EventType: Reconnecting})
} }
func (o *Observer) sendUnregisteringEvent(connIndex uint8) {
o.sendEvent(Event{Index: connIndex, EventType: Unregistering})
}
func (o *Observer) SendDisconnect(connIndex uint8) { func (o *Observer) SendDisconnect(connIndex uint8) {
o.sendEvent(Event{Index: connIndex, EventType: Disconnected}) o.sendEvent(Event{Index: connIndex, EventType: Disconnected})
} }

View File

@ -66,3 +66,21 @@ func TestObserverEventsDontBlock(t *testing.T) {
mu.Unlock() mu.Unlock()
} }
} }
type eventCollectorSink struct {
observedEvents []Event
mu sync.Mutex
}
func (s *eventCollectorSink) OnTunnelEvent(event Event) {
s.mu.Lock()
defer s.mu.Unlock()
s.observedEvents = append(s.observedEvents, event)
}
func (s *eventCollectorSink) assertSawEvent(t *testing.T, event Event) {
s.mu.Lock()
defer s.mu.Unlock()
assert.Contains(t, s.observedEvents, event)
}

View File

@ -291,6 +291,8 @@ func (h *h2muxConnection) registerNamedTunnel(
} }
func (h *h2muxConnection) unregister(isNamedTunnel bool) { func (h *h2muxConnection) unregister(isNamedTunnel bool) {
h.observer.sendUnregisteringEvent(h.connIndex)
unregisterCtx, cancel := context.WithTimeout(context.Background(), h.config.GracePeriod) unregisterCtx, cancel := context.WithTimeout(context.Background(), h.config.GracePeriod)
defer cancel() defer cancel()

View File

@ -32,7 +32,7 @@ func (rs *ReadyServer) OnTunnelEvent(c conn.Event) {
rs.Lock() rs.Lock()
rs.isConnected[int(c.Index)] = true rs.isConnected[int(c.Index)] = true
rs.Unlock() rs.Unlock()
case conn.Disconnected, conn.Reconnecting, conn.RegisteringTunnel: case conn.Disconnected, conn.Reconnecting, conn.RegisteringTunnel, conn.Unregistering:
rs.Lock() rs.Lock()
rs.isConnected[int(c.Index)] = false rs.isConnected[int(c.Index)] = false
rs.Unlock() rs.Unlock()

View File

@ -107,6 +107,22 @@ func TestReadinessEventHandling(t *testing.T) {
assert.NotEqualValues(t, http.StatusOK, code) assert.NotEqualValues(t, http.StatusOK, code)
assert.Zero(t, ready) assert.Zero(t, ready)
// other connected then unregistered => not ok
rs.OnTunnelEvent(connection.Event{
Index: 1,
EventType: connection.Connected,
})
code, ready = rs.makeResponse()
assert.EqualValues(t, http.StatusOK, code)
assert.EqualValues(t, 1, ready)
rs.OnTunnelEvent(connection.Event{
Index: 1,
EventType: connection.Unregistering,
})
code, ready = rs.makeResponse()
assert.NotEqualValues(t, http.StatusOK, code)
assert.Zero(t, ready)
// other disconnected => not ok // other disconnected => not ok
rs.OnTunnelEvent(connection.Event{ rs.OnTunnelEvent(connection.Event{
Index: 1, Index: 1,