TUN-7130: Categorize UDP logs for streaming logs
This commit is contained in:
parent
7a014b06ec
commit
0b5b9b8297
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
"github.com/cloudflare/cloudflared/datagramsession"
|
"github.com/cloudflare/cloudflared/datagramsession"
|
||||||
"github.com/cloudflare/cloudflared/ingress"
|
"github.com/cloudflare/cloudflared/ingress"
|
||||||
|
"github.com/cloudflare/cloudflared/management"
|
||||||
"github.com/cloudflare/cloudflared/packet"
|
"github.com/cloudflare/cloudflared/packet"
|
||||||
quicpogs "github.com/cloudflare/cloudflared/quic"
|
quicpogs "github.com/cloudflare/cloudflared/quic"
|
||||||
"github.com/cloudflare/cloudflared/tracing"
|
"github.com/cloudflare/cloudflared/tracing"
|
||||||
|
@ -298,11 +299,12 @@ func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.
|
||||||
attribute.String("session-id", sessionID.String()),
|
attribute.String("session-id", sessionID.String()),
|
||||||
attribute.String("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)),
|
attribute.String("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)),
|
||||||
))
|
))
|
||||||
|
log := q.logger.With().Int(management.EventTypeKey, int(management.UDP)).Logger()
|
||||||
// Each session is a series of datagram from an eyeball to a dstIP:dstPort.
|
// Each session is a series of datagram from an eyeball to a dstIP:dstPort.
|
||||||
// (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
|
// (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
|
||||||
originProxy, err := ingress.DialUDP(dstIP, dstPort)
|
originProxy, err := ingress.DialUDP(dstIP, dstPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
q.logger.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
|
log.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
|
||||||
tracing.EndWithErrorStatus(registerSpan, err)
|
tracing.EndWithErrorStatus(registerSpan, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -313,14 +315,18 @@ func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.
|
||||||
|
|
||||||
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
|
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
q.logger.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
|
log.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
|
||||||
tracing.EndWithErrorStatus(registerSpan, err)
|
tracing.EndWithErrorStatus(registerSpan, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
go q.serveUDPSession(session, closeAfterIdleHint)
|
go q.serveUDPSession(session, closeAfterIdleHint)
|
||||||
|
|
||||||
q.logger.Debug().Str("sessionID", sessionID.String()).Str("src", originProxy.LocalAddr().String()).Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).Msgf("Registered session")
|
log.Debug().
|
||||||
|
Str("sessionID", sessionID.String()).
|
||||||
|
Str("src", originProxy.LocalAddr().String()).
|
||||||
|
Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).
|
||||||
|
Msgf("Registered session")
|
||||||
tracing.End(registerSpan)
|
tracing.End(registerSpan)
|
||||||
|
|
||||||
resp := tunnelpogs.RegisterUdpSessionResponse{
|
resp := tunnelpogs.RegisterUdpSessionResponse{
|
||||||
|
@ -341,7 +347,10 @@ func (q *QUICConnection) serveUDPSession(session *datagramsession.Session, close
|
||||||
q.closeUDPSession(ctx, session.ID, "terminated without error")
|
q.closeUDPSession(ctx, session.ID, "terminated without error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
q.logger.Debug().Err(err).Str("sessionID", session.ID.String()).Msg("Session terminated")
|
q.logger.Debug().Err(err).
|
||||||
|
Int(management.EventTypeKey, int(management.UDP)).
|
||||||
|
Str("sessionID", session.ID.String()).
|
||||||
|
Msg("Session terminated")
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeUDPSession first unregisters the session from session manager, then it tries to unregister from edge
|
// closeUDPSession first unregisters the session from session manager, then it tries to unregister from edge
|
||||||
|
@ -351,7 +360,9 @@ func (q *QUICConnection) closeUDPSession(ctx context.Context, sessionID uuid.UUI
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Log this at debug because this is not an error if session was closed due to lost connection
|
// Log this at debug because this is not an error if session was closed due to lost connection
|
||||||
// with edge
|
// with edge
|
||||||
q.logger.Debug().Err(err).Str("sessionID", sessionID.String()).
|
q.logger.Debug().Err(err).
|
||||||
|
Int(management.EventTypeKey, int(management.UDP)).
|
||||||
|
Str("sessionID", sessionID.String()).
|
||||||
Msgf("Failed to open quic stream to unregister udp session with edge")
|
Msgf("Failed to open quic stream to unregister udp session with edge")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
|
|
||||||
|
"github.com/cloudflare/cloudflared/management"
|
||||||
"github.com/cloudflare/cloudflared/packet"
|
"github.com/cloudflare/cloudflared/packet"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -120,7 +121,9 @@ func (m *manager) registerSession(ctx context.Context, registration *registerSes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) newSession(id uuid.UUID, dstConn io.ReadWriteCloser) *Session {
|
func (m *manager) newSession(id uuid.UUID, dstConn io.ReadWriteCloser) *Session {
|
||||||
logger := m.log.With().Str("sessionID", id.String()).Logger()
|
logger := m.log.With().
|
||||||
|
Int(management.EventTypeKey, int(management.UDP)).
|
||||||
|
Str("sessionID", id.String()).Logger()
|
||||||
return &Session{
|
return &Session{
|
||||||
ID: id,
|
ID: id,
|
||||||
sendFunc: m.sendFunc,
|
sendFunc: m.sendFunc,
|
||||||
|
|
Loading…
Reference in New Issue