2023-03-21 18:42:25 +00:00
package management
import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"net/http/pprof"
	"os"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/cors"
	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/zerolog"
	"nhooyr.io/websocket"
)
const (
// In the current state, an invalid command was provided by the client
StatusInvalidCommand websocket . StatusCode = 4001
reasonInvalidCommand = "expected start streaming as first event"
// There are a limited number of available streaming log sessions that cloudflared will service, exceeding this
// value will return this error to incoming requests.
StatusSessionLimitExceeded websocket . StatusCode = 4002
reasonSessionLimitExceeded = "limit exceeded for streaming sessions"
2023-04-21 18:54:37 +00:00
// There is a limited idle time while not actively serving a session for a request before dropping the connection.
2023-04-06 23:00:19 +00:00
StatusIdleLimitExceeded websocket . StatusCode = 4003
reasonIdleLimitExceeded = "session was idle for too long"
2023-03-21 18:42:25 +00:00
)
2023-07-05 20:28:30 +00:00
// corsHandler is the CORS middleware required to allow dash to access
// management.argotunnel.com requests.
var corsHandler = cors.Handler(cors.Options{
	// Any subdomain of cloudflare.com is an allowed origin.
	AllowedOrigins: []string{"https://*.cloudflare.com"},
	// Required to present cookies or other authentication across origin boundaries.
	AllowCredentials: true,
	// Maximum value not ignored by any of the major browsers.
	MaxAge: 300,
})
2023-03-21 18:42:25 +00:00
type ManagementService struct {
// The management tunnel hostname
Hostname string
2023-04-19 11:41:01 +00:00
// Host details related configurations
2023-04-18 08:59:55 +00:00
serviceIP string
clientID uuid . UUID
2023-04-19 11:41:01 +00:00
label string
2023-04-18 08:59:55 +00:00
2023-06-30 16:38:26 +00:00
// Additional Handlers
metricsHandler http . Handler
2023-03-30 21:12:00 +00:00
log * zerolog . Logger
2023-03-21 18:42:25 +00:00
router chi . Router
2023-04-04 22:45:32 +00:00
// streamingMut is a lock to prevent concurrent requests to start streaming. Utilizing the atomic.Bool is not
// sufficient to complete this operation since many other checks during an incoming new request are needed
// to validate this before setting streaming to true.
streamingMut sync . Mutex
logger LoggerListener
2023-03-21 18:42:25 +00:00
}
2023-04-18 08:59:55 +00:00
func New ( managementHostname string ,
2023-07-05 20:28:30 +00:00
enableDiagServices bool ,
2023-04-18 08:59:55 +00:00
serviceIP string ,
clientID uuid . UUID ,
2023-04-19 11:41:01 +00:00
label string ,
2023-04-18 08:59:55 +00:00
log * zerolog . Logger ,
logger LoggerListener ,
) * ManagementService {
2023-03-30 21:12:00 +00:00
s := & ManagementService {
2023-06-30 16:38:26 +00:00
Hostname : managementHostname ,
log : log ,
logger : logger ,
serviceIP : serviceIP ,
clientID : clientID ,
label : label ,
metricsHandler : promhttp . Handler ( ) ,
2023-03-21 18:42:25 +00:00
}
2023-03-30 21:12:00 +00:00
r := chi . NewRouter ( )
2023-04-21 18:54:37 +00:00
r . Use ( ValidateAccessTokenQueryMiddleware )
2023-07-05 20:28:30 +00:00
// Default management services
r . With ( corsHandler ) . Get ( "/ping" , ping )
r . With ( corsHandler ) . Head ( "/ping" , ping )
2023-04-04 22:45:32 +00:00
r . Get ( "/logs" , s . logs )
2023-07-05 20:28:30 +00:00
r . With ( corsHandler ) . Get ( "/host_details" , s . getHostDetails )
// Diagnostic management services
if enableDiagServices {
// Prometheus endpoint
r . With ( corsHandler ) . Get ( "/metrics" , s . metricsHandler . ServeHTTP )
// Supports only heap and goroutine
r . With ( corsHandler ) . Get ( "/debug/pprof/{profile:heap|goroutine}" , pprof . Index )
}
2023-03-30 21:12:00 +00:00
s . router = r
return s
2023-03-21 18:42:25 +00:00
}
// ServeHTTP implements http.Handler by handing every request to the service's
// internal router.
func (m *ManagementService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.router.ServeHTTP(w, r)
}
2023-03-30 21:12:00 +00:00
// Management Ping handler
2023-03-21 18:42:25 +00:00
// ping answers a management ping with a bare 200 OK and no body.
func ping(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}
2023-04-04 22:45:32 +00:00
2023-04-18 08:59:55 +00:00
// getHostDetailsResponse is the JSON body returned by the /host_details
// endpoint.
type getHostDetailsResponse struct {
	// ClientID is always emitted, under the "connector_id" key.
	ClientID string `json:"connector_id"`

	// IP is omitted from the JSON when empty.
	IP string `json:"ip,omitempty"`

	// HostName is omitted from the JSON when empty.
	HostName string `json:"hostname,omitempty"`
}
func ( m * ManagementService ) getHostDetails ( w http . ResponseWriter , r * http . Request ) {
var getHostDetailsResponse = getHostDetailsResponse {
ClientID : m . clientID . String ( ) ,
}
if ip , err := getPrivateIP ( m . serviceIP ) ; err == nil {
getHostDetailsResponse . IP = ip
}
2023-04-19 11:41:01 +00:00
getHostDetailsResponse . HostName = m . getLabel ( )
2023-04-18 08:59:55 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
w . WriteHeader ( 200 )
json . NewEncoder ( w ) . Encode ( getHostDetailsResponse )
}
2023-04-19 11:41:01 +00:00
// getLabel returns the name reported for this host: the configured label
// prefixed with "custom:" when one is set, otherwise the system hostname
// (which is not a fqdn hostname), or "unknown" if the hostname cannot be read.
func (m *ManagementService) getLabel() string {
	if m.label != "" {
		return fmt.Sprintf("custom:%s", m.label)
	}
	if systemName, err := os.Hostname(); err == nil {
		return systemName
	}
	return "unknown"
}
2023-04-18 08:59:55 +00:00
// getPrivateIP returns the local IP this machine uses to reach addr. It opens
// a TCP connection to addr (giving up after one second) and reports the host
// part of the connection's local address.
func getPrivateIP(addr string) (string, error) {
	const dialTimeout = 1 * time.Second

	conn, dialErr := net.DialTimeout("tcp", addr, dialTimeout)
	if dialErr != nil {
		return "", dialErr
	}
	defer conn.Close()

	// Only the host is of interest; the ephemeral local port is discarded.
	ip, _, splitErr := net.SplitHostPort(conn.LocalAddr().String())
	return ip, splitErr
}
2023-04-04 22:45:32 +00:00
// readEvents will loop through all incoming websocket messages from a client, marshal them into the
// proper Event structure and pass them through to the events channel. Any invalid message sent will
// automatically terminate the connection.
//
// The loop ends when ctx is cancelled or when reading from the client fails. Delivery to events also
// watches ctx, so the goroutine cannot stay blocked on the channel once the receiver has gone away.
func (m *ManagementService) readEvents(c *websocket.Conn, ctx context.Context, events chan<- *ClientEvent) {
	for {
		event, err := ReadClientEvent(c, ctx)
		// A cancelled context takes precedence over whatever the read returned: the caller is shutting
		// down, so neither the event nor the error is acted upon.
		if ctx.Err() != nil {
			return
		}
		if err != nil {
			// If the client (or the server) already closed the connection, don't attempt to close it again
			if !IsClosed(err, m.log) {
				m.log.Err(err).Send()
				m.log.Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
			}
			// Any errors when reading the messages from the client will close the connection
			return
		}
		// Never block on the send alone: if the receiver returned after the check above, a bare send
		// on this channel would leak the goroutine.
		select {
		case events <- event:
		case <-ctx.Done():
			return
		}
	}
}
// streamLogs will begin the process of reading from the Session listener and write the log events to the client.
2023-04-21 18:54:37 +00:00
func ( m * ManagementService ) streamLogs ( c * websocket . Conn , ctx context . Context , session * session ) {
for session . Active ( ) {
2023-04-04 22:45:32 +00:00
select {
case <- ctx . Done ( ) :
2023-04-21 18:54:37 +00:00
session . Stop ( )
2023-04-04 22:45:32 +00:00
return
case event := <- session . listener :
err := WriteEvent ( c , ctx , & EventLog {
ServerEvent : ServerEvent { Type : Logs } ,
2023-04-06 18:30:42 +00:00
Logs : [ ] * Log { event } ,
2023-04-04 22:45:32 +00:00
} )
if err != nil {
// If the client (or the server) already closed the connection, don't attempt to close it again
if ! IsClosed ( err , m . log ) {
m . log . Err ( err ) . Send ( )
m . log . Err ( c . Close ( websocket . StatusInternalError , err . Error ( ) ) ) . Send ( )
}
// Any errors when writing the messages to the client will stop streaming and close the connection
2023-04-21 18:54:37 +00:00
session . Stop ( )
2023-04-04 22:45:32 +00:00
return
}
default :
// No messages to send
}
}
}
2023-04-21 18:54:37 +00:00
// canStartStream will check the conditions of the request and return if the session can begin streaming.
func ( m * ManagementService ) canStartStream ( session * session ) bool {
2023-04-04 22:45:32 +00:00
m . streamingMut . Lock ( )
defer m . streamingMut . Unlock ( )
2023-04-21 18:54:37 +00:00
// Limits to one actor for streaming logs
if m . logger . ActiveSessions ( ) > 0 {
// Allow the same user to preempt their existing session to disconnect their old session and start streaming
// with this new session instead.
if existingSession := m . logger . ActiveSession ( session . actor ) ; existingSession != nil {
m . log . Info ( ) .
Msgf ( "Another management session request for the same actor was requested; the other session will be disconnected to handle the new request." )
existingSession . Stop ( )
m . logger . Remove ( existingSession )
existingSession . cancel ( )
} else {
m . log . Warn ( ) .
Msgf ( "Another management session request was attempted but one session already being served; there is a limit of streaming log sessions to reduce overall performance impact." )
return false
}
2023-04-04 22:45:32 +00:00
}
2023-04-21 18:54:37 +00:00
return true
}
// parseFilters will check the ClientEvent for start_streaming and assign filters if provided to the session
func ( m * ManagementService ) parseFilters ( c * websocket . Conn , event * ClientEvent , session * session ) bool {
2023-04-04 22:45:32 +00:00
// Expect the first incoming request
2023-04-11 16:54:28 +00:00
startEvent , ok := IntoClientEvent [ EventStartStreaming ] ( event , StartStreaming )
2023-04-04 22:45:32 +00:00
if ! ok {
2023-04-21 18:54:37 +00:00
return false
2023-04-04 22:45:32 +00:00
}
2023-04-21 18:54:37 +00:00
session . Filters ( startEvent . Filters )
return true
2023-04-04 22:45:32 +00:00
}
// Management Streaming Logs accept handler
func ( m * ManagementService ) logs ( w http . ResponseWriter , r * http . Request ) {
2023-05-11 17:13:39 +00:00
c , err := websocket . Accept ( w , r , & websocket . AcceptOptions {
OriginPatterns : [ ] string {
"*.cloudflare.com" ,
} ,
} )
2023-04-04 22:45:32 +00:00
if err != nil {
m . log . Debug ( ) . Msgf ( "management handshake: %s" , err . Error ( ) )
return
}
// Make sure the connection is closed if other go routines fail to close the connection after completing.
defer c . Close ( websocket . StatusInternalError , "" )
2023-04-06 23:00:19 +00:00
ctx , cancel := context . WithCancel ( r . Context ( ) )
defer cancel ( )
2023-04-04 22:45:32 +00:00
events := make ( chan * ClientEvent )
go m . readEvents ( c , ctx , events )
2023-04-06 23:00:19 +00:00
// Send a heartbeat ping to hold the connection open even if not streaming.
ping := time . NewTicker ( 15 * time . Second )
defer ping . Stop ( )
2023-04-21 18:54:37 +00:00
// Close the connection if no operation has occurred after the idle timeout. The timeout is halted
// when streaming logs is active.
2023-04-06 23:00:19 +00:00
idleTimeout := 5 * time . Minute
idle := time . NewTimer ( idleTimeout )
defer idle . Stop ( )
2023-04-21 18:54:37 +00:00
// Fetch the claims from the request context to acquire the actor
claims , ok := ctx . Value ( accessClaimsCtxKey ) . ( * managementTokenClaims )
if ! ok || claims == nil {
// Typically should never happen as it is provided in the context from the middleware
m . log . Err ( c . Close ( websocket . StatusInternalError , "missing access_token" ) ) . Send ( )
return
}
session := newSession ( logWindow , claims . Actor , cancel )
defer m . logger . Remove ( session )
2023-04-04 22:45:32 +00:00
for {
select {
case <- ctx . Done ( ) :
m . log . Debug ( ) . Msgf ( "management logs: context cancelled" )
c . Close ( websocket . StatusNormalClosure , "context closed" )
return
case event := <- events :
switch event . Type {
case StartStreaming :
2023-04-06 23:00:19 +00:00
idle . Stop ( )
2023-04-21 18:54:37 +00:00
// Expect the first incoming request
startEvent , ok := IntoClientEvent [ EventStartStreaming ] ( event , StartStreaming )
if ! ok {
m . log . Warn ( ) . Msgf ( "expected start_streaming as first recieved event" )
m . log . Err ( c . Close ( StatusInvalidCommand , reasonInvalidCommand ) ) . Send ( )
return
}
// Make sure the session can start
if ! m . canStartStream ( session ) {
m . log . Err ( c . Close ( StatusSessionLimitExceeded , reasonSessionLimitExceeded ) ) . Send ( )
return
}
session . Filters ( startEvent . Filters )
m . logger . Listen ( session )
m . log . Debug ( ) . Msgf ( "Streaming logs" )
go m . streamLogs ( c , ctx , session )
2023-04-04 22:45:32 +00:00
continue
case StopStreaming :
2023-04-06 23:00:19 +00:00
idle . Reset ( idleTimeout )
2023-04-21 18:54:37 +00:00
// Stop the current session for the current actor who requested it
session . Stop ( )
m . logger . Remove ( session )
2023-04-04 22:45:32 +00:00
case UnknownClientEventType :
fallthrough
default :
// Drop unknown events and close connection
m . log . Debug ( ) . Msgf ( "unexpected management message received: %s" , event . Type )
// If the client (or the server) already closed the connection, don't attempt to close it again
if ! IsClosed ( err , m . log ) {
m . log . Err ( err ) . Err ( c . Close ( websocket . StatusUnsupportedData , err . Error ( ) ) ) . Send ( )
}
return
}
2023-04-06 23:00:19 +00:00
case <- ping . C :
go c . Ping ( ctx )
case <- idle . C :
c . Close ( StatusIdleLimitExceeded , reasonIdleLimitExceeded )
return
2023-04-04 22:45:32 +00:00
}
}
}