TUN-7126: Add Management logger io.Writer

commit 39ed5dc182
parent bbc8d9431b
Author: Devin Carr
Date: 2023-03-30 14:12:00 -07:00
7 changed files with 290 additions and 7 deletions


@@ -401,7 +401,7 @@ func StartServer(
 	localRules := []ingress.Rule{}
 	if features.Contains(features.FeatureManagementLogs) {
-		mgmt := management.New(c.String("management-hostname"))
+		mgmt := management.New(c.String("management-hostname"), logger.ManagementLogger.Log, logger.ManagementLogger)
 		localRules = []ingress.Rule{ingress.NewManagementRule(mgmt)}
 	}
 	orchestrator, err := orchestration.NewOrchestrator(ctx, orchestratorConfig, tunnelConfig.Tags, localRules, tunnelConfig.Log)


@@ -15,6 +15,9 @@ import (
 	"github.com/urfave/cli/v2"
 	"golang.org/x/term"
 	"gopkg.in/natefinch/lumberjack.v2"
+
+	"github.com/cloudflare/cloudflared/features"
+	"github.com/cloudflare/cloudflared/management"
 )

 const (

@@ -35,8 +38,19 @@ const (
 	consoleTimeFormat = time.RFC3339
 )

+var (
+	ManagementLogger *management.Logger
+)
+
 func init() {
+	zerolog.TimeFieldFormat = time.RFC3339
 	zerolog.TimestampFunc = utcNow
+
+	if features.Contains(features.FeatureManagementLogs) {
+		// Management logger needs to be initialized before any of the other loggers so as not to capture
+		// its own logging events.
+		ManagementLogger = management.NewLogger()
+	}
 }

 func utcNow() time.Time {

@@ -91,6 +105,10 @@ func newZerolog(loggerConfig *Config) *zerolog.Logger {
 		writers = append(writers, rollingLogger)
 	}

+	if features.Contains(features.FeatureManagementLogs) {
+		writers = append(writers, ManagementLogger)
+	}
+
 	multi := resilientMultiWriter{writers}

 	level, levelErr := zerolog.ParseLevel(loggerConfig.MinLevel)
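The hunk above fans log events out through resilientMultiWriter, a type that already exists in this package. As a rough sketch of the idea (an assumption about its behavior, not the actual implementation): each write is attempted on every sink and per-writer errors are swallowed, so a full management session buffer can never block console or file logging.

package logger

import "io"

// resilientMultiWriterSketch is a hypothetical stand-in for the package's
// resilientMultiWriter: it writes each log line to every sink and
// deliberately ignores individual failures so one bad writer cannot
// break logging for the rest.
type resilientMultiWriterSketch struct {
	writers []io.Writer
}

func (m resilientMultiWriterSketch) Write(p []byte) (int, error) {
	for _, w := range m.writers {
		_, _ = w.Write(p) // best effort; errors are intentionally dropped
	}
	return len(p), nil
}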

management/events.go (new file, +40)

@@ -0,0 +1,40 @@
package management

// LogEventType is the way that logging messages are able to be filtered.
// Example: assigning the Cloudflared LogEventType to a zerolog event allows the client to filter for only
// the cloudflared-related events.
type LogEventType int

const (
	Cloudflared LogEventType = 0
	HTTP        LogEventType = 1
	TCP         LogEventType = 2
	UDP         LogEventType = 3
)

func (l LogEventType) String() string {
	switch l {
	case Cloudflared:
		return "cloudflared"
	case HTTP:
		return "http"
	case TCP:
		return "tcp"
	case UDP:
		return "udp"
	default:
		return ""
	}
}

// LogLevel corresponds to the zerolog logging levels.
// "panic", "fatal", and "trace" are omitted from this list as they are rarely used and, at least
// for the first two, are limited to failure conditions that lead to cloudflared shutting down.
type LogLevel string

const (
	Debug LogLevel = "debug"
	Info  LogLevel = "info"
	Warn  LogLevel = "warn"
	Error LogLevel = "error"
)
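To illustrate how these types ride along on an event (a standalone sketch added for this write-up, not part of the commit): the producer attaches the type as an integer field named "type", which unmarshals into ZeroLogEvent.Type on the consuming side — the same convention the tests below use.

package main

import (
	"os"

	"github.com/rs/zerolog"
)

const httpEventType = 1 // mirrors management.HTTP; redeclared so the sketch stands alone

func main() {
	log := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// Tag the event so a session filtering on the HTTP type would receive it.
	log.Info().Int("type", httpEventType).Msg("served one request")
	// Emits roughly: {"level":"info","type":1,"time":"...","message":"served one request"}
}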

management/logger.go (new file, +142)

@@ -0,0 +1,142 @@
package management

import (
	"os"
	"sync"
	"time"

	jsoniter "github.com/json-iterator/go"
	"github.com/rs/zerolog"
)

var json = jsoniter.ConfigFastest

const (
	// Indicates how many log messages the listener will hold before dropping.
	// Provides a throttling mechanism to drop the latest messages if the
	// listener can't keep up with the influx of log messages.
	logWindow = 30
)

// ZeroLogEvent is the json structure that zerolog stores its events as
type ZeroLogEvent struct {
	Time    string       `json:"time,omitempty"`
	Level   LogLevel     `json:"level,omitempty"`
	Type    LogEventType `json:"type,omitempty"`
	Message string       `json:"message,omitempty"`
}

// Logger manages the number of management streaming log sessions
type Logger struct {
	sessions []*Session
	mu       sync.RWMutex

	// Unique logger that isn't an io.Writer in the list of zerolog writers. This helps prevent management log
	// statements from creating infinite recursion to export messages to a session and allows basic debugging and
	// error statements to be issued in the management code itself.
	Log *zerolog.Logger
}

func NewLogger() *Logger {
	log := zerolog.New(zerolog.ConsoleWriter{
		Out:        os.Stdout,
		TimeFormat: time.RFC3339,
	}).With().Timestamp().Logger().Level(zerolog.DebugLevel)
	return &Logger{
		Log: &log,
	}
}

type LoggerListener interface {
	Listen() *Session
	Close(*Session)
}

type Session struct {
	// Buffered channel that holds the recent log events
	listener chan *ZeroLogEvent
	// Types of log events that this session will provide through the listener
	filters []LogEventType
}

func newListener(size int) *Session {
	return &Session{
		listener: make(chan *ZeroLogEvent, size),
		filters:  []LogEventType{},
	}
}

// Listen creates a new Session that will append filtered log events as they are created.
func (l *Logger) Listen() *Session {
	l.mu.Lock()
	defer l.mu.Unlock()
	listener := newListener(logWindow)
	l.sessions = append(l.sessions, listener)
	return listener
}

// Close will remove a Session from the available sessions that were receiving log events.
func (l *Logger) Close(session *Session) {
	l.mu.Lock()
	defer l.mu.Unlock()
	index := -1
	for i, v := range l.sessions {
		if v == session {
			index = i
			break
		}
	}
	if index == -1 {
		// Not found
		return
	}
	copy(l.sessions[index:], l.sessions[index+1:])
	l.sessions = l.sessions[:len(l.sessions)-1]
}

// Write will write the log event to all sessions that have available capacity. For those that are full, the message
// will be dropped.
// This function is the interface that zerolog expects to call when a log event is to be written out.
func (l *Logger) Write(p []byte) (int, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	// return early if no active sessions
	if len(l.sessions) == 0 {
		return len(p), nil
	}
	var event ZeroLogEvent
	iter := json.BorrowIterator(p)
	defer json.ReturnIterator(iter)
	iter.ReadVal(&event)
	if iter.Error != nil {
		l.Log.Debug().Msg("unable to unmarshal log event")
		return len(p), nil
	}
	for _, listener := range l.sessions {
		// no filters means all types are allowed
		if len(listener.filters) != 0 {
			valid := false
			// make sure listener is subscribed to this event type
			for _, t := range listener.filters {
				if t == event.Type {
					valid = true
					break
				}
			}
			if !valid {
				continue
			}
		}
		select {
		case listener.listener <- &event:
		default:
			// buffer is full, discard
		}
	}
	return len(p), nil
}

func (l *Logger) WriteLevel(level zerolog.Level, p []byte) (n int, err error) {
	return l.Write(p)
}
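End to end (an in-package sketch mirroring the tests below; Session.listener is unexported, so this only compiles inside package management, and drainOneSession is a hypothetical helper): zerolog writes each event's JSON line into Logger.Write, and a consumer drains its session's buffered channel. Note that Write always reports success, even when a buffer is full or the payload fails to parse, so application logging never stalls behind a slow management session.

package management

import (
	"fmt"

	"github.com/rs/zerolog"
)

// drainOneSession demonstrates the producer/consumer flow of the Logger.
func drainOneSession() {
	mgmt := NewLogger()
	zlog := zerolog.New(mgmt).With().Timestamp().Logger()

	session := mgmt.Listen()
	defer mgmt.Close(session)

	zlog.Info().Int("type", int(HTTP)).Msg("proxied one request")

	// Read whatever fit into the session's 30-event window.
	for {
		select {
		case event := <-session.listener:
			fmt.Println(event.Level, event.Message)
		default:
			return // window drained
		}
	}
}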

management/logger_test.go (new file, +75)

@@ -0,0 +1,75 @@
package management

import (
	"testing"

	"github.com/rs/zerolog"
	"github.com/stretchr/testify/assert"
)

// With no listening sessions, nothing is written to any channel
func TestLoggerWrite_NoSessions(t *testing.T) {
	logger := NewLogger()
	zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel)
	zlog.Info().Msg("hello")
}

// Validate that the session receives the event
func TestLoggerWrite_OneSession(t *testing.T) {
	logger := NewLogger()
	zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel)

	session := logger.Listen()
	defer logger.Close(session)
	zlog.Info().Int("type", int(HTTP)).Msg("hello")
	select {
	case event := <-session.listener:
		assert.Equal(t, "hello", event.Message)
		assert.Equal(t, LogLevel("info"), event.Level)
		assert.Equal(t, HTTP, event.Type)
	default:
		assert.Fail(t, "expected an event to be in the listener")
	}
}

// Validate that all sessions receive the same event
func TestLoggerWrite_MultipleSessions(t *testing.T) {
	logger := NewLogger()
	zlog := zerolog.New(logger).With().Timestamp().Logger().Level(zerolog.InfoLevel)

	session1 := logger.Listen()
	defer logger.Close(session1)
	session2 := logger.Listen()
	zlog.Info().Int("type", int(HTTP)).Msg("hello")
	for _, session := range []*Session{session1, session2} {
		select {
		case event := <-session.listener:
			assert.Equal(t, "hello", event.Message)
			assert.Equal(t, LogLevel("info"), event.Level)
			assert.Equal(t, HTTP, event.Type)
		default:
			assert.Fail(t, "expected an event to be in the listener")
		}
	}

	// Close session2 and make sure session1 still receives events
	logger.Close(session2)
	zlog.Info().Int("type", int(HTTP)).Msg("hello2")
	select {
	case event := <-session1.listener:
		assert.Equal(t, "hello2", event.Message)
		assert.Equal(t, LogLevel("info"), event.Level)
		assert.Equal(t, HTTP, event.Type)
	default:
		assert.Fail(t, "expected an event to be in the listener")
	}

	// Make sure a held reference to session2 doesn't receive events after being closed
	select {
	case <-session2.listener:
		assert.Fail(t, "An event was not expected to be in the session listener")
	default:
		// pass
	}
}


@@ -4,28 +4,36 @@ import (
 	"net/http"

 	"github.com/go-chi/chi/v5"
+	"github.com/rs/zerolog"
 )

 type ManagementService struct {
 	// The management tunnel hostname
 	Hostname string
+	log      *zerolog.Logger
 	router   chi.Router
+	logger   LoggerListener
 }

-func New(managementHostname string) *ManagementService {
+func New(managementHostname string, log *zerolog.Logger, logger LoggerListener) *ManagementService {
+	s := &ManagementService{
+		Hostname: managementHostname,
+		log:      log,
+		logger:   logger,
+	}
 	r := chi.NewRouter()
 	r.Get("/ping", ping)
-	return &ManagementService{
-		Hostname: managementHostname,
-		router:   r,
-	}
+	r.Head("/ping", ping)
+	s.router = r
+	return s
 }

 func (m *ManagementService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	m.router.ServeHTTP(w, r)
 }

+// Management Ping handler
 func ping(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(200)
 }
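A quick way to exercise the newly added HEAD route (a sketch added here, not part of the commit; the test name and hostname are made up) using net/http/httptest from within the package:

package management

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

// TestPingRoutes is a hypothetical test covering both registered /ping routes.
func TestPingRoutes(t *testing.T) {
	// nil log/logger are fine here; the ping handler uses neither.
	s := New("management.example.com", nil, nil)
	for _, method := range []string{http.MethodGet, http.MethodHead} {
		req := httptest.NewRequest(method, "/ping", nil)
		rec := httptest.NewRecorder()
		s.ServeHTTP(rec, req)
		if rec.Code != http.StatusOK {
			t.Fatalf("%s /ping returned %d, want 200", method, rec.Code)
		}
	}
}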


@@ -50,7 +50,7 @@ func TestUpdateConfiguration(t *testing.T) {
 	initConfig := &Config{
 		Ingress: &ingress.Ingress{},
 	}
-	orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{ingress.NewManagementRule(management.New("management.argotunnel.com"))}, &testLogger)
+	orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{ingress.NewManagementRule(management.New("management.argotunnel.com", &testLogger, nil))}, &testLogger)
 	require.NoError(t, err)
 	initOriginProxy, err := orchestrator.GetOriginProxy()
 	require.NoError(t, err)