TUN-8728: implement diag/tunnel endpoint

## Summary
The new endpoint returns the current information to be used when calling the diagnostic procedure.
This also adds:
- add indexed connection info and method to extract active connections from connTracker
- add edge address to Event struct and conn tracker
- remove unnecessary event send
- add tunnel configuration handler
- adjust cmd and metrics to create diagnostic server

Closes TUN-8728
This commit is contained in:
Luis Neto 2024-11-25 10:43:32 -08:00
parent aab5364252
commit 4b0b6dc8c6
11 changed files with 177 additions and 37 deletions

View File

@ -461,10 +461,11 @@ func StartServer(
go func() { go func() {
defer wg.Done() defer wg.Done()
readinessServer := metrics.NewReadyServer(clientID, tracker := tunnelstate.NewConnTracker(log)
tunnelstate.NewConnTracker(log)) observer.RegisterSink(tracker)
observer.RegisterSink(readinessServer)
diagnosticHandler := diagnostic.NewDiagnosticHandler(log, 0, diagnostic.NewSystemCollectorImpl(buildInfo.CloudflaredVersion)) readinessServer := metrics.NewReadyServer(clientID, tracker)
diagnosticHandler := diagnostic.NewDiagnosticHandler(log, 0, diagnostic.NewSystemCollectorImpl(buildInfo.CloudflaredVersion), tunnelConfig.NamedTunnel.Credentials.TunnelID, clientID, tracker)
metricsConfig := metrics.Config{ metricsConfig := metrics.Config{
ReadyServer: readinessServer, ReadyServer: readinessServer,
DiagnosticHandler: diagnosticHandler, DiagnosticHandler: diagnosticHandler,

View File

@ -102,7 +102,7 @@ func (c *controlStream) ServeControlStream(
c.observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc() c.observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol) c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol)
c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location) c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location, c.edgeAddress)
c.connectedFuse.Connected() c.connectedFuse.Connected()
// if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration // if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration

View File

@ -1,12 +1,15 @@
package connection package connection
import "net"
// Event is something that happened to a connection, e.g. disconnection or registration. // Event is something that happened to a connection, e.g. disconnection or registration.
type Event struct { type Event struct {
Index uint8 Index uint8
EventType Status EventType Status
Location string Location string
Protocol Protocol Protocol Protocol
URL string URL string
EdgeAddress net.IP
} }
// Status is the status of a connection. // Status is the status of a connection.

View File

@ -47,7 +47,6 @@ func (o *Observer) RegisterSink(sink EventSink) {
} }
func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) { func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) {
o.sendEvent(Event{Index: connIndex, EventType: Connected, Location: location})
o.log.Info(). o.log.Info().
Int(management.EventTypeKey, int(management.Cloudflared)). Int(management.EventTypeKey, int(management.Cloudflared)).
Str(LogFieldConnectionID, connectionID.String()). Str(LogFieldConnectionID, connectionID.String()).
@ -63,8 +62,8 @@ func (o *Observer) sendRegisteringEvent(connIndex uint8) {
o.sendEvent(Event{Index: connIndex, EventType: RegisteringTunnel}) o.sendEvent(Event{Index: connIndex, EventType: RegisteringTunnel})
} }
func (o *Observer) sendConnectedEvent(connIndex uint8, protocol Protocol, location string) { func (o *Observer) sendConnectedEvent(connIndex uint8, protocol Protocol, location string, edgeAddress net.IP) {
o.sendEvent(Event{Index: connIndex, EventType: Connected, Protocol: protocol, Location: location}) o.sendEvent(Event{Index: connIndex, EventType: Connected, Protocol: protocol, Location: location, EdgeAddress: edgeAddress})
} }
func (o *Observer) SendURL(url string) { func (o *Observer) SendURL(url string) {

View File

@ -3,7 +3,8 @@ package diagnostic
import "time" import "time"
const ( const (
defaultCollectorTimeout = time.Second * 10 // This const define the timeout value of a collector operation. defaultCollectorTimeout = time.Second * 10 // This const define the timeout value of a collector operation.
collectorField = "collector" // used for logging purposes collectorField = "collector" // used for logging purposes
systemCollectorName = "system" // used for logging purposes systemCollectorName = "system" // used for logging purposes
tunnelStateCollectorName = "tunnelState" // used for logging purposes
) )

View File

@ -6,28 +6,41 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/google/uuid"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/tunnelstate"
) )
type Handler struct { type Handler struct {
log *zerolog.Logger log *zerolog.Logger
timeout time.Duration timeout time.Duration
systemCollector SystemCollector systemCollector SystemCollector
tunnelID uuid.UUID
connectorID uuid.UUID
tracker *tunnelstate.ConnTracker
} }
func NewDiagnosticHandler( func NewDiagnosticHandler(
log *zerolog.Logger, log *zerolog.Logger,
timeout time.Duration, timeout time.Duration,
systemCollector SystemCollector, systemCollector SystemCollector,
tunnelID uuid.UUID,
connectorID uuid.UUID,
tracker *tunnelstate.ConnTracker,
) *Handler { ) *Handler {
logger := log.With().Logger()
if timeout == 0 { if timeout == 0 {
timeout = defaultCollectorTimeout timeout = defaultCollectorTimeout
} }
return &Handler{ return &Handler{
log, log: &logger,
timeout, timeout: timeout,
systemCollector, systemCollector: systemCollector,
tunnelID: tunnelID,
connectorID: connectorID,
tracker: tracker,
} }
} }
@ -35,9 +48,7 @@ func (handler *Handler) SystemHandler(writer http.ResponseWriter, request *http.
logger := handler.log.With().Str(collectorField, systemCollectorName).Logger() logger := handler.log.With().Str(collectorField, systemCollectorName).Logger()
logger.Info().Msg("Collection started") logger.Info().Msg("Collection started")
defer func() { defer logger.Info().Msg("Collection finished")
logger.Info().Msg("Collection finished")
}()
ctx, cancel := context.WithTimeout(request.Context(), handler.timeout) ctx, cancel := context.WithTimeout(request.Context(), handler.timeout)
@ -73,6 +84,32 @@ func (handler *Handler) SystemHandler(writer http.ResponseWriter, request *http.
} }
} }
type tunnelStateResponse struct {
TunnelID uuid.UUID `json:"tunnelID,omitempty"`
ConnectorID uuid.UUID `json:"connectorID,omitempty"`
Connections []tunnelstate.IndexedConnectionInfo `json:"connections,omitempty"`
}
func (handler *Handler) TunnelStateHandler(writer http.ResponseWriter, _ *http.Request) {
log := handler.log.With().Str(collectorField, tunnelStateCollectorName).Logger()
log.Info().Msg("Collection started")
defer log.Info().Msg("Collection finished")
body := tunnelStateResponse{
handler.tunnelID,
handler.connectorID,
handler.tracker.GetActiveConnections(),
}
encoder := json.NewEncoder(writer)
err := encoder.Encode(body)
if err != nil {
handler.log.Error().Err(err).Msgf("error occurred whilst serializing information")
writer.WriteHeader(http.StatusInternalServerError)
}
}
func writeResponse(writer http.ResponseWriter, bytes []byte, logger *zerolog.Logger) { func writeResponse(writer http.ResponseWriter, bytes []byte, logger *zerolog.Logger) {
bytesWritten, err := writer.Write(bytes) bytesWritten, err := writer.Write(bytes)
if err != nil { if err != nil {

View File

@ -5,15 +5,19 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"io" "io"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"github.com/google/uuid"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/diagnostic" "github.com/cloudflare/cloudflared/diagnostic"
"github.com/cloudflare/cloudflared/tunnelstate"
) )
type SystemCollectorMock struct{} type SystemCollectorMock struct{}
@ -24,6 +28,23 @@ const (
errorKey = "errkey" errorKey = "errkey"
) )
func newTrackerFromConns(t *testing.T, connections []tunnelstate.IndexedConnectionInfo) *tunnelstate.ConnTracker {
t.Helper()
log := zerolog.Nop()
tracker := tunnelstate.NewConnTracker(&log)
for _, conn := range connections {
tracker.OnTunnelEvent(connection.Event{
Index: conn.Index,
EventType: connection.Connected,
Protocol: conn.Protocol,
EdgeAddress: conn.EdgeAddress,
})
}
return tracker
}
func setCtxValuesForSystemCollector( func setCtxValuesForSystemCollector(
systemInfo *diagnostic.SystemInformation, systemInfo *diagnostic.SystemInformation,
rawInfo string, rawInfo string,
@ -83,7 +104,7 @@ func TestSystemHandler(t *testing.T) {
for _, tCase := range tests { for _, tCase := range tests {
t.Run(tCase.name, func(t *testing.T) { t.Run(tCase.name, func(t *testing.T) {
t.Parallel() t.Parallel()
handler := diagnostic.NewDiagnosticHandler(&log, 0, &SystemCollectorMock{}) handler := diagnostic.NewDiagnosticHandler(&log, 0, &SystemCollectorMock{}, uuid.New(), uuid.New(), nil)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
ctx := setCtxValuesForSystemCollector(tCase.systemInfo, tCase.rawInfo, tCase.err) ctx := setCtxValuesForSystemCollector(tCase.systemInfo, tCase.rawInfo, tCase.err)
request, err := http.NewRequestWithContext(ctx, http.MethodGet, "/diag/syste,", nil) request, err := http.NewRequestWithContext(ctx, http.MethodGet, "/diag/syste,", nil)
@ -106,3 +127,58 @@ func TestSystemHandler(t *testing.T) {
}) })
} }
} }
func TestTunnelStateHandler(t *testing.T) {
t.Parallel()
log := zerolog.Nop()
tests := []struct {
name string
tunnelID uuid.UUID
clientID uuid.UUID
connections []tunnelstate.IndexedConnectionInfo
}{
{
name: "case1",
tunnelID: uuid.New(),
clientID: uuid.New(),
},
{
name: "case2",
tunnelID: uuid.New(),
clientID: uuid.New(),
connections: []tunnelstate.IndexedConnectionInfo{{
ConnectionInfo: tunnelstate.ConnectionInfo{
IsConnected: true,
Protocol: connection.QUIC,
EdgeAddress: net.IPv4(100, 100, 100, 100),
},
Index: 0,
}},
},
}
for _, tCase := range tests {
t.Run(tCase.name, func(t *testing.T) {
t.Parallel()
tracker := newTrackerFromConns(t, tCase.connections)
handler := diagnostic.NewDiagnosticHandler(&log, 0, nil, tCase.tunnelID, tCase.clientID, tracker)
recorder := httptest.NewRecorder()
handler.TunnelStateHandler(recorder, nil)
decoder := json.NewDecoder(recorder.Body)
var response struct {
TunnelID uuid.UUID `json:"tunnelID,omitempty"`
ConnectorID uuid.UUID `json:"connectorID,omitempty"`
Connections []tunnelstate.IndexedConnectionInfo `json:"connections,omitempty"`
}
err := decoder.Decode(&response)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, recorder.Code)
assert.Equal(t, tCase.tunnelID, response.TunnelID)
assert.Equal(t, tCase.clientID, response.ConnectorID)
assert.Equal(t, tCase.connections, response.Connections)
})
}
}

View File

@ -94,6 +94,7 @@ func newMetricsHandler(
}) })
} }
router.HandleFunc("/diag/tunnel", config.DiagnosticHandler.TunnelStateHandler)
router.HandleFunc("/diag/system", config.DiagnosticHandler.SystemHandler) router.HandleFunc("/diag/system", config.DiagnosticHandler.SystemHandler)
return router return router

View File

@ -7,7 +7,6 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
conn "github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/tunnelstate" "github.com/cloudflare/cloudflared/tunnelstate"
) )
@ -28,10 +27,6 @@ func NewReadyServer(
} }
} }
func (rs *ReadyServer) OnTunnelEvent(c conn.Event) {
rs.tracker.OnTunnelEvent(c)
}
type body struct { type body struct {
Status int `json:"status"` Status int `json:"status"`
ReadyConnections uint `json:"readyConnections"` ReadyConnections uint `json:"readyConnections"`

View File

@ -44,7 +44,7 @@ func TestReadinessEventHandling(t *testing.T) {
assert.Zero(t, readyConnections) assert.Zero(t, readyConnections)
// one connected => ok // one connected => ok
rs.OnTunnelEvent(connection.Event{ tracker.OnTunnelEvent(connection.Event{
Index: 1, Index: 1,
EventType: connection.Connected, EventType: connection.Connected,
}) })
@ -53,7 +53,7 @@ func TestReadinessEventHandling(t *testing.T) {
assert.EqualValues(t, 1, readyConnections) assert.EqualValues(t, 1, readyConnections)
// another connected => still ok // another connected => still ok
rs.OnTunnelEvent(connection.Event{ tracker.OnTunnelEvent(connection.Event{
Index: 2, Index: 2,
EventType: connection.Connected, EventType: connection.Connected,
}) })
@ -62,7 +62,7 @@ func TestReadinessEventHandling(t *testing.T) {
assert.EqualValues(t, 2, readyConnections) assert.EqualValues(t, 2, readyConnections)
// one reconnecting => still ok // one reconnecting => still ok
rs.OnTunnelEvent(connection.Event{ tracker.OnTunnelEvent(connection.Event{
Index: 2, Index: 2,
EventType: connection.Reconnecting, EventType: connection.Reconnecting,
}) })
@ -71,7 +71,7 @@ func TestReadinessEventHandling(t *testing.T) {
assert.EqualValues(t, 1, readyConnections) assert.EqualValues(t, 1, readyConnections)
// Regression test for TUN-3777 // Regression test for TUN-3777
rs.OnTunnelEvent(connection.Event{ tracker.OnTunnelEvent(connection.Event{
Index: 1, Index: 1,
EventType: connection.RegisteringTunnel, EventType: connection.RegisteringTunnel,
}) })
@ -80,14 +80,14 @@ func TestReadinessEventHandling(t *testing.T) {
assert.Zero(t, readyConnections) assert.Zero(t, readyConnections)
// other connected then unregistered => not ok // other connected then unregistered => not ok
rs.OnTunnelEvent(connection.Event{ tracker.OnTunnelEvent(connection.Event{
Index: 1, Index: 1,
EventType: connection.Connected, EventType: connection.Connected,
}) })
code, readyConnections = mockRequest(t, rs) code, readyConnections = mockRequest(t, rs)
assert.EqualValues(t, http.StatusOK, code) assert.EqualValues(t, http.StatusOK, code)
assert.EqualValues(t, 1, readyConnections) assert.EqualValues(t, 1, readyConnections)
rs.OnTunnelEvent(connection.Event{ tracker.OnTunnelEvent(connection.Event{
Index: 1, Index: 1,
EventType: connection.Unregistering, EventType: connection.Unregistering,
}) })
@ -96,7 +96,7 @@ func TestReadinessEventHandling(t *testing.T) {
assert.Zero(t, readyConnections) assert.Zero(t, readyConnections)
// other disconnected => not ok // other disconnected => not ok
rs.OnTunnelEvent(connection.Event{ tracker.OnTunnelEvent(connection.Event{
Index: 1, Index: 1,
EventType: connection.Disconnected, EventType: connection.Disconnected,
}) })

View File

@ -1,6 +1,7 @@
package tunnelstate package tunnelstate
import ( import (
"net"
"sync" "sync"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -16,8 +17,15 @@ type ConnTracker struct {
} }
type ConnectionInfo struct { type ConnectionInfo struct {
IsConnected bool IsConnected bool `json:"isConnected,omitempty"`
Protocol connection.Protocol Protocol connection.Protocol `json:"protocol,omitempty"`
EdgeAddress net.IP `json:"edgeAddress,omitempty"`
}
// Convinience struct to extend the connection with its index.
type IndexedConnectionInfo struct {
ConnectionInfo
Index uint8 `json:"index,omitempty"`
} }
func NewConnTracker( func NewConnTracker(
@ -36,6 +44,7 @@ func (ct *ConnTracker) OnTunnelEvent(c connection.Event) {
ci := ConnectionInfo{ ci := ConnectionInfo{
IsConnected: true, IsConnected: true,
Protocol: c.Protocol, Protocol: c.Protocol,
EdgeAddress: c.EdgeAddress,
} }
ct.connectionInfo[c.Index] = ci ct.connectionInfo[c.Index] = ci
ct.mutex.Unlock() ct.mutex.Unlock()
@ -74,3 +83,21 @@ func (ct *ConnTracker) HasConnectedWith(protocol connection.Protocol) bool {
} }
return false return false
} }
// Returns the connection information iff it is connected this
// also leverages the [IndexedConnectionInfo] to also provide the connection index
func (ct *ConnTracker) GetActiveConnections() []IndexedConnectionInfo {
ct.mutex.RLock()
defer ct.mutex.RUnlock()
connections := make([]IndexedConnectionInfo, 0)
for key, value := range ct.connectionInfo {
if value.IsConnected {
info := IndexedConnectionInfo{value, key}
connections = append(connections, info)
}
}
return connections
}