diff --git a/cmd/cloudflared/tail/cmd.go b/cmd/cloudflared/tail/cmd.go index e33e9b7d..5397cdb6 100644 --- a/cmd/cloudflared/tail/cmd.go +++ b/cmd/cloudflared/tail/cmd.go @@ -187,7 +187,12 @@ func Run(c *cli.Context) error { } // Output all the logs received to stdout for _, l := range logs.Logs { - fmt.Printf("%s %s %s %s\n", l.Timestamp, l.Level, l.Event, l.Message) + fields, err := json.Marshal(l.Fields) + if err != nil { + fields = []byte("unable to parse fields") + log.Debug().Msgf("unable to parse fields from event %+v", l) + } + fmt.Printf("%s %s %s %s %s\n", l.Time, l.Level, l.Event, l.Message, fields) } case management.UnknownServerEventType: fallthrough diff --git a/management/events.go b/management/events.go index c8963752..dc29392b 100644 --- a/management/events.go +++ b/management/events.go @@ -59,7 +59,7 @@ type EventStopStreaming struct { // EventLog is the event that the server sends to the client with the log events. type EventLog struct { ServerEvent - Logs []Log `json:"logs"` + Logs []*Log `json:"logs"` } // LogEventType is the way that logging messages are able to be filtered. @@ -68,10 +68,12 @@ type EventLog struct { type LogEventType int const ( - Cloudflared LogEventType = 0 - HTTP LogEventType = 1 - TCP LogEventType = 2 - UDP LogEventType = 3 + // Cloudflared events are signficant to cloudflared operations like connection state changes. + // Cloudflared is also the default event type for any events that haven't been separated into a proper event type. + Cloudflared LogEventType = iota + HTTP + TCP + UDP ) func (l LogEventType) String() string { @@ -101,12 +103,26 @@ const ( Error LogLevel = "error" ) +const ( + // TimeKey aligns with the zerolog.TimeFieldName + TimeKey = "time" + // LevelKey aligns with the zerolog.LevelFieldName + LevelKey = "level" + // LevelKey aligns with the zerolog.MessageFieldName + MessageKey = "message" + // EventTypeKey is the custom JSON key of the LogEventType in ZeroLogEvent + EventTypeKey = "event" + // FieldsKey is a custom JSON key to match and store every other key for a zerolog event + FieldsKey = "fields" +) + // Log is the basic structure of the events that are sent to the client. type Log struct { - Event LogEventType `json:"event"` - Timestamp string `json:"timestamp"` - Level LogLevel `json:"level"` - Message string `json:"message"` + Time string `json:"time,omitempty"` + Level LogLevel `json:"level,omitempty"` + Message string `json:"message,omitempty"` + Event LogEventType `json:"event,omitempty"` + Fields map[string]interface{} `json:"fields,omitempty"` } // IntoClientEvent unmarshals the provided ClientEvent into the proper type. diff --git a/management/events_test.go b/management/events_test.go index 249ed009..8cead137 100644 --- a/management/events_test.go +++ b/management/events_test.go @@ -62,12 +62,12 @@ func TestIntoServerEvent_Invalid(t *testing.T) { func TestReadServerEvent(t *testing.T) { sentEvent := EventLog{ ServerEvent: ServerEvent{Type: Logs}, - Logs: []Log{ + Logs: []*Log{ { - Timestamp: time.Now().UTC().Format(time.RFC3339), - Event: HTTP, - Level: Info, - Message: "test", + Time: time.Now().UTC().Format(time.RFC3339), + Event: HTTP, + Level: Info, + Message: "test", }, }, } diff --git a/management/logger.go b/management/logger.go index 51e5a2b8..d461cc7b 100644 --- a/management/logger.go +++ b/management/logger.go @@ -18,14 +18,6 @@ const ( logWindow = 30 ) -// ZeroLogEvent is the json structure that zerolog stores it's events as -type ZeroLogEvent struct { - Time string `json:"time,omitempty"` - Level LogLevel `json:"level,omitempty"` - Type LogEventType `json:"type,omitempty"` - Message string `json:"message,omitempty"` -} - // Logger manages the number of management streaming log sessions type Logger struct { sessions []*Session @@ -54,14 +46,14 @@ type LoggerListener interface { type Session struct { // Buffered channel that holds the recent log events - listener chan *ZeroLogEvent + listener chan *Log // Types of log events that this session will provide through the listener filters []LogEventType } func newListener(size int) *Session { return &Session{ - listener: make(chan *ZeroLogEvent, size), + listener: make(chan *Log, size), filters: []LogEventType{}, } } @@ -104,12 +96,10 @@ func (l *Logger) Write(p []byte) (int, error) { if len(l.sessions) == 0 { return len(p), nil } - var event ZeroLogEvent - iter := json.BorrowIterator(p) - defer json.ReturnIterator(iter) - iter.ReadVal(&event) - if iter.Error != nil { - l.Log.Debug().Msg("unable to unmarshal log event") + event, err := parseZerologEvent(p) + // drop event if unable to parse properly + if err != nil { + l.Log.Debug().Msg("unable to parse log event") return len(p), nil } for _, listener := range l.sessions { @@ -118,7 +108,7 @@ func (l *Logger) Write(p []byte) (int, error) { valid := false // make sure listener is subscribed to this event type for _, t := range listener.filters { - if t == event.Type { + if t == event.Event { valid = true break } @@ -129,7 +119,7 @@ func (l *Logger) Write(p []byte) (int, error) { } select { - case listener.listener <- &event: + case listener.listener <- event: default: // buffer is full, discard } @@ -140,3 +130,54 @@ func (l *Logger) Write(p []byte) (int, error) { func (l *Logger) WriteLevel(level zerolog.Level, p []byte) (n int, err error) { return l.Write(p) } + +func parseZerologEvent(p []byte) (*Log, error) { + var fields map[string]interface{} + iter := json.BorrowIterator(p) + defer json.ReturnIterator(iter) + iter.ReadVal(&fields) + if iter.Error != nil { + return nil, iter.Error + } + logTime := time.Now().UTC().Format(zerolog.TimeFieldFormat) + if t, ok := fields[TimeKey]; ok { + if t, ok := t.(string); ok { + logTime = t + } + } + logLevel := Debug + if level, ok := fields[LevelKey]; ok { + if level, ok := level.(string); ok { + logLevel = LogLevel(level) + } + } + // Assume the event type is Cloudflared if unable to parse/find. This could be from log events that haven't + // yet been tagged with the appropriate EventType yet. + logEvent := Cloudflared + e := fields[EventTypeKey] + if e != nil { + if eventNumber, ok := e.(float64); ok { + logEvent = LogEventType(eventNumber) + } + } + logMessage := "" + if m, ok := fields[MessageKey]; ok { + if m, ok := m.(string); ok { + logMessage = m + } + } + event := Log{ + Time: logTime, + Level: logLevel, + Event: logEvent, + Message: logMessage, + } + // Remove the keys that have top level keys on Log + delete(fields, TimeKey) + delete(fields, LevelKey) + delete(fields, EventTypeKey) + delete(fields, MessageKey) + // The rest of the keys go into the Fields + event.Fields = fields + return &event, nil +} diff --git a/management/logger_test.go b/management/logger_test.go index 2ba85364..640d245f 100644 --- a/management/logger_test.go +++ b/management/logger_test.go @@ -5,6 +5,7 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // No listening sessions will not write to the channel @@ -22,12 +23,13 @@ func TestLoggerWrite_OneSession(t *testing.T) { session := logger.Listen() defer logger.Close(session) - zlog.Info().Int("type", int(HTTP)).Msg("hello") + zlog.Info().Int(EventTypeKey, int(HTTP)).Msg("hello") select { case event := <-session.listener: + assert.NotEmpty(t, event.Time) assert.Equal(t, "hello", event.Message) - assert.Equal(t, LogLevel("info"), event.Level) - assert.Equal(t, HTTP, event.Type) + assert.Equal(t, Info, event.Level) + assert.Equal(t, HTTP, event.Event) default: assert.Fail(t, "expected an event to be in the listener") } @@ -41,13 +43,14 @@ func TestLoggerWrite_MultipleSessions(t *testing.T) { session1 := logger.Listen() defer logger.Close(session1) session2 := logger.Listen() - zlog.Info().Int("type", int(HTTP)).Msg("hello") + zlog.Info().Int(EventTypeKey, int(HTTP)).Msg("hello") for _, session := range []*Session{session1, session2} { select { case event := <-session.listener: + assert.NotEmpty(t, event.Time) assert.Equal(t, "hello", event.Message) - assert.Equal(t, LogLevel("info"), event.Level) - assert.Equal(t, HTTP, event.Type) + assert.Equal(t, Info, event.Level) + assert.Equal(t, HTTP, event.Event) default: assert.Fail(t, "expected an event to be in the listener") } @@ -55,12 +58,13 @@ func TestLoggerWrite_MultipleSessions(t *testing.T) { // Close session2 and make sure session1 still receives events logger.Close(session2) - zlog.Info().Int("type", int(HTTP)).Msg("hello2") + zlog.Info().Int(EventTypeKey, int(HTTP)).Msg("hello2") select { case event := <-session1.listener: + assert.NotEmpty(t, event.Time) assert.Equal(t, "hello2", event.Message) - assert.Equal(t, LogLevel("info"), event.Level) - assert.Equal(t, HTTP, event.Type) + assert.Equal(t, Info, event.Level) + assert.Equal(t, HTTP, event.Event) default: assert.Fail(t, "expected an event to be in the listener") } @@ -73,3 +77,62 @@ func TestLoggerWrite_MultipleSessions(t *testing.T) { // pass } } + +type mockWriter struct { + event *Log + err error +} + +func (m *mockWriter) Write(p []byte) (int, error) { + m.event, m.err = parseZerologEvent(p) + return len(p), nil +} + +// Validate all event types are set properly +func TestParseZerologEvent_EventTypes(t *testing.T) { + writer := mockWriter{} + zlog := zerolog.New(&writer).With().Timestamp().Logger().Level(zerolog.InfoLevel) + + for _, test := range []LogEventType{ + Cloudflared, + HTTP, + TCP, + UDP, + } { + t.Run(test.String(), func(t *testing.T) { + defer func() { writer.err = nil }() + zlog.Info().Int(EventTypeKey, int(test)).Msg("test") + require.NoError(t, writer.err) + require.Equal(t, test, writer.event.Event) + }) + } + + // Invalid defaults to Cloudflared LogEventType + t.Run("Invalid", func(t *testing.T) { + defer func() { writer.err = nil }() + zlog.Info().Str(EventTypeKey, "unknown").Msg("test") + require.NoError(t, writer.err) + require.Equal(t, Cloudflared, writer.event.Event) + }) +} + +// Validate top-level keys are removed from Fields +func TestParseZerologEvent_Fields(t *testing.T) { + writer := mockWriter{} + zlog := zerolog.New(&writer).With().Timestamp().Logger().Level(zerolog.InfoLevel) + zlog.Info().Int(EventTypeKey, int(Cloudflared)).Str("test", "test").Msg("test message") + require.NoError(t, writer.err) + event := writer.event + require.NotEmpty(t, event.Time) + require.Equal(t, Cloudflared, event.Event) + require.Equal(t, Info, event.Level) + require.Equal(t, "test message", event.Message) + + // Make sure Fields doesn't have other set keys used in the Log struct + require.NotEmpty(t, event.Fields) + require.Equal(t, "test", event.Fields["test"]) + require.NotContains(t, event.Fields, EventTypeKey) + require.NotContains(t, event.Fields, LevelKey) + require.NotContains(t, event.Fields, MessageKey) + require.NotContains(t, event.Fields, TimeKey) +} diff --git a/management/service.go b/management/service.go index 041bd8e7..0514178a 100644 --- a/management/service.go +++ b/management/service.go @@ -96,12 +96,7 @@ func (m *ManagementService) streamLogs(c *websocket.Conn, ctx context.Context, s case event := <-session.listener: err := WriteEvent(c, ctx, &EventLog{ ServerEvent: ServerEvent{Type: Logs}, - Logs: []Log{{ - Event: Cloudflared, - Timestamp: event.Time, - Level: event.Level, - Message: event.Message, - }}, + Logs: []*Log{event}, }) if err != nil { // If the client (or the server) already closed the connection, don't attempt to close it again diff --git a/proxy/proxy.go b/proxy/proxy.go index 61798742..194e97af 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -16,6 +16,7 @@ import ( "github.com/cloudflare/cloudflared/cfio" "github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/ingress" + "github.com/cloudflare/cloudflared/management" "github.com/cloudflare/cloudflared/stream" "github.com/cloudflare/cloudflared/tracing" tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" @@ -25,6 +26,7 @@ const ( // TagHeaderNamePrefix indicates a Cloudflared Warp Tag prefix that gets appended for warp traffic stream headers. TagHeaderNamePrefix = "Cf-Warp-Tag-" LogFieldCFRay = "cfRay" + LogFieldLBProbe = "lbProbe" LogFieldRule = "ingressRule" LogFieldOriginService = "originService" LogFieldFlowID = "flowID" @@ -339,7 +341,7 @@ func (p *Proxy) appendTagHeaders(r *http.Request) { type logFields struct { cfRay string lbProbe bool - rule interface{} + rule int flowID string connIndex uint8 } @@ -353,45 +355,41 @@ func copyTrailers(w connection.ResponseWriter, response *http.Response) { } func (p *Proxy) logRequest(r *http.Request, fields logFields) { + log := p.log.With().Int(management.EventTypeKey, int(management.HTTP)).Logger() + event := log.Debug() if fields.cfRay != "" { - p.log.Debug().Msgf("CF-RAY: %s %s %s %s", fields.cfRay, r.Method, r.URL, r.Proto) - } else if fields.lbProbe { - p.log.Debug().Msgf("CF-RAY: %s Load Balancer health check %s %s %s", fields.cfRay, r.Method, r.URL, r.Proto) - } else { - p.log.Debug().Msgf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", r.Method, r.URL, r.Proto) + event = event.Str(LogFieldCFRay, fields.cfRay) } - p.log.Debug(). - Str("CF-RAY", fields.cfRay). - Str("Header", fmt.Sprintf("%+v", r.Header)). + if fields.lbProbe { + event = event.Bool(LogFieldLBProbe, fields.lbProbe) + } + if fields.cfRay == "" && !fields.lbProbe { + log.Debug().Msgf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", r.Method, r.URL, r.Proto) + } + event. + Uint8(LogFieldConnIndex, fields.connIndex). Str("host", r.Host). Str("path", r.URL.Path). - Interface("rule", fields.rule). - Uint8(LogFieldConnIndex, fields.connIndex). - Msg("Inbound request") - - if contentLen := r.ContentLength; contentLen == -1 { - p.log.Debug().Msgf("CF-RAY: %s Request Content length unknown", fields.cfRay) - } else { - p.log.Debug().Msgf("CF-RAY: %s Request content length %d", fields.cfRay, contentLen) - } + Interface(LogFieldRule, fields.rule). + Interface("headers", r.Header). + Int64("content-length", r.ContentLength). + Msgf("%s %s %s", r.Method, r.URL, r.Proto) } func (p *Proxy) logOriginResponse(resp *http.Response, fields logFields) { responseByCode.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc() + event := p.log.Debug() if fields.cfRay != "" { - p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Status: %s served by ingress %d", fields.cfRay, resp.Status, fields.rule) - } else if fields.lbProbe { - p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("Response to Load Balancer health check %s", resp.Status) - } else { - p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("Status: %s served by ingress %v", resp.Status, fields.rule) + event = event.Str(LogFieldCFRay, fields.cfRay) } - p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Response Headers %+v", fields.cfRay, resp.Header) - - if contentLen := resp.ContentLength; contentLen == -1 { - p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Response content length unknown", fields.cfRay) - } else { - p.log.Debug().Uint8(LogFieldConnIndex, fields.connIndex).Msgf("CF-RAY: %s Response content length %d", fields.cfRay, contentLen) + if fields.lbProbe { + event = event.Bool(LogFieldLBProbe, fields.lbProbe) } + event. + Int(management.EventTypeKey, int(management.HTTP)). + Uint8(LogFieldConnIndex, fields.connIndex). + Int64("content-length", resp.ContentLength). + Msgf("%s", resp.Status) } func (p *Proxy) logRequestError(err error, cfRay string, flowID string, rule, service string) { @@ -401,7 +399,9 @@ func (p *Proxy) logRequestError(err error, cfRay string, flowID string, rule, se log = log.Str(LogFieldCFRay, cfRay) } if flowID != "" { - log = log.Str(LogFieldFlowID, flowID) + log = log.Str(LogFieldFlowID, flowID).Int(management.EventTypeKey, int(management.TCP)) + } else { + log = log.Int(management.EventTypeKey, int(management.HTTP)) } if rule != "" { log = log.Str(LogFieldRule, rule) @@ -409,7 +409,7 @@ func (p *Proxy) logRequestError(err error, cfRay string, flowID string, rule, se if service != "" { log = log.Str(LogFieldOriginService, service) } - log.Msg("") + log.Send() } func getDestFromRule(rule *ingress.Rule, req *http.Request) (string, error) {