2023-04-05 17:20:53 +00:00
package tail
import (
"encoding/json"
2023-04-12 16:43:38 +00:00
"errors"
2023-04-05 17:20:53 +00:00
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
"time"
2023-04-12 16:43:38 +00:00
"github.com/google/uuid"
2023-04-05 17:20:53 +00:00
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/urfave/cli/v2"
"nhooyr.io/websocket"
2023-04-12 16:43:38 +00:00
"github.com/cloudflare/cloudflared/cmd/cloudflared/cliutil"
"github.com/cloudflare/cloudflared/credentials"
2023-04-05 17:20:53 +00:00
"github.com/cloudflare/cloudflared/logger"
"github.com/cloudflare/cloudflared/management"
)
var (
2023-04-12 16:43:38 +00:00
buildInfo * cliutil . BuildInfo
2023-04-05 17:20:53 +00:00
)
2023-04-12 16:43:38 +00:00
func Init ( bi * cliutil . BuildInfo ) {
buildInfo = bi
2023-04-05 17:20:53 +00:00
}
func Command ( ) * cli . Command {
2023-04-18 08:59:55 +00:00
subcommands := [ ] * cli . Command {
buildTailManagementTokenSubcommand ( ) ,
}
return buildTailCommand ( subcommands )
}
// buildTailManagementTokenSubcommand describes the hidden "token" subcommand,
// whose action is managementTokenCommand wrapped by cliutil.ConfiguredAction.
func buildTailManagementTokenSubcommand() *cli.Command {
	tokenCmd := new(cli.Command)
	tokenCmd.Name = "token"
	tokenCmd.Action = cliutil.ConfiguredAction(managementTokenCommand)
	tokenCmd.Usage = "Get management access jwt"
	tokenCmd.UsageText = "cloudflared tail token TUNNEL_ID"
	tokenCmd.Description = ` Get management access jwt for a tunnel `
	// Not listed in help output.
	tokenCmd.Hidden = true
	return tokenCmd
}
// managementTokenCommand acquires a management token for the tunnel named on
// the command line and writes it to stdout as a JSON object of the form
// {"token": "..."}. Any error from acquiring or encoding the token is returned.
func managementTokenCommand(c *cli.Context) error {
	token, err := getManagementToken(c, createLogger(c))
	if err != nil {
		return err
	}

	payload := struct {
		Token string ` json:"token" `
	}{
		Token: token,
	}
	encoder := json.NewEncoder(os.Stdout)
	return encoder.Encode(payload)
}
func buildTailCommand ( subcommands [ ] * cli . Command ) * cli . Command {
2023-04-05 17:20:53 +00:00
return & cli . Command {
2023-04-12 16:43:38 +00:00
Name : "tail" ,
Action : Run ,
Usage : "Stream logs from a remote cloudflared" ,
UsageText : "cloudflared tail [tail command options] [TUNNEL-ID]" ,
2023-04-05 17:20:53 +00:00
Flags : [ ] cli . Flag {
& cli . StringFlag {
Name : "connector-id" ,
Usage : "Access a specific cloudflared instance by connector id (for when a tunnel has multiple cloudflared's)" ,
Value : "" ,
EnvVars : [ ] string { "TUNNEL_MANAGEMENT_CONNECTOR" } ,
} ,
2023-04-11 16:54:28 +00:00
& cli . StringSliceFlag {
Name : "event" ,
Usage : "Filter by specific Events (cloudflared, http, tcp, udp) otherwise, defaults to send all events" ,
EnvVars : [ ] string { "TUNNEL_MANAGEMENT_FILTER_EVENTS" } ,
} ,
& cli . StringFlag {
Name : "level" ,
2023-04-24 16:39:26 +00:00
Usage : "Filter by specific log levels (debug, info, warn, error). Filters by debug log level by default." ,
2023-04-11 16:54:28 +00:00
EnvVars : [ ] string { "TUNNEL_MANAGEMENT_FILTER_LEVEL" } ,
Value : "debug" ,
} ,
2023-04-24 16:39:26 +00:00
& cli . Float64Flag {
Name : "sample" ,
Usage : "Sample log events by percentage (0.0 .. 1.0). No sampling by default." ,
EnvVars : [ ] string { "TUNNEL_MANAGEMENT_FILTER_SAMPLE" } ,
Value : 1.0 ,
} ,
2023-04-05 17:20:53 +00:00
& cli . StringFlag {
Name : "token" ,
Usage : "Access token for a specific tunnel" ,
Value : "" ,
EnvVars : [ ] string { "TUNNEL_MANAGEMENT_TOKEN" } ,
} ,
& cli . StringFlag {
Name : "management-hostname" ,
Usage : "Management hostname to signify incoming management requests" ,
EnvVars : [ ] string { "TUNNEL_MANAGEMENT_HOSTNAME" } ,
Hidden : true ,
Value : "management.argotunnel.com" ,
} ,
& cli . StringFlag {
Name : "trace" ,
Usage : "Set a cf-trace-id for the request" ,
Hidden : true ,
Value : "" ,
} ,
& cli . StringFlag {
Name : logger . LogLevelFlag ,
Value : "info" ,
2023-04-11 16:54:28 +00:00
Usage : "Application logging level {debug, info, warn, error, fatal}" ,
2023-04-05 17:20:53 +00:00
EnvVars : [ ] string { "TUNNEL_LOGLEVEL" } ,
} ,
2023-04-12 16:43:38 +00:00
& cli . StringFlag {
Name : credentials . OriginCertFlag ,
Usage : "Path to the certificate generated for your origin when you run cloudflared login." ,
EnvVars : [ ] string { "TUNNEL_ORIGIN_CERT" } ,
Value : credentials . FindDefaultOriginCertPath ( ) ,
} ,
2023-04-05 17:20:53 +00:00
} ,
2023-04-18 08:59:55 +00:00
Subcommands : subcommands ,
2023-04-05 17:20:53 +00:00
}
}
// managementError is a single validation error entry (numeric code plus
// human-readable message) as carried in a managementErrorResponse body.
type managementError struct {
	Code    int    ` json:"code,omitempty" `
	Message string ` json:"message,omitempty" `
}
// managementErrorResponse is the JSON body decoded from an HTTP response when
// the management request fails validation: a success flag and the list of
// reported errors.
type managementErrorResponse struct {
	Success bool              ` json:"success,omitempty" `
	Errors  []managementError ` json:"errors,omitempty" `
}
// handleValidationError logs why the management endpoint refused to upgrade
// the request to a WebSocket. It reports a 530 status specifically, then tries
// to decode the body as a managementErrorResponse and logs every error entry
// it contains. Nothing is returned; all output goes to log.
func handleValidationError(resp *http.Response, log *zerolog.Logger) {
	if resp.StatusCode == 530 {
		log.Error().Msgf("no cloudflared connector available or reachable via management request (a recent version of cloudflared is required to use streaming logs)")
	}

	var body managementErrorResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&body); decodeErr != nil {
		// Body is not the expected JSON; the status code is all we can report.
		log.Error().Msgf("unable to start management log streaming session: http response code returned %d", resp.StatusCode)
		return
	}

	// A "successful" or error-free body is inconsistent with a failed upgrade.
	if body.Success || len(body.Errors) == 0 {
		log.Error().Msgf("management tunnel validation returned success with invalid HTTP response code to convert to a WebSocket request")
		return
	}

	for i := range body.Errors {
		entry := body.Errors[i]
		log.Error().Msgf("management request failed validation: (%d) %s", entry.Code, entry.Message)
	}
}
// createLogger builds a console logger that writes to os.Stderr, keeping
// stdout free for the output of management requests. The level comes from the
// log level flag; if it cannot be parsed the logger falls back to info.
func createLogger(c *cli.Context) *zerolog.Logger {
	lvl, parseErr := zerolog.ParseLevel(c.String(logger.LogLevelFlag))
	if parseErr != nil {
		lvl = zerolog.InfoLevel
	}

	console := zerolog.ConsoleWriter{
		Out:        colorable.NewColorable(os.Stderr),
		TimeFormat: time.RFC3339,
	}
	stderrLog := zerolog.New(console).With().Timestamp().Logger().Level(lvl)
	return &stderrLog
}
2023-04-11 16:54:28 +00:00
// parseFilters will attempt to parse provided filters to send to with the EventStartStreaming
func parseFilters ( c * cli . Context ) ( * management . StreamingFilters , error ) {
var level * management . LogLevel
var events [ ] management . LogEventType
2023-04-24 16:39:26 +00:00
var sample float64
2023-04-11 16:54:28 +00:00
argLevel := c . String ( "level" )
argEvents := c . StringSlice ( "event" )
2023-04-24 16:39:26 +00:00
argSample := c . Float64 ( "sample" )
2023-04-11 16:54:28 +00:00
if argLevel != "" {
l , ok := management . ParseLogLevel ( argLevel )
if ! ok {
return nil , fmt . Errorf ( "invalid --level filter provided, please use one of the following Log Levels: debug, info, warn, error" )
}
level = & l
}
for _ , v := range argEvents {
t , ok := management . ParseLogEventType ( v )
if ! ok {
return nil , fmt . Errorf ( "invalid --event filter provided, please use one of the following EventTypes: cloudflared, http, tcp, udp" )
}
events = append ( events , t )
}
2023-04-24 16:39:26 +00:00
if argSample <= 0.0 || argSample > 1.0 {
return nil , fmt . Errorf ( "invalid --sample value provided, please make sure it is in the range (0.0 .. 1.0)" )
}
sample = argSample
if level == nil && len ( events ) == 0 && argSample != 1.0 {
2023-04-11 16:54:28 +00:00
// When no filters are provided, do not return a StreamingFilters struct
return nil , nil
}
return & management . StreamingFilters {
2023-04-24 16:39:26 +00:00
Level : level ,
Events : events ,
Sampling : sample ,
2023-04-11 16:54:28 +00:00
} , nil
}
2023-04-12 16:43:38 +00:00
// getManagementToken requests a management token for the tunnel whose ID is
// the first positional argument. It reads the user credentials from the origin
// certificate flag, builds an API client from them, validates the tunnel ID as
// a UUID and then asks the client for the token. On any failure it returns an
// empty token and the error.
func getManagementToken(c *cli.Context, log *zerolog.Logger) (string, error) {
	certPath := c.String(credentials.OriginCertFlag)
	creds, err := credentials.Read(certPath, log)
	if err != nil {
		return "", err
	}

	apiClient, err := creds.Client(c.String("api-url"), buildInfo.UserAgent(), log)
	if err != nil {
		return "", err
	}

	rawTunnelID := c.Args().First()
	if len(rawTunnelID) == 0 {
		return "", errors.New("no tunnel ID provided")
	}

	tunnelID, parseErr := uuid.Parse(rawTunnelID)
	if parseErr != nil {
		return "", errors.New("unable to parse provided tunnel id as a valid UUID")
	}

	managementToken, err := apiClient.GetManagementToken(tunnelID)
	if err != nil {
		return "", err
	}
	return managementToken, nil
}
// buildURL constructs the wss://<management-hostname>/logs URL used to open
// the log streaming session, with the query parameters that authenticate and
// target the request.
//
// The access token is taken from the --token flag; when that is empty a token
// is acquired through getManagementToken. If --connector-id is set it must be
// a valid UUID and is added as the connector_id parameter. On failure a zero
// url.URL and a wrapped error are returned.
func buildURL(c *cli.Context, log *zerolog.Logger) (url.URL, error) {
	managementHostname := c.String("management-hostname")

	token := c.String("token")
	if token == "" {
		var err error
		token, err = getManagementToken(c, log)
		if err != nil {
			return url.URL{}, fmt.Errorf("unable to acquire management token for requested tunnel id: %w", err)
		}
	}

	query := url.Values{}
	query.Add("access_token", token)

	if connector := c.String("connector-id"); connector != "" {
		connectorID, err := uuid.Parse(connector)
		if err != nil {
			return url.URL{}, fmt.Errorf("unable to parse 'connector-id' flag into a valid UUID: %w", err)
		}
		// Use the parsed value's String form rather than the raw flag text.
		query.Add("connector_id", connectorID.String())
	}

	return url.URL{
		Scheme:   "wss",
		Host:     managementHostname,
		Path:     "/logs",
		RawQuery: query.Encode(),
	}, nil
}
2023-04-05 17:20:53 +00:00
// Run implements a foreground runner
func Run ( c * cli . Context ) error {
log := createLogger ( c )
signals := make ( chan os . Signal , 10 )
signal . Notify ( signals , syscall . SIGTERM , syscall . SIGINT )
defer signal . Stop ( signals )
2023-04-11 16:54:28 +00:00
filters , err := parseFilters ( c )
if err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "invalid filters provided" )
return nil
}
2023-04-12 16:43:38 +00:00
u , err := buildURL ( c , log )
if err != nil {
log . Err ( err ) . Msg ( "unable to construct management request URL" )
return nil
}
2023-04-05 17:20:53 +00:00
header := make ( http . Header )
2023-04-12 16:43:38 +00:00
header . Add ( "User-Agent" , buildInfo . UserAgent ( ) )
2023-04-05 17:20:53 +00:00
trace := c . String ( "trace" )
if trace != "" {
header [ "cf-trace-id" ] = [ ] string { trace }
}
ctx := c . Context
conn , resp , err := websocket . Dial ( ctx , u . String ( ) , & websocket . DialOptions {
HTTPHeader : header ,
} )
if err != nil {
if resp != nil && resp . StatusCode != http . StatusSwitchingProtocols {
handleValidationError ( resp , log )
return nil
}
log . Error ( ) . Err ( err ) . Msgf ( "unable to start management log streaming session" )
return nil
}
defer conn . Close ( websocket . StatusInternalError , "management connection was closed abruptly" )
// Once connection is established, send start_streaming event to begin receiving logs
err = management . WriteEvent ( conn , ctx , & management . EventStartStreaming {
ClientEvent : management . ClientEvent { Type : management . StartStreaming } ,
2023-04-11 16:54:28 +00:00
Filters : filters ,
2023-04-05 17:20:53 +00:00
} )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "unable to request logs from management tunnel" )
return nil
}
2023-04-12 16:43:38 +00:00
log . Debug ( ) .
Str ( "tunnel-id" , c . Args ( ) . First ( ) ) .
Str ( "connector-id" , c . String ( "connector-id" ) ) .
Interface ( "filters" , filters ) .
Msg ( "connected" )
2023-04-05 17:20:53 +00:00
readerDone := make ( chan struct { } )
go func ( ) {
defer close ( readerDone )
for {
select {
case <- ctx . Done ( ) :
return
default :
event , err := management . ReadServerEvent ( conn , ctx )
if err != nil {
if closeErr := management . AsClosed ( err ) ; closeErr != nil {
// If the client (or the server) already closed the connection, don't continue to
// attempt to read from the client.
if closeErr . Code == websocket . StatusNormalClosure {
return
}
// Only log abnormal closures
log . Error ( ) . Msgf ( "received remote closure: (%d) %s" , closeErr . Code , closeErr . Reason )
return
}
log . Err ( err ) . Msg ( "unable to read event from server" )
return
}
switch event . Type {
case management . Logs :
logs , ok := management . IntoServerEvent ( event , management . Logs )
if ! ok {
log . Error ( ) . Msgf ( "invalid logs event" )
continue
}
// Output all the logs received to stdout
for _ , l := range logs . Logs {
2023-04-06 18:30:42 +00:00
fields , err := json . Marshal ( l . Fields )
if err != nil {
fields = [ ] byte ( "unable to parse fields" )
log . Debug ( ) . Msgf ( "unable to parse fields from event %+v" , l )
}
fmt . Printf ( "%s %s %s %s %s\n" , l . Time , l . Level , l . Event , l . Message , fields )
2023-04-05 17:20:53 +00:00
}
case management . UnknownServerEventType :
fallthrough
default :
log . Debug ( ) . Msgf ( "unexpected log event type: %s" , event . Type )
}
}
}
} ( )
for {
select {
case <- ctx . Done ( ) :
return nil
case <- readerDone :
return nil
case <- signals :
log . Debug ( ) . Msg ( "closing management connection" )
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
conn . Close ( websocket . StatusNormalClosure , "" )
select {
case <- readerDone :
case <- time . After ( time . Second ) :
}
return nil
}
}
}