2023-03-21 18:42:25 +00:00
package management
import (
2023-04-04 22:45:32 +00:00
"context"
2023-04-19 11:41:01 +00:00
"fmt"
2023-04-18 08:59:55 +00:00
"net"
2023-03-21 18:42:25 +00:00
"net/http"
2023-04-18 08:59:55 +00:00
"os"
2023-04-04 22:45:32 +00:00
"sync"
"sync/atomic"
2023-04-06 23:00:19 +00:00
"time"
2023-03-21 18:42:25 +00:00
"github.com/go-chi/chi/v5"
2023-04-18 08:59:55 +00:00
"github.com/google/uuid"
2023-03-30 21:12:00 +00:00
"github.com/rs/zerolog"
2023-04-04 22:45:32 +00:00
"nhooyr.io/websocket"
)
const (
// In the current state, an invalid command was provided by the client
StatusInvalidCommand websocket . StatusCode = 4001
reasonInvalidCommand = "expected start streaming as first event"
// There are a limited number of available streaming log sessions that cloudflared will service, exceeding this
// value will return this error to incoming requests.
StatusSessionLimitExceeded websocket . StatusCode = 4002
reasonSessionLimitExceeded = "limit exceeded for streaming sessions"
2023-04-06 23:00:19 +00:00
StatusIdleLimitExceeded websocket . StatusCode = 4003
reasonIdleLimitExceeded = "session was idle for too long"
2023-03-21 18:42:25 +00:00
)
type ManagementService struct {
// The management tunnel hostname
Hostname string
2023-04-19 11:41:01 +00:00
// Host details related configurations
2023-04-18 08:59:55 +00:00
serviceIP string
clientID uuid . UUID
2023-04-19 11:41:01 +00:00
label string
2023-04-18 08:59:55 +00:00
2023-03-30 21:12:00 +00:00
log * zerolog . Logger
2023-03-21 18:42:25 +00:00
router chi . Router
2023-04-04 22:45:32 +00:00
// streaming signifies if the service is already streaming logs. Helps limit the number of active users streaming logs
// from this cloudflared instance.
streaming atomic . Bool
// streamingMut is a lock to prevent concurrent requests to start streaming. Utilizing the atomic.Bool is not
// sufficient to complete this operation since many other checks during an incoming new request are needed
// to validate this before setting streaming to true.
streamingMut sync . Mutex
logger LoggerListener
2023-03-21 18:42:25 +00:00
}
2023-04-18 08:59:55 +00:00
func New ( managementHostname string ,
serviceIP string ,
clientID uuid . UUID ,
2023-04-19 11:41:01 +00:00
label string ,
2023-04-18 08:59:55 +00:00
log * zerolog . Logger ,
logger LoggerListener ,
) * ManagementService {
2023-03-30 21:12:00 +00:00
s := & ManagementService {
2023-04-18 08:59:55 +00:00
Hostname : managementHostname ,
log : log ,
logger : logger ,
serviceIP : serviceIP ,
clientID : clientID ,
2023-04-19 11:41:01 +00:00
label : label ,
2023-03-21 18:42:25 +00:00
}
2023-03-30 21:12:00 +00:00
r := chi . NewRouter ( )
r . Get ( "/ping" , ping )
r . Head ( "/ping" , ping )
2023-04-04 22:45:32 +00:00
r . Get ( "/logs" , s . logs )
2023-04-18 08:59:55 +00:00
r . Get ( "/host_details" , s . getHostDetails )
2023-03-30 21:12:00 +00:00
s . router = r
return s
2023-03-21 18:42:25 +00:00
}
func ( m * ManagementService ) ServeHTTP ( w http . ResponseWriter , r * http . Request ) {
m . router . ServeHTTP ( w , r )
}
2023-03-30 21:12:00 +00:00
// Management Ping handler
2023-03-21 18:42:25 +00:00
func ping ( w http . ResponseWriter , r * http . Request ) {
w . WriteHeader ( 200 )
}
2023-04-04 22:45:32 +00:00
2023-04-18 08:59:55 +00:00
// The response provided by the /host_details endpoint
type getHostDetailsResponse struct {
ClientID string ` json:"connector_id" `
IP string ` json:"ip,omitempty" `
HostName string ` json:"hostname,omitempty" `
}
func ( m * ManagementService ) getHostDetails ( w http . ResponseWriter , r * http . Request ) {
var getHostDetailsResponse = getHostDetailsResponse {
ClientID : m . clientID . String ( ) ,
}
if ip , err := getPrivateIP ( m . serviceIP ) ; err == nil {
getHostDetailsResponse . IP = ip
}
2023-04-19 11:41:01 +00:00
getHostDetailsResponse . HostName = m . getLabel ( )
2023-04-18 08:59:55 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
w . WriteHeader ( 200 )
json . NewEncoder ( w ) . Encode ( getHostDetailsResponse )
}
2023-04-19 11:41:01 +00:00
func ( m * ManagementService ) getLabel ( ) string {
if m . label != "" {
return fmt . Sprintf ( "custom:%s" , m . label )
}
// If no label is provided we return the system hostname. This is not
// a fqdn hostname.
hostname , err := os . Hostname ( )
if err != nil {
return "unknown"
}
return hostname
}
2023-04-18 08:59:55 +00:00
// Get preferred private ip of this machine
func getPrivateIP ( addr string ) ( string , error ) {
conn , err := net . DialTimeout ( "tcp" , addr , 1 * time . Second )
if err != nil {
return "" , err
}
defer conn . Close ( )
localAddr := conn . LocalAddr ( ) . String ( )
host , _ , err := net . SplitHostPort ( localAddr )
return host , err
}
2023-04-04 22:45:32 +00:00
// readEvents will loop through all incoming websocket messages from a client and marshal them into the
// proper Event structure and pass through to the events channel. Any invalid messages sent will automatically
// terminate the connection.
func ( m * ManagementService ) readEvents ( c * websocket . Conn , ctx context . Context , events chan <- * ClientEvent ) {
for {
event , err := ReadClientEvent ( c , ctx )
select {
case <- ctx . Done ( ) :
return
default :
if err != nil {
// If the client (or the server) already closed the connection, don't attempt to close it again
if ! IsClosed ( err , m . log ) {
m . log . Err ( err ) . Send ( )
m . log . Err ( c . Close ( websocket . StatusUnsupportedData , err . Error ( ) ) ) . Send ( )
}
// Any errors when reading the messages from the client will close the connection
return
}
events <- event
}
}
}
// streamLogs will begin the process of reading from the Session listener and write the log events to the client.
func ( m * ManagementService ) streamLogs ( c * websocket . Conn , ctx context . Context , session * Session ) {
defer m . logger . Close ( session )
for m . streaming . Load ( ) {
select {
case <- ctx . Done ( ) :
m . streaming . Store ( false )
return
case event := <- session . listener :
err := WriteEvent ( c , ctx , & EventLog {
ServerEvent : ServerEvent { Type : Logs } ,
2023-04-06 18:30:42 +00:00
Logs : [ ] * Log { event } ,
2023-04-04 22:45:32 +00:00
} )
if err != nil {
// If the client (or the server) already closed the connection, don't attempt to close it again
if ! IsClosed ( err , m . log ) {
m . log . Err ( err ) . Send ( )
m . log . Err ( c . Close ( websocket . StatusInternalError , err . Error ( ) ) ) . Send ( )
}
// Any errors when writing the messages to the client will stop streaming and close the connection
m . streaming . Store ( false )
return
}
default :
// No messages to send
}
}
}
// startStreaming will check the conditions of the request and begin streaming or close the connection for invalid
// requests.
func ( m * ManagementService ) startStreaming ( c * websocket . Conn , ctx context . Context , event * ClientEvent ) {
m . streamingMut . Lock ( )
defer m . streamingMut . Unlock ( )
// Limits to one user for streaming logs
if m . streaming . Load ( ) {
m . log . Warn ( ) .
Msgf ( "Another management session request was attempted but one session already being served; there is a limit of streaming log sessions to reduce overall performance impact." )
m . log . Err ( c . Close ( StatusSessionLimitExceeded , reasonSessionLimitExceeded ) ) . Send ( )
return
}
// Expect the first incoming request
2023-04-11 16:54:28 +00:00
startEvent , ok := IntoClientEvent [ EventStartStreaming ] ( event , StartStreaming )
2023-04-04 22:45:32 +00:00
if ! ok {
2023-04-11 16:54:28 +00:00
m . log . Warn ( ) . Err ( c . Close ( StatusInvalidCommand , reasonInvalidCommand ) ) . Msgf ( "expected start_streaming as first recieved event" )
2023-04-04 22:45:32 +00:00
return
}
m . streaming . Store ( true )
2023-04-11 16:54:28 +00:00
listener := m . logger . Listen ( startEvent . Filters )
2023-04-04 22:45:32 +00:00
m . log . Debug ( ) . Msgf ( "Streaming logs" )
go m . streamLogs ( c , ctx , listener )
}
// Management Streaming Logs accept handler
func ( m * ManagementService ) logs ( w http . ResponseWriter , r * http . Request ) {
c , err := websocket . Accept ( w , r , nil )
if err != nil {
m . log . Debug ( ) . Msgf ( "management handshake: %s" , err . Error ( ) )
return
}
// Make sure the connection is closed if other go routines fail to close the connection after completing.
defer c . Close ( websocket . StatusInternalError , "" )
2023-04-06 23:00:19 +00:00
ctx , cancel := context . WithCancel ( r . Context ( ) )
defer cancel ( )
2023-04-04 22:45:32 +00:00
events := make ( chan * ClientEvent )
go m . readEvents ( c , ctx , events )
2023-04-06 23:00:19 +00:00
// Send a heartbeat ping to hold the connection open even if not streaming.
ping := time . NewTicker ( 15 * time . Second )
defer ping . Stop ( )
// Close the connection if no operation has occurred after the idle timeout.
idleTimeout := 5 * time . Minute
idle := time . NewTimer ( idleTimeout )
defer idle . Stop ( )
2023-04-04 22:45:32 +00:00
for {
select {
case <- ctx . Done ( ) :
m . log . Debug ( ) . Msgf ( "management logs: context cancelled" )
c . Close ( websocket . StatusNormalClosure , "context closed" )
return
case event := <- events :
switch event . Type {
case StartStreaming :
2023-04-06 23:00:19 +00:00
idle . Stop ( )
2023-04-04 22:45:32 +00:00
m . startStreaming ( c , ctx , event )
continue
case StopStreaming :
2023-04-06 23:00:19 +00:00
idle . Reset ( idleTimeout )
2023-04-04 22:45:32 +00:00
// TODO: limit StopStreaming to only halt streaming for clients that are already streaming
m . streaming . Store ( false )
case UnknownClientEventType :
fallthrough
default :
// Drop unknown events and close connection
m . log . Debug ( ) . Msgf ( "unexpected management message received: %s" , event . Type )
// If the client (or the server) already closed the connection, don't attempt to close it again
if ! IsClosed ( err , m . log ) {
m . log . Err ( err ) . Err ( c . Close ( websocket . StatusUnsupportedData , err . Error ( ) ) ) . Send ( )
}
return
}
2023-04-06 23:00:19 +00:00
case <- ping . C :
go c . Ping ( ctx )
case <- idle . C :
c . Close ( StatusIdleLimitExceeded , reasonIdleLimitExceeded )
return
2023-04-04 22:45:32 +00:00
}
}
}