TUN-7132 TUN-7136: Add filter support for streaming logs

Additionally adds similar support in cloudflared tail to provide
filters for events and log level.
This commit is contained in:
Devin Carr 2023-04-11 09:54:28 -07:00
parent 5dbf76a7aa
commit 8dc0697a8f
6 changed files with 376 additions and 52 deletions

View File

@ -39,6 +39,17 @@ func Command() *cli.Command {
Value: "",
EnvVars: []string{"TUNNEL_MANAGEMENT_CONNECTOR"},
},
&cli.StringSliceFlag{
Name: "event",
Usage: "Filter by specific Events (cloudflared, http, tcp, udp) otherwise, defaults to send all events",
EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_EVENTS"},
},
&cli.StringFlag{
Name: "level",
Usage: "Filter by specific log levels (debug, info, warn, error)",
EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_LEVEL"},
Value: "debug",
},
&cli.StringFlag{
Name: "token",
Usage: "Access token for a specific tunnel",
@ -61,7 +72,7 @@ func Command() *cli.Command {
&cli.StringFlag{
Name: logger.LogLevelFlag,
Value: "info",
Usage: "Application logging level {debug, info, warn, error, fatal}. ",
Usage: "Application logging level {debug, info, warn, error, fatal}",
EnvVars: []string{"TUNNEL_LOGLEVEL"},
},
},
@ -113,6 +124,41 @@ func createLogger(c *cli.Context) *zerolog.Logger {
return &log
}
// parseFilters builds the optional StreamingFilters payload for the
// EventStartStreaming request from the --level and --event CLI flags.
// It returns (nil, nil) when neither flag supplies a filter, and a
// non-nil error when a flag value cannot be parsed.
func parseFilters(c *cli.Context) (*management.StreamingFilters, error) {
	filters := management.StreamingFilters{}
	if levelArg := c.String("level"); levelArg != "" {
		parsed, ok := management.ParseLogLevel(levelArg)
		if !ok {
			return nil, fmt.Errorf("invalid --level filter provided, please use one of the following Log Levels: debug, info, warn, error")
		}
		filters.Level = &parsed
	}
	for _, eventArg := range c.StringSlice("event") {
		eventType, ok := management.ParseLogEventType(eventArg)
		if !ok {
			return nil, fmt.Errorf("invalid --event filter provided, please use one of the following EventTypes: cloudflared, http, tcp, udp")
		}
		filters.Events = append(filters.Events, eventType)
	}
	// No filters requested: omit the struct entirely so the server-side
	// defaults apply.
	if filters.Level == nil && len(filters.Events) == 0 {
		return nil, nil
	}
	return &filters, nil
}
// Run implements a foreground runner
func Run(c *cli.Context) error {
log := createLogger(c)
@ -121,6 +167,12 @@ func Run(c *cli.Context) error {
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
defer signal.Stop(signals)
filters, err := parseFilters(c)
if err != nil {
log.Error().Err(err).Msgf("invalid filters provided")
return nil
}
managementHostname := c.String("management-hostname")
token := c.String("token")
u := url.URL{Scheme: "wss", Host: managementHostname, Path: "/logs", RawQuery: "access_token=" + token}
@ -148,6 +200,7 @@ func Run(c *cli.Context) error {
// Once connection is established, send start_streaming event to begin receiving logs
err = management.WriteEvent(conn, ctx, &management.EventStartStreaming{
ClientEvent: management.ClientEvent{Type: management.StartStreaming},
Filters: filters,
})
if err != nil {
log.Error().Err(err).Msg("unable to request logs from management tunnel")

View File

@ -48,7 +48,12 @@ type ClientEvent struct {
// Additional filters can be provided to augment the log events requested.
type EventStartStreaming struct {
ClientEvent
Filters []string `json:"filters"`
Filters *StreamingFilters `json:"filters,omitempty"`
}
// StreamingFilters are optional constraints a client sends with
// EventStartStreaming to narrow the log events it receives. A nil/empty
// field applies no filtering on that dimension (see Session.Insert).
type StreamingFilters struct {
	Events []LogEventType `json:"events,omitempty"` // only deliver these event types
	Level  *LogLevel      `json:"level,omitempty"`  // only deliver logs at or above this level
}
// EventStopStreaming signifies that the client wishes to halt receiving log events.
@ -65,7 +70,7 @@ type EventLog struct {
// LogEventType is the way that logging messages are able to be filtered.
// Example: assigning LogEventType.Cloudflared to a zerolog event will allow the client to filter for only
// the Cloudflared-related events.
type LogEventType int
type LogEventType int8
const (
// Cloudflared events are significant to cloudflared operations like connection state changes.
@ -76,6 +81,20 @@ const (
UDP
)
// ParseLogEventType resolves a string name to its LogEventType value.
// The boolean result reports whether the name was recognized; unknown
// names yield (-1, false).
func ParseLogEventType(s string) (LogEventType, bool) {
	names := map[string]LogEventType{
		"cloudflared": Cloudflared,
		"http":        HTTP,
		"tcp":         TCP,
		"udp":         UDP,
	}
	if eventType, ok := names[s]; ok {
		return eventType, true
	}
	return -1, false
}
func (l LogEventType) String() string {
switch l {
case Cloudflared:
@ -91,18 +110,79 @@ func (l LogEventType) String() string {
}
}
// MarshalJSON encodes the event type as its string name (via String())
// rather than its numeric value, so the wire format stays readable.
func (l LogEventType) MarshalJSON() ([]byte, error) {
	return json.Marshal(l.String())
}
// UnmarshalJSON decodes a JSON string (e.g. "http") into the
// corresponding LogEventType, rejecting non-string values and
// unrecognized names.
func (e *LogEventType) UnmarshalJSON(data []byte) error {
	var name string
	if err := json.Unmarshal(data, &name); err != nil {
		return errors.New("unable to unmarshal LogEventType string")
	}
	event, ok := ParseLogEventType(name)
	if !ok {
		return errors.New("unable to unmarshal LogEventType")
	}
	*e = event
	return nil
}
// LogLevel corresponds to the zerolog logging levels
// "panic", "fatal", and "trace" are exempt from this list as they are rarely used and, at least
// the first two are limited to failure conditions that lead to cloudflared shutting down.
type LogLevel string
type LogLevel int8
const (
Debug LogLevel = "debug"
Info LogLevel = "info"
Warn LogLevel = "warn"
Error LogLevel = "error"
Debug LogLevel = 0
Info LogLevel = 1
Warn LogLevel = 2
Error LogLevel = 3
)
// ParseLogLevel resolves a zerolog-style level name ("debug", "info",
// "warn", "error") to its LogLevel value. The boolean result reports
// whether the name was recognized; unknown names yield (-1, false).
func ParseLogLevel(l string) (LogLevel, bool) {
	names := map[string]LogLevel{
		"debug": Debug,
		"info":  Info,
		"warn":  Warn,
		"error": Error,
	}
	level, ok := names[l]
	if !ok {
		return -1, false
	}
	return level, true
}
// String returns the lowercase name of the level, or the empty string
// for an unknown value.
func (l LogLevel) String() string {
	names := map[LogLevel]string{
		Debug: "debug",
		Info:  "info",
		Warn:  "warn",
		Error: "error",
	}
	// A miss returns the map's zero value "", matching the old default case.
	return names[l]
}
// MarshalJSON encodes the log level as its string name (via String())
// rather than its numeric value, so the wire format stays readable.
func (l LogLevel) MarshalJSON() ([]byte, error) {
	return json.Marshal(l.String())
}
// UnmarshalJSON decodes a JSON string (e.g. "info") into the
// corresponding LogLevel, rejecting non-string values and unrecognized
// names.
func (l *LogLevel) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return errors.New("unable to unmarshal LogLevel string")
	}
	if level, ok := ParseLogLevel(s); ok {
		*l = level
		return nil
	}
	// errors.New rather than fmt.Errorf: no formatting is performed,
	// matching the error style used above and in LogEventType.UnmarshalJSON.
	return errors.New("unable to unmarshal LogLevel")
}
const (
// TimeKey aligns with the zerolog.TimeFieldName
TimeKey = "time"

View File

@ -11,14 +11,83 @@ import (
"github.com/cloudflare/cloudflared/internal/test"
)
// Pointers to each LogLevel constant, usable wherever a *LogLevel is
// needed in test fixtures (a const cannot be addressed directly, i.e. &Info).
var (
	debugLevel *LogLevel
	infoLevel  *LogLevel
	warnLevel  *LogLevel
	errorLevel *LogLevel
)

func init() {
	// created here because we can't do a reference to a const enum, i.e. &Info
	// BUG FIX: the original used ":=", which declared new locals that
	// shadowed the package-level vars above, leaving them nil and making
	// any test fixture that uses them (e.g. infoLevel) silently empty.
	// Plain "=" assigns to the package-level vars as intended.
	debugLevel = new(LogLevel)
	*debugLevel = Debug
	infoLevel = new(LogLevel)
	*infoLevel = Info
	warnLevel = new(LogLevel)
	*warnLevel = Warn
	errorLevel = new(LogLevel)
	*errorLevel = Error
}
func TestIntoClientEvent_StartStreaming(t *testing.T) {
event := ClientEvent{
Type: StartStreaming,
event: []byte(`{"type": "start_streaming"}`),
for _, test := range []struct {
name string
expected EventStartStreaming
}{
{
name: "no filters",
expected: EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}},
},
{
name: "level filter",
expected: EventStartStreaming{
ClientEvent: ClientEvent{Type: StartStreaming},
Filters: &StreamingFilters{
Level: infoLevel,
},
},
},
{
name: "events filter",
expected: EventStartStreaming{
ClientEvent: ClientEvent{Type: StartStreaming},
Filters: &StreamingFilters{
Events: []LogEventType{Cloudflared, HTTP},
},
},
},
{
name: "level and events filters",
expected: EventStartStreaming{
ClientEvent: ClientEvent{Type: StartStreaming},
Filters: &StreamingFilters{
Level: infoLevel,
Events: []LogEventType{Cloudflared},
},
},
},
} {
t.Run(test.name, func(t *testing.T) {
data, err := json.Marshal(test.expected)
require.NoError(t, err)
event := ClientEvent{}
err = json.Unmarshal(data, &event)
require.NoError(t, err)
event.event = data
ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
require.True(t, ok)
require.Equal(t, test.expected.ClientEvent, ce.ClientEvent)
if test.expected.Filters != nil {
f := ce.Filters
ef := test.expected.Filters
if ef.Level != nil {
require.Equal(t, *ef.Level, *f.Level)
}
require.ElementsMatch(t, ef.Events, f.Events)
}
})
}
ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
require.True(t, ok)
require.Equal(t, EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}}, *ce)
}
func TestIntoClientEvent_StopStreaming(t *testing.T) {

View File

@ -40,7 +40,7 @@ func NewLogger() *Logger {
}
type LoggerListener interface {
Listen() *Session
Listen(*StreamingFilters) *Session
Close(*Session)
}
@ -48,21 +48,55 @@ type Session struct {
// Buffered channel that holds the recent log events
listener chan *Log
// Types of log events that this session will provide through the listener
filters []LogEventType
filters *StreamingFilters
}
func newListener(size int) *Session {
return &Session{
func newSession(size int, filters *StreamingFilters) *Session {
s := &Session{
listener: make(chan *Log, size),
filters: []LogEventType{},
}
if filters != nil {
s.filters = filters
} else {
s.filters = &StreamingFilters{}
}
return s
}
// Insert attempts to insert the log to the session. If the log event matches the provided session filters, it
// will be applied to the listener.
func (s *Session) Insert(log *Log) {
// Level filters are optional
if s.filters.Level != nil {
if *s.filters.Level > log.Level {
return
}
}
// Event filters are optional
if len(s.filters.Events) != 0 && !contains(s.filters.Events, log.Event) {
return
}
select {
case s.listener <- log:
default:
// buffer is full, discard
}
}
// contains reports whether t appears in array.
func contains(array []LogEventType, t LogEventType) bool {
	for i := range array {
		if array[i] == t {
			return true
		}
	}
	return false
}
// Listen creates a new Session that will append filtered log events as they are created.
func (l *Logger) Listen() *Session {
func (l *Logger) Listen(filters *StreamingFilters) *Session {
	l.mu.Lock()
	defer l.mu.Unlock()
	// Each listener gets its own buffered session; newSession tolerates
	// nil filters (the session then receives every event).
	listener := newSession(logWindow, filters)
	// Registration happens under the lock so concurrent Write calls see a
	// consistent sessions slice.
	l.sessions = append(l.sessions, listener)
	return listener
}
@ -102,27 +136,8 @@ func (l *Logger) Write(p []byte) (int, error) {
l.Log.Debug().Msg("unable to parse log event")
return len(p), nil
}
for _, listener := range l.sessions {
// no filters means all types are allowed
if len(listener.filters) != 0 {
valid := false
// make sure listener is subscribed to this event type
for _, t := range listener.filters {
if t == event.Event {
valid = true
break
}
}
if !valid {
continue
}
}
select {
case listener.listener <- event:
default:
// buffer is full, discard
}
for _, session := range l.sessions {
session.Insert(event)
}
return len(p), nil
}
@ -146,9 +161,17 @@ func parseZerologEvent(p []byte) (*Log, error) {
}
}
logLevel := Debug
if level, ok := fields[LevelKey]; ok {
if level, ok := level.(string); ok {
logLevel = LogLevel(level)
// A zerolog Debug event can be created and then an error can be added
// via .Err(error), if so, we upgrade the level to error.
if _, hasError := fields["error"]; hasError {
logLevel = Error
} else {
if level, ok := fields[LevelKey]; ok {
if level, ok := level.(string); ok {
if logLevel, ok = ParseLogLevel(level); !ok {
logLevel = Debug
}
}
}
}
// Assume the event type is Cloudflared if unable to parse/find. This could be from log events that haven't

View File

@ -2,6 +2,7 @@ package management
import (
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
@ -21,7 +22,7 @@ func TestLoggerWrite_OneSession(t *testing.T) {
logger := NewLogger()
zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel)
session := logger.Listen()
session := logger.Listen(nil)
defer logger.Close(session)
zlog.Info().Int(EventTypeKey, int(HTTP)).Msg("hello")
select {
@ -40,9 +41,9 @@ func TestLoggerWrite_MultipleSessions(t *testing.T) {
logger := NewLogger()
zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel)
session1 := logger.Listen()
session1 := logger.Listen(nil)
defer logger.Close(session1)
session2 := logger.Listen()
session2 := logger.Listen(nil)
zlog.Info().Int(EventTypeKey, int(HTTP)).Msg("hello")
for _, session := range []*Session{session1, session2} {
select {
@ -78,6 +79,104 @@ func TestLoggerWrite_MultipleSessions(t *testing.T) {
}
}
// Validate that the session filters events
func TestSession_Insert(t *testing.T) {
	// Local *LogLevel fixtures (a const enum cannot be addressed directly).
	infoLevel := new(LogLevel)
	*infoLevel = Info
	warnLevel := new(LogLevel)
	*warnLevel = Warn
	// Each case inserts one Info-level HTTP event and checks whether the
	// given filters let it through to the listener channel.
	for _, test := range []struct {
		name      string
		filters   StreamingFilters
		expectLog bool
	}{
		{
			// No filters: everything is delivered.
			name:      "none",
			expectLog: true,
		},
		{
			// Level filter at or below the event's level passes.
			name: "level",
			filters: StreamingFilters{
				Level: infoLevel,
			},
			expectLog: true,
		},
		{
			// Level filter above the event's level drops it.
			name: "filtered out level",
			filters: StreamingFilters{
				Level: warnLevel,
			},
			expectLog: false,
		},
		{
			// Event filter matching the event's type passes.
			name: "events",
			filters: StreamingFilters{
				Events: []LogEventType{HTTP},
			},
			expectLog: true,
		},
		{
			// Event filter for a different type drops it.
			name: "filtered out event",
			filters: StreamingFilters{
				Events: []LogEventType{Cloudflared},
			},
			expectLog: false,
		},
		{
			// Both filters satisfied: delivered.
			name: "filter and event",
			filters: StreamingFilters{
				Level:  infoLevel,
				Events: []LogEventType{HTTP},
			},
			expectLog: true,
		},
	} {
		t.Run(test.name, func(t *testing.T) {
			session := newSession(4, &test.filters)
			log := Log{
				Time:    time.Now().UTC().Format(time.RFC3339),
				Event:   HTTP,
				Level:   Info,
				Message: "test",
			}
			session.Insert(&log)
			// Non-blocking receive: presence on the channel must match
			// the case's expectation.
			select {
			case <-session.listener:
				require.True(t, test.expectLog)
			default:
				require.False(t, test.expectLog)
			}
		})
	}
}
// Validate that the session has a max amount of events to hold
func TestSession_InsertOverflow(t *testing.T) {
	// Capacity of 1: the second Insert must be silently discarded.
	session := newSession(1, nil)
	log := Log{
		Time:    time.Now().UTC().Format(time.RFC3339),
		Event:   HTTP,
		Level:   Info,
		Message: "test",
	}
	// Insert 2 but only max channel size for 1
	session.Insert(&log)
	session.Insert(&log)
	// First dequeue succeeds (the buffered event).
	select {
	case <-session.listener:
		// pass
	default:
		require.Fail(t, "expected one log event")
	}
	// Second dequeue should fail
	select {
	case <-session.listener:
		require.Fail(t, "expected no more remaining log events")
	default:
		// pass
	}
}
type mockWriter struct {
event *Log
err error
@ -108,7 +207,7 @@ func TestParseZerologEvent_EventTypes(t *testing.T) {
}
// Invalid defaults to Cloudflared LogEventType
t.Run("Invalid", func(t *testing.T) {
t.Run("invalid", func(t *testing.T) {
defer func() { writer.err = nil }()
zlog.Info().Str(EventTypeKey, "unknown").Msg("test")
require.NoError(t, writer.err)

View File

@ -131,13 +131,13 @@ func (m *ManagementService) startStreaming(c *websocket.Conn, ctx context.Contex
return
}
// Expect the first incoming request
_, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
startEvent, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
if !ok {
m.log.Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Msgf("expected start_streaming as first recieved event")
m.log.Warn().Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Msgf("expected start_streaming as first recieved event")
return
}
m.streaming.Store(true)
listener := m.logger.Listen()
listener := m.logger.Listen(startEvent.Filters)
m.log.Debug().Msgf("Streaming logs")
go m.streamLogs(c, ctx, listener)
}