From 8dc0697a8f095fc54814e13ed90e8e7adb36700d Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Tue, 11 Apr 2023 09:54:28 -0700 Subject: [PATCH] TUN-7132 TUN-7136: Add filter support for streaming logs Additionally adds similar support in cloudflared tail to provide filters for events and log level. --- cmd/cloudflared/tail/cmd.go | 55 +++++++++++++++++- management/events.go | 94 ++++++++++++++++++++++++++++--- management/events_test.go | 81 +++++++++++++++++++++++++-- management/logger.go | 85 +++++++++++++++++----------- management/logger_test.go | 107 ++++++++++++++++++++++++++++++++++-- management/service.go | 6 +- 6 files changed, 376 insertions(+), 52 deletions(-) diff --git a/cmd/cloudflared/tail/cmd.go b/cmd/cloudflared/tail/cmd.go index 5397cdb6..55864e90 100644 --- a/cmd/cloudflared/tail/cmd.go +++ b/cmd/cloudflared/tail/cmd.go @@ -39,6 +39,17 @@ func Command() *cli.Command { Value: "", EnvVars: []string{"TUNNEL_MANAGEMENT_CONNECTOR"}, }, + &cli.StringSliceFlag{ + Name: "event", + Usage: "Filter by specific Events (cloudflared, http, tcp, udp) otherwise, defaults to send all events", + EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_EVENTS"}, + }, + &cli.StringFlag{ + Name: "level", + Usage: "Filter by specific log levels (debug, info, warn, error)", + EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_LEVEL"}, + Value: "debug", + }, &cli.StringFlag{ Name: "token", Usage: "Access token for a specific tunnel", @@ -61,7 +72,7 @@ func Command() *cli.Command { &cli.StringFlag{ Name: logger.LogLevelFlag, Value: "info", - Usage: "Application logging level {debug, info, warn, error, fatal}. ", + Usage: "Application logging level {debug, info, warn, error, fatal}", EnvVars: []string{"TUNNEL_LOGLEVEL"}, }, }, @@ -113,6 +124,41 @@ func createLogger(c *cli.Context) *zerolog.Logger { return &log } +// parseFilters will attempt to parse provided filters to send to with the EventStartStreaming +func parseFilters(c *cli.Context) (*management.StreamingFilters, error) { + var level *management.LogLevel + var events []management.LogEventType + + argLevel := c.String("level") + argEvents := c.StringSlice("event") + + if argLevel != "" { + l, ok := management.ParseLogLevel(argLevel) + if !ok { + return nil, fmt.Errorf("invalid --level filter provided, please use one of the following Log Levels: debug, info, warn, error") + } + level = &l + } + + for _, v := range argEvents { + t, ok := management.ParseLogEventType(v) + if !ok { + return nil, fmt.Errorf("invalid --event filter provided, please use one of the following EventTypes: cloudflared, http, tcp, udp") + } + events = append(events, t) + } + + if level == nil && len(events) == 0 { + // When no filters are provided, do not return a StreamingFilters struct + return nil, nil + } + + return &management.StreamingFilters{ + Level: level, + Events: events, + }, nil +} + // Run implements a foreground runner func Run(c *cli.Context) error { log := createLogger(c) @@ -121,6 +167,12 @@ func Run(c *cli.Context) error { signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) defer signal.Stop(signals) + filters, err := parseFilters(c) + if err != nil { + log.Error().Err(err).Msgf("invalid filters provided") + return nil + } + managementHostname := c.String("management-hostname") token := c.String("token") u := url.URL{Scheme: "wss", Host: managementHostname, Path: "/logs", RawQuery: "access_token=" + token} @@ -148,6 +200,7 @@ func Run(c *cli.Context) error { // Once connection is established, send start_streaming event to begin receiving logs err = management.WriteEvent(conn, ctx, &management.EventStartStreaming{ ClientEvent: management.ClientEvent{Type: management.StartStreaming}, + Filters: filters, }) if err != nil { log.Error().Err(err).Msg("unable to request logs from management tunnel") diff --git a/management/events.go b/management/events.go index dc29392b..b89f81f4 100644 --- a/management/events.go +++ b/management/events.go @@ -48,7 +48,12 @@ type ClientEvent struct { // Additional filters can be provided to augment the log events requested. type EventStartStreaming struct { ClientEvent - Filters []string `json:"filters"` + Filters *StreamingFilters `json:"filters,omitempty"` +} + +type StreamingFilters struct { + Events []LogEventType `json:"events,omitempty"` + Level *LogLevel `json:"level,omitempty"` } // EventStopStreaming signifies that the client wishes to halt receiving log events. @@ -65,7 +70,7 @@ type EventLog struct { // LogEventType is the way that logging messages are able to be filtered. // Example: assigning LogEventType.Cloudflared to a zerolog event will allow the client to filter for only // the Cloudflared-related events. -type LogEventType int +type LogEventType int8 const ( // Cloudflared events are signficant to cloudflared operations like connection state changes. @@ -76,6 +81,20 @@ const ( UDP ) +func ParseLogEventType(s string) (LogEventType, bool) { + switch s { + case "cloudflared": + return Cloudflared, true + case "http": + return HTTP, true + case "tcp": + return TCP, true + case "udp": + return UDP, true + } + return -1, false +} + func (l LogEventType) String() string { switch l { case Cloudflared: @@ -91,18 +110,79 @@ func (l LogEventType) String() string { } } +func (l LogEventType) MarshalJSON() ([]byte, error) { + return json.Marshal(l.String()) +} + +func (e *LogEventType) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return errors.New("unable to unmarshal LogEventType string") + } + if event, ok := ParseLogEventType(s); ok { + *e = event + return nil + } + return errors.New("unable to unmarshal LogEventType") +} + // LogLevel corresponds to the zerolog logging levels // "panic", "fatal", and "trace" are exempt from this list as they are rarely used and, at least // the the first two are limited to failure conditions that lead to cloudflared shutting down. -type LogLevel string +type LogLevel int8 const ( - Debug LogLevel = "debug" - Info LogLevel = "info" - Warn LogLevel = "warn" - Error LogLevel = "error" + Debug LogLevel = 0 + Info LogLevel = 1 + Warn LogLevel = 2 + Error LogLevel = 3 ) +func ParseLogLevel(l string) (LogLevel, bool) { + switch l { + case "debug": + return Debug, true + case "info": + return Info, true + case "warn": + return Warn, true + case "error": + return Error, true + } + return -1, false +} + +func (l LogLevel) String() string { + switch l { + case Debug: + return "debug" + case Info: + return "info" + case Warn: + return "warn" + case Error: + return "error" + default: + return "" + } +} + +func (l LogLevel) MarshalJSON() ([]byte, error) { + return json.Marshal(l.String()) +} + +func (l *LogLevel) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return errors.New("unable to unmarshal LogLevel string") + } + if level, ok := ParseLogLevel(s); ok { + *l = level + return nil + } + return fmt.Errorf("unable to unmarshal LogLevel") +} + const ( // TimeKey aligns with the zerolog.TimeFieldName TimeKey = "time" diff --git a/management/events_test.go b/management/events_test.go index 8cead137..70cd4428 100644 --- a/management/events_test.go +++ b/management/events_test.go @@ -11,14 +11,83 @@ import ( "github.com/cloudflare/cloudflared/internal/test" ) +var ( + debugLevel *LogLevel + infoLevel *LogLevel + warnLevel *LogLevel + errorLevel *LogLevel +) + +func init() { + // created here because we can't do a reference to a const enum, i.e. &Info + debugLevel := new(LogLevel) + *debugLevel = Debug + infoLevel := new(LogLevel) + *infoLevel = Info + warnLevel := new(LogLevel) + *warnLevel = Warn + errorLevel := new(LogLevel) + *errorLevel = Error +} + func TestIntoClientEvent_StartStreaming(t *testing.T) { - event := ClientEvent{ - Type: StartStreaming, - event: []byte(`{"type": "start_streaming"}`), + for _, test := range []struct { + name string + expected EventStartStreaming + }{ + { + name: "no filters", + expected: EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}}, + }, + { + name: "level filter", + expected: EventStartStreaming{ + ClientEvent: ClientEvent{Type: StartStreaming}, + Filters: &StreamingFilters{ + Level: infoLevel, + }, + }, + }, + { + name: "events filter", + expected: EventStartStreaming{ + ClientEvent: ClientEvent{Type: StartStreaming}, + Filters: &StreamingFilters{ + Events: []LogEventType{Cloudflared, HTTP}, + }, + }, + }, + { + name: "level and events filters", + expected: EventStartStreaming{ + ClientEvent: ClientEvent{Type: StartStreaming}, + Filters: &StreamingFilters{ + Level: infoLevel, + Events: []LogEventType{Cloudflared}, + }, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + data, err := json.Marshal(test.expected) + require.NoError(t, err) + event := ClientEvent{} + err = json.Unmarshal(data, &event) + require.NoError(t, err) + event.event = data + ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming) + require.True(t, ok) + require.Equal(t, test.expected.ClientEvent, ce.ClientEvent) + if test.expected.Filters != nil { + f := ce.Filters + ef := test.expected.Filters + if ef.Level != nil { + require.Equal(t, *ef.Level, *f.Level) + } + require.ElementsMatch(t, ef.Events, f.Events) + } + }) } - ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming) - require.True(t, ok) - require.Equal(t, EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}}, *ce) } func TestIntoClientEvent_StopStreaming(t *testing.T) { diff --git a/management/logger.go b/management/logger.go index d461cc7b..272d01b6 100644 --- a/management/logger.go +++ b/management/logger.go @@ -40,7 +40,7 @@ func NewLogger() *Logger { } type LoggerListener interface { - Listen() *Session + Listen(*StreamingFilters) *Session Close(*Session) } @@ -48,21 +48,55 @@ type Session struct { // Buffered channel that holds the recent log events listener chan *Log // Types of log events that this session will provide through the listener - filters []LogEventType + filters *StreamingFilters } -func newListener(size int) *Session { - return &Session{ +func newSession(size int, filters *StreamingFilters) *Session { + s := &Session{ listener: make(chan *Log, size), - filters: []LogEventType{}, + } + if filters != nil { + s.filters = filters + } else { + s.filters = &StreamingFilters{} + } + return s +} + +// Insert attempts to insert the log to the session. If the log event matches the provided session filters, it +// will be applied to the listener. +func (s *Session) Insert(log *Log) { + // Level filters are optional + if s.filters.Level != nil { + if *s.filters.Level > log.Level { + return + } + } + // Event filters are optional + if len(s.filters.Events) != 0 && !contains(s.filters.Events, log.Event) { + return + } + select { + case s.listener <- log: + default: + // buffer is full, discard } } +func contains(array []LogEventType, t LogEventType) bool { + for _, v := range array { + if v == t { + return true + } + } + return false +} + // Listen creates a new Session that will append filtered log events as they are created. -func (l *Logger) Listen() *Session { +func (l *Logger) Listen(filters *StreamingFilters) *Session { l.mu.Lock() defer l.mu.Unlock() - listener := newListener(logWindow) + listener := newSession(logWindow, filters) l.sessions = append(l.sessions, listener) return listener } @@ -102,27 +136,8 @@ func (l *Logger) Write(p []byte) (int, error) { l.Log.Debug().Msg("unable to parse log event") return len(p), nil } - for _, listener := range l.sessions { - // no filters means all types are allowed - if len(listener.filters) != 0 { - valid := false - // make sure listener is subscribed to this event type - for _, t := range listener.filters { - if t == event.Event { - valid = true - break - } - } - if !valid { - continue - } - } - - select { - case listener.listener <- event: - default: - // buffer is full, discard - } + for _, session := range l.sessions { + session.Insert(event) } return len(p), nil } @@ -146,9 +161,17 @@ func parseZerologEvent(p []byte) (*Log, error) { } } logLevel := Debug - if level, ok := fields[LevelKey]; ok { - if level, ok := level.(string); ok { - logLevel = LogLevel(level) + // A zerolog Debug event can be created and then an error can be added + // via .Err(error), if so, we upgrade the level to error. + if _, hasError := fields["error"]; hasError { + logLevel = Error + } else { + if level, ok := fields[LevelKey]; ok { + if level, ok := level.(string); ok { + if logLevel, ok = ParseLogLevel(level); !ok { + logLevel = Debug + } + } } } // Assume the event type is Cloudflared if unable to parse/find. This could be from log events that haven't diff --git a/management/logger_test.go b/management/logger_test.go index 640d245f..bd4b07e4 100644 --- a/management/logger_test.go +++ b/management/logger_test.go @@ -2,6 +2,7 @@ package management import ( "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -21,7 +22,7 @@ func TestLoggerWrite_OneSession(t *testing.T) { logger := NewLogger() zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel) - session := logger.Listen() + session := logger.Listen(nil) defer logger.Close(session) zlog.Info().Int(EventTypeKey, int(HTTP)).Msg("hello") select { @@ -40,9 +41,9 @@ func TestLoggerWrite_MultipleSessions(t *testing.T) { logger := NewLogger() zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel) - session1 := logger.Listen() + session1 := logger.Listen(nil) defer logger.Close(session1) - session2 := logger.Listen() + session2 := logger.Listen(nil) zlog.Info().Int(EventTypeKey, int(HTTP)).Msg("hello") for _, session := range []*Session{session1, session2} { select { @@ -78,6 +79,104 @@ func TestLoggerWrite_MultipleSessions(t *testing.T) { } } +// Validate that the session filters events +func TestSession_Insert(t *testing.T) { + infoLevel := new(LogLevel) + *infoLevel = Info + warnLevel := new(LogLevel) + *warnLevel = Warn + for _, test := range []struct { + name string + filters StreamingFilters + expectLog bool + }{ + { + name: "none", + expectLog: true, + }, + { + name: "level", + filters: StreamingFilters{ + Level: infoLevel, + }, + expectLog: true, + }, + { + name: "filtered out level", + filters: StreamingFilters{ + Level: warnLevel, + }, + expectLog: false, + }, + { + name: "events", + filters: StreamingFilters{ + Events: []LogEventType{HTTP}, + }, + expectLog: true, + }, + { + name: "filtered out event", + filters: StreamingFilters{ + Events: []LogEventType{Cloudflared}, + }, + expectLog: false, + }, + { + name: "filter and event", + filters: StreamingFilters{ + Level: infoLevel, + Events: []LogEventType{HTTP}, + }, + expectLog: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + session := newSession(4, &test.filters) + log := Log{ + Time: time.Now().UTC().Format(time.RFC3339), + Event: HTTP, + Level: Info, + Message: "test", + } + session.Insert(&log) + select { + case <-session.listener: + require.True(t, test.expectLog) + default: + require.False(t, test.expectLog) + } + }) + } +} + +// Validate that the session has a max amount of events to hold +func TestSession_InsertOverflow(t *testing.T) { + session := newSession(1, nil) + log := Log{ + Time: time.Now().UTC().Format(time.RFC3339), + Event: HTTP, + Level: Info, + Message: "test", + } + // Insert 2 but only max channel size for 1 + session.Insert(&log) + session.Insert(&log) + select { + case <-session.listener: + // pass + default: + require.Fail(t, "expected one log event") + } + // Second dequeue should fail + select { + case <-session.listener: + require.Fail(t, "expected no more remaining log events") + default: + // pass + } +} + type mockWriter struct { event *Log err error @@ -108,7 +207,7 @@ func TestParseZerologEvent_EventTypes(t *testing.T) { } // Invalid defaults to Cloudflared LogEventType - t.Run("Invalid", func(t *testing.T) { + t.Run("invalid", func(t *testing.T) { defer func() { writer.err = nil }() zlog.Info().Str(EventTypeKey, "unknown").Msg("test") require.NoError(t, writer.err) diff --git a/management/service.go b/management/service.go index daf5687f..7f19f46b 100644 --- a/management/service.go +++ b/management/service.go @@ -131,13 +131,13 @@ func (m *ManagementService) startStreaming(c *websocket.Conn, ctx context.Contex return } // Expect the first incoming request - _, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming) + startEvent, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming) if !ok { - m.log.Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Msgf("expected start_streaming as first recieved event") + m.log.Warn().Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Msgf("expected start_streaming as first recieved event") return } m.streaming.Store(true) - listener := m.logger.Listen() + listener := m.logger.Listen(startEvent.Filters) m.log.Debug().Msgf("Streaming logs") go m.streamLogs(c, ctx, listener) }