From 39ed5dc18248605220c2553fa062081b1e1f3a7e Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Thu, 30 Mar 2023 14:12:00 -0700 Subject: [PATCH] TUN-7126: Add Management logger io.Writer --- cmd/cloudflared/tunnel/cmd.go | 2 +- logger/create.go | 18 ++++ management/events.go | 40 ++++++++ management/logger.go | 142 +++++++++++++++++++++++++++++ management/logger_test.go | 75 +++++++++++++++ management/service.go | 18 +++- orchestration/orchestrator_test.go | 2 +- 7 files changed, 290 insertions(+), 7 deletions(-) create mode 100644 management/events.go create mode 100644 management/logger.go create mode 100644 management/logger_test.go diff --git a/cmd/cloudflared/tunnel/cmd.go b/cmd/cloudflared/tunnel/cmd.go index b1f572c3..fc337dd1 100644 --- a/cmd/cloudflared/tunnel/cmd.go +++ b/cmd/cloudflared/tunnel/cmd.go @@ -401,7 +401,7 @@ func StartServer( localRules := []ingress.Rule{} if features.Contains(features.FeatureManagementLogs) { - mgmt := management.New(c.String("management-hostname")) + mgmt := management.New(c.String("management-hostname"), logger.ManagementLogger.Log, logger.ManagementLogger) localRules = []ingress.Rule{ingress.NewManagementRule(mgmt)} } orchestrator, err := orchestration.NewOrchestrator(ctx, orchestratorConfig, tunnelConfig.Tags, localRules, tunnelConfig.Log) diff --git a/logger/create.go b/logger/create.go index 256ff4c7..8b8fed23 100644 --- a/logger/create.go +++ b/logger/create.go @@ -15,6 +15,9 @@ import ( "github.com/urfave/cli/v2" "golang.org/x/term" "gopkg.in/natefinch/lumberjack.v2" + + "github.com/cloudflare/cloudflared/features" + "github.com/cloudflare/cloudflared/management" ) const ( @@ -35,8 +38,19 @@ const ( consoleTimeFormat = time.RFC3339 ) +var ( + ManagementLogger *management.Logger +) + func init() { + zerolog.TimeFieldFormat = time.RFC3339 zerolog.TimestampFunc = utcNow + + if features.Contains(features.FeatureManagementLogs) { + // Management logger needs to be initialized before any of the other loggers as to not capture + // it's own logging events. + ManagementLogger = management.NewLogger() + } } func utcNow() time.Time { @@ -91,6 +105,10 @@ func newZerolog(loggerConfig *Config) *zerolog.Logger { writers = append(writers, rollingLogger) } + if features.Contains(features.FeatureManagementLogs) { + writers = append(writers, ManagementLogger) + } + multi := resilientMultiWriter{writers} level, levelErr := zerolog.ParseLevel(loggerConfig.MinLevel) diff --git a/management/events.go b/management/events.go new file mode 100644 index 00000000..c9d153bc --- /dev/null +++ b/management/events.go @@ -0,0 +1,40 @@ +package management + +// LogEventType is the way that logging messages are able to be filtered. +// Example: assigning LogEventType.Cloudflared to a zerolog event will allow the client to filter for only +// the Cloudflared-related events. +type LogEventType int + +const ( + Cloudflared LogEventType = 0 + HTTP LogEventType = 1 + TCP LogEventType = 2 + UDP LogEventType = 3 +) + +func (l LogEventType) String() string { + switch l { + case Cloudflared: + return "cloudflared" + case HTTP: + return "http" + case TCP: + return "tcp" + case UDP: + return "udp" + default: + return "" + } +} + +// LogLevel corresponds to the zerolog logging levels +// "panic", "fatal", and "trace" are exempt from this list as they are rarely used and, at least +// the the first two are limited to failure conditions that lead to cloudflared shutting down. +type LogLevel string + +const ( + Debug LogLevel = "debug" + Info LogLevel = "info" + Warn LogLevel = "warn" + Error LogLevel = "error" +) diff --git a/management/logger.go b/management/logger.go new file mode 100644 index 00000000..3af494a4 --- /dev/null +++ b/management/logger.go @@ -0,0 +1,142 @@ +package management + +import ( + "os" + "sync" + "time" + + jsoniter "github.com/json-iterator/go" + "github.com/rs/zerolog" +) + +var json = jsoniter.ConfigFastest + +const ( + // Indicates how many log messages the listener will hold before dropping. + // Provides a throttling mechanism to drop latest messages if the sender + // can't keep up with the influx of log messages. + logWindow = 30 +) + +// ZeroLogEvent is the json structure that zerolog stores it's events as +type ZeroLogEvent struct { + Time string `json:"time,omitempty"` + Level LogLevel `json:"level,omitempty"` + Type LogEventType `json:"type,omitempty"` + Message string `json:"message,omitempty"` +} + +// Logger manages the number of management streaming log sessions +type Logger struct { + sessions []*Session + mu sync.RWMutex + + // Unique logger that isn't a io.Writer of the list of zerolog writers. This helps prevent management log + // statements from creating infinite recursion to export messages to a session and allows basic debugging and + // error statements to be issued in the management code itself. + Log *zerolog.Logger +} + +func NewLogger() *Logger { + log := zerolog.New(zerolog.ConsoleWriter{ + Out: os.Stdout, + TimeFormat: time.RFC3339, + }).With().Timestamp().Logger().Level(zerolog.DebugLevel) + return &Logger{ + Log: &log, + } +} + +type LoggerListener interface { + Listen() *Session + Close(*Session) +} + +type Session struct { + // Buffered channel that holds the recent log events + listener chan *ZeroLogEvent + // Types of log events that this session will provide through the listener + filters []LogEventType +} + +func newListener(size int) *Session { + return &Session{ + listener: make(chan *ZeroLogEvent, size), + filters: []LogEventType{}, + } +} + +// Listen creates a new Session that will append filtered log events as they are created. +func (l *Logger) Listen() *Session { + l.mu.Lock() + defer l.mu.Unlock() + listener := newListener(logWindow) + l.sessions = append(l.sessions, listener) + return listener +} + +// Close will remove a Session from the available sessions that were receiving log events. +func (l *Logger) Close(session *Session) { + l.mu.Lock() + defer l.mu.Unlock() + index := -1 + for i, v := range l.sessions { + if v == session { + index = i + break + } + } + if index == -1 { + // Not found + return + } + copy(l.sessions[index:], l.sessions[index+1:]) + l.sessions = l.sessions[:len(l.sessions)-1] +} + +// Write will write the log event to all sessions that have available capacity. For those that are full, the message +// will be dropped. +// This function is the interface that zerolog expects to call when a log event is to be written out. +func (l *Logger) Write(p []byte) (int, error) { + l.mu.RLock() + defer l.mu.RUnlock() + // return early if no active sessions + if len(l.sessions) == 0 { + return len(p), nil + } + var event ZeroLogEvent + iter := json.BorrowIterator(p) + defer json.ReturnIterator(iter) + iter.ReadVal(&event) + if iter.Error != nil { + l.Log.Debug().Msg("unable to unmarshal log event") + return len(p), nil + } + for _, listener := range l.sessions { + // no filters means all types are allowed + if len(listener.filters) != 0 { + valid := false + // make sure listener is subscribed to this event type + for _, t := range listener.filters { + if t == event.Type { + valid = true + break + } + } + if !valid { + continue + } + } + + select { + case listener.listener <- &event: + default: + // buffer is full, discard + } + } + return len(p), nil +} + +func (l *Logger) WriteLevel(level zerolog.Level, p []byte) (n int, err error) { + return l.Write(p) +} diff --git a/management/logger_test.go b/management/logger_test.go new file mode 100644 index 00000000..2ba85364 --- /dev/null +++ b/management/logger_test.go @@ -0,0 +1,75 @@ +package management + +import ( + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" +) + +// No listening sessions will not write to the channel +func TestLoggerWrite_NoSessions(t *testing.T) { + logger := NewLogger() + zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel) + + zlog.Info().Msg("hello") +} + +// Validate that the session receives the event +func TestLoggerWrite_OneSession(t *testing.T) { + logger := NewLogger() + zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel) + + session := logger.Listen() + defer logger.Close(session) + zlog.Info().Int("type", int(HTTP)).Msg("hello") + select { + case event := <-session.listener: + assert.Equal(t, "hello", event.Message) + assert.Equal(t, LogLevel("info"), event.Level) + assert.Equal(t, HTTP, event.Type) + default: + assert.Fail(t, "expected an event to be in the listener") + } +} + +// Validate all sessions receive the same event +func TestLoggerWrite_MultipleSessions(t *testing.T) { + logger := NewLogger() + zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel) + + session1 := logger.Listen() + defer logger.Close(session1) + session2 := logger.Listen() + zlog.Info().Int("type", int(HTTP)).Msg("hello") + for _, session := range []*Session{session1, session2} { + select { + case event := <-session.listener: + assert.Equal(t, "hello", event.Message) + assert.Equal(t, LogLevel("info"), event.Level) + assert.Equal(t, HTTP, event.Type) + default: + assert.Fail(t, "expected an event to be in the listener") + } + } + + // Close session2 and make sure session1 still receives events + logger.Close(session2) + zlog.Info().Int("type", int(HTTP)).Msg("hello2") + select { + case event := <-session1.listener: + assert.Equal(t, "hello2", event.Message) + assert.Equal(t, LogLevel("info"), event.Level) + assert.Equal(t, HTTP, event.Type) + default: + assert.Fail(t, "expected an event to be in the listener") + } + + // Make sure a held reference to session2 doesn't receive events after being closed + select { + case <-session2.listener: + assert.Fail(t, "An event was not expected to be in the session listener") + default: + // pass + } +} diff --git a/management/service.go b/management/service.go index 66e8faef..c2a74ecb 100644 --- a/management/service.go +++ b/management/service.go @@ -4,28 +4,36 @@ import ( "net/http" "github.com/go-chi/chi/v5" + "github.com/rs/zerolog" ) type ManagementService struct { // The management tunnel hostname Hostname string + log *zerolog.Logger router chi.Router + logger LoggerListener } -func New(managementHostname string) *ManagementService { +func New(managementHostname string, log *zerolog.Logger, logger LoggerListener) *ManagementService { + s := &ManagementService{ + Hostname: managementHostname, + log: log, + logger: logger, + } r := chi.NewRouter() r.Get("/ping", ping) - return &ManagementService{ - Hostname: managementHostname, - router: r, - } + r.Head("/ping", ping) + s.router = r + return s } func (m *ManagementService) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.router.ServeHTTP(w, r) } +// Management Ping handler func ping(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) } diff --git a/orchestration/orchestrator_test.go b/orchestration/orchestrator_test.go index 96fbb83d..b95f2646 100644 --- a/orchestration/orchestrator_test.go +++ b/orchestration/orchestrator_test.go @@ -50,7 +50,7 @@ func TestUpdateConfiguration(t *testing.T) { initConfig := &Config{ Ingress: &ingress.Ingress{}, } - orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{ingress.NewManagementRule(management.New("management.argotunnel.com"))}, &testLogger) + orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{ingress.NewManagementRule(management.New("management.argotunnel.com", &testLogger, nil))}, &testLogger) require.NoError(t, err) initOriginProxy, err := orchestrator.GetOriginProxy() require.NoError(t, err)