From 991f01fe34af4cc04f6044bd50e38908c1354c23 Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Wed, 12 Apr 2023 14:41:11 -0700 Subject: [PATCH] TUN-7131: Add cloudflared log event to connection messages and enable streaming logs --- connection/control.go | 10 ++++--- connection/observer.go | 12 +++++++-- edgediscovery/allregions/discovery.go | 26 +++++++++++++----- edgediscovery/edgediscovery.go | 38 +++++++++++++++------------ features/features.go | 1 + supervisor/tunnel.go | 2 ++ 6 files changed, 60 insertions(+), 29 deletions(-) diff --git a/connection/control.go b/connection/control.go index 174db372..5cde0204 100644 --- a/connection/control.go +++ b/connection/control.go @@ -2,13 +2,13 @@ package connection import ( "context" - "fmt" "io" "net" "time" "github.com/rs/zerolog" + "github.com/cloudflare/cloudflared/management" tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -85,7 +85,7 @@ func (c *controlStream) ServeControlStream( return err } - c.observer.logServerInfo(c.connIndex, registrationDetails.Location, c.edgeAddress, fmt.Sprintf("Connection %s registered with protocol: %s", registrationDetails.UUID, c.protocol)) + c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol) c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location) c.connectedFuse.Connected() @@ -116,7 +116,11 @@ func (c *controlStream) waitForUnregister(ctx context.Context, rpcClient NamedTu c.observer.sendUnregisteringEvent(c.connIndex) rpcClient.GracefulShutdown(ctx, c.gracePeriod) - c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection") + c.observer.log.Info(). + Int(management.EventTypeKey, int(management.Cloudflared)). + Uint8(LogFieldConnIndex, c.connIndex). + IPAddr(LogFieldIPAddress, c.edgeAddress). + Msg("Unregistered tunnel connection") } func (c *controlStream) IsStopped() bool { diff --git a/connection/observer.go b/connection/observer.go index 7429b32c..c6cb895e 100644 --- a/connection/observer.go +++ b/connection/observer.go @@ -4,12 +4,17 @@ import ( "net" "strings" + "github.com/google/uuid" "github.com/rs/zerolog" + + "github.com/cloudflare/cloudflared/management" ) const ( + LogFieldConnectionID = "connection" LogFieldLocation = "location" LogFieldIPAddress = "ip" + LogFieldProtocol = "protocol" observerChannelBufferSize = 16 ) @@ -41,13 +46,16 @@ func (o *Observer) RegisterSink(sink EventSink) { o.addSinkChan <- sink } -func (o *Observer) logServerInfo(connIndex uint8, location string, address net.IP, msg string) { +func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) { o.sendEvent(Event{Index: connIndex, EventType: Connected, Location: location}) o.log.Info(). + Int(management.EventTypeKey, int(management.Cloudflared)). + Str(LogFieldConnectionID, connectionID.String()). Uint8(LogFieldConnIndex, connIndex). Str(LogFieldLocation, location). IPAddr(LogFieldIPAddress, address). - Msg(msg) + Str(LogFieldProtocol, protocol.String()). + Msg("Registered tunnel connection") o.metrics.registerServerLocation(uint8ToString(connIndex), location) } diff --git a/edgediscovery/allregions/discovery.go b/edgediscovery/allregions/discovery.go index e6c6105e..cab06611 100644 --- a/edgediscovery/allregions/discovery.go +++ b/edgediscovery/allregions/discovery.go @@ -9,6 +9,8 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + + "github.com/cloudflare/cloudflared/management" ) const ( @@ -108,16 +110,20 @@ var friendlyDNSErrorLines = []string{ // EdgeDiscovery implements HA service discovery lookup. func edgeDiscovery(log *zerolog.Logger, srvService string) ([][]*EdgeAddr, error) { - log.Debug().Str("domain", "_"+srvService+"._"+srvProto+"."+srvName).Msg("looking up edge SRV record") + logger := log.With().Int(management.EventTypeKey, int(management.Cloudflared)).Logger() + logger.Debug(). + Int(management.EventTypeKey, int(management.Cloudflared)). + Str("domain", "_"+srvService+"._"+srvProto+"."+srvName). + Msg("edge discovery: looking up edge SRV record") _, addrs, err := netLookupSRV(srvService, srvProto, srvName) if err != nil { _, fallbackAddrs, fallbackErr := fallbackLookupSRV(srvService, srvProto, srvName) if fallbackErr != nil || len(fallbackAddrs) == 0 { // use the original DNS error `err` in messages, not `fallbackErr` - log.Err(err).Msg("Error looking up Cloudflare edge IPs: the DNS query failed") + logger.Err(err).Msg("edge discovery: error looking up Cloudflare edge IPs: the DNS query failed") for _, s := range friendlyDNSErrorLines { - log.Error().Msg(s) + logger.Error().Msg(s) } return nil, errors.Wrapf(err, "Could not lookup srv records on _%v._%v.%v", srvService, srvProto, srvName) } @@ -131,9 +137,13 @@ func edgeDiscovery(log *zerolog.Logger, srvService string) ([][]*EdgeAddr, error if err != nil { return nil, err } - for _, e := range edgeAddrs { - log.Debug().Msgf("Edge Address: %+v", *e) + logAddrs := make([]string, len(edgeAddrs)) + for i, e := range edgeAddrs { + logAddrs[i] = e.UDP.IP.String() } + logger.Debug(). + Strs("addresses", logAddrs). + Msg("edge discovery: resolved edge addresses") resolvedAddrPerCNAME = append(resolvedAddrPerCNAME, edgeAddrs) } @@ -188,13 +198,15 @@ func ResolveAddrs(addrs []string, log *zerolog.Logger) (resolved []*EdgeAddr) { for _, addr := range addrs { tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { - log.Error().Str(logFieldAddress, addr).Err(err).Msg("failed to resolve to TCP address") + log.Error().Int(management.EventTypeKey, int(management.Cloudflared)). + Str(logFieldAddress, addr).Err(err).Msg("edge discovery: failed to resolve to TCP address") continue } udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { - log.Error().Str(logFieldAddress, addr).Err(err).Msg("failed to resolve to UDP address") + log.Error().Int(management.EventTypeKey, int(management.Cloudflared)). + Str(logFieldAddress, addr).Err(err).Msg("edge discovery: failed to resolve to UDP address") continue } version := V6 diff --git a/edgediscovery/edgediscovery.go b/edgediscovery/edgediscovery.go index 88788b10..59091bc7 100644 --- a/edgediscovery/edgediscovery.go +++ b/edgediscovery/edgediscovery.go @@ -6,6 +6,7 @@ import ( "github.com/rs/zerolog" "github.com/cloudflare/cloudflared/edgediscovery/allregions" + "github.com/cloudflare/cloudflared/management" ) const ( @@ -74,33 +75,35 @@ func (ed *Edge) GetAddrForRPC() (*allregions.EdgeAddr, error) { // GetAddr gives this proxy connection an edge Addr. Prefer Addrs this connection has already used. func (ed *Edge) GetAddr(connIndex int) (*allregions.EdgeAddr, error) { - log := ed.log.With().Int(LogFieldConnIndex, connIndex).Logger() + log := ed.log.With(). + Int(LogFieldConnIndex, connIndex). + Int(management.EventTypeKey, int(management.Cloudflared)). + Logger() ed.Lock() defer ed.Unlock() // If this connection has already used an edge addr, return it. if addr := ed.regions.AddrUsedBy(connIndex); addr != nil { - log.Debug().Msg("edgediscovery - GetAddr: Returning same address back to proxy connection") + log.Debug().IPAddr(LogFieldIPAddress, addr.UDP.IP).Msg("edge discovery: returning same edge address back to pool") return addr, nil } // Otherwise, give it an unused one addr := ed.regions.GetUnusedAddr(nil, connIndex) if addr == nil { - log.Debug().Msg("edgediscovery - GetAddr: No addresses left to give proxy connection") + log.Debug().Msg("edge discovery: no addresses left in pool to give proxy connection") return nil, errNoAddressesLeft } - log = ed.log.With(). - Int(LogFieldConnIndex, connIndex). - IPAddr(LogFieldIPAddress, addr.UDP.IP).Logger() - log.Debug().Msgf("edgediscovery - GetAddr: Giving connection its new address") + log.Debug().IPAddr(LogFieldIPAddress, addr.UDP.IP).Msg("edge discovery: giving new address to connection") return addr, nil } // GetDifferentAddr gives back the proxy connection's edge Addr and uses a new one. func (ed *Edge) GetDifferentAddr(connIndex int, hasConnectivityError bool) (*allregions.EdgeAddr, error) { - log := ed.log.With().Int(LogFieldConnIndex, connIndex).Logger() - + log := ed.log.With(). + Int(LogFieldConnIndex, connIndex). + Int(management.EventTypeKey, int(management.Cloudflared)). + Logger() ed.Lock() defer ed.Unlock() @@ -110,14 +113,14 @@ func (ed *Edge) GetDifferentAddr(connIndex int, hasConnectivityError bool) (*all } addr := ed.regions.GetUnusedAddr(oldAddr, connIndex) if addr == nil { - log.Debug().Msg("edgediscovery - GetDifferentAddr: No addresses left to give proxy connection") + log.Debug().Msg("edge discovery: no addresses left in pool to give proxy connection") // note: if oldAddr were not nil, it will become available on the next iteration return nil, errNoAddressesLeft } - log = ed.log.With(). - Int(LogFieldConnIndex, connIndex). - IPAddr(LogFieldIPAddress, addr.UDP.IP).Logger() - log.Debug().Msgf("edgediscovery - GetDifferentAddr: Giving connection its new address from the address list: %v", ed.regions.AvailableAddrs()) + log.Debug(). + IPAddr(LogFieldIPAddress, addr.UDP.IP). + Int("available", ed.regions.AvailableAddrs()). + Msg("edge discovery: giving new address to connection") return addr, nil } @@ -133,8 +136,9 @@ func (ed *Edge) AvailableAddrs() int { func (ed *Edge) GiveBack(addr *allregions.EdgeAddr, hasConnectivityError bool) bool { ed.Lock() defer ed.Unlock() - log := ed.log.With(). - IPAddr(LogFieldIPAddress, addr.UDP.IP).Logger() - log.Debug().Msgf("edgediscovery - GiveBack: Address now unused") + ed.log.Debug(). + Int(management.EventTypeKey, int(management.Cloudflared)). + IPAddr(LogFieldIPAddress, addr.UDP.IP). + Msg("edge discovery: gave back address to the pool") return ed.regions.GiveBack(addr, hasConnectivityError) } diff --git a/features/features.go b/features/features.go index eb83c1fc..d6aaa6a6 100644 --- a/features/features.go +++ b/features/features.go @@ -16,6 +16,7 @@ var ( FeatureSerializedHeaders, FeatureDatagramV2, FeatureQUICSupportEOF, + FeatureManagementLogs, } ) diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 320c3ac9..1b2e132f 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -22,6 +22,7 @@ import ( "github.com/cloudflare/cloudflared/features" "github.com/cloudflare/cloudflared/h2mux" "github.com/cloudflare/cloudflared/ingress" + "github.com/cloudflare/cloudflared/management" "github.com/cloudflare/cloudflared/orchestration" quicpogs "github.com/cloudflare/cloudflared/quic" "github.com/cloudflare/cloudflared/retry" @@ -238,6 +239,7 @@ func (e *EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, protocolF } logger := e.config.Log.With(). + Int(management.EventTypeKey, int(management.Cloudflared)). IPAddr(connection.LogFieldIPAddress, addr.UDP.IP). Uint8(connection.LogFieldConnIndex, connIndex). Logger()