parent
16ecf60800
commit
0eddb8a615
|
@ -124,7 +124,7 @@ func (q *datagramV2Connection) RegisterUdpSession(ctx context.Context, sessionID
|
||||||
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
|
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
originProxy.Close()
|
originProxy.Close()
|
||||||
log.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
|
log.Err(err).Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).Msgf("Failed to register udp session")
|
||||||
tracing.EndWithErrorStatus(registerSpan, err)
|
tracing.EndWithErrorStatus(registerSpan, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -132,7 +132,7 @@ func (q *datagramV2Connection) RegisterUdpSession(ctx context.Context, sessionID
|
||||||
go q.serveUDPSession(session, closeAfterIdleHint)
|
go q.serveUDPSession(session, closeAfterIdleHint)
|
||||||
|
|
||||||
log.Debug().
|
log.Debug().
|
||||||
Str("sessionID", sessionID.String()).
|
Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
|
||||||
Str("src", originProxy.LocalAddr().String()).
|
Str("src", originProxy.LocalAddr().String()).
|
||||||
Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).
|
Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).
|
||||||
Msgf("Registered session")
|
Msgf("Registered session")
|
||||||
|
@ -163,7 +163,7 @@ func (q *datagramV2Connection) serveUDPSession(session *datagramsession.Session,
|
||||||
}
|
}
|
||||||
q.logger.Debug().Err(err).
|
q.logger.Debug().Err(err).
|
||||||
Int(management.EventTypeKey, int(management.UDP)).
|
Int(management.EventTypeKey, int(management.UDP)).
|
||||||
Str("sessionID", session.ID.String()).
|
Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(session.ID)).
|
||||||
Msg("Session terminated")
|
Msg("Session terminated")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,7 +176,7 @@ func (q *datagramV2Connection) closeUDPSession(ctx context.Context, sessionID uu
|
||||||
// with edge
|
// with edge
|
||||||
q.logger.Debug().Err(err).
|
q.logger.Debug().Err(err).
|
||||||
Int(management.EventTypeKey, int(management.UDP)).
|
Int(management.EventTypeKey, int(management.UDP)).
|
||||||
Str("sessionID", sessionID.String()).
|
Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
|
||||||
Msgf("Failed to open quic stream to unregister udp session with edge")
|
Msgf("Failed to open quic stream to unregister udp session with edge")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -187,14 +187,14 @@ func (q *datagramV2Connection) closeUDPSession(ctx context.Context, sessionID uu
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Log this at debug because this is not an error if session was closed due to lost connection
|
// Log this at debug because this is not an error if session was closed due to lost connection
|
||||||
// with edge
|
// with edge
|
||||||
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
q.logger.Err(err).Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
|
||||||
Msgf("Failed to open rpc stream to unregister udp session with edge")
|
Msgf("Failed to open rpc stream to unregister udp session with edge")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer rpcClientStream.Close()
|
defer rpcClientStream.Close()
|
||||||
|
|
||||||
if err := rpcClientStream.UnregisterUdpSession(ctx, sessionID, message); err != nil {
|
if err := rpcClientStream.UnregisterUdpSession(ctx, sessionID, message); err != nil {
|
||||||
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
q.logger.Err(err).Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
|
||||||
Msgf("Failed to unregister udp session with edge")
|
Msgf("Failed to unregister udp session with edge")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
@ -20,8 +21,15 @@ const (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errSessionManagerClosed = fmt.Errorf("session manager closed")
|
errSessionManagerClosed = fmt.Errorf("session manager closed")
|
||||||
|
LogFieldSessionID = "sessionID"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func FormatSessionID(sessionID uuid.UUID) string {
|
||||||
|
sessionIDStr := sessionID.String()
|
||||||
|
sessionIDStr = strings.ReplaceAll(sessionIDStr, "-", "")
|
||||||
|
return sessionIDStr
|
||||||
|
}
|
||||||
|
|
||||||
// Manager defines the APIs to manage sessions from the same transport.
|
// Manager defines the APIs to manage sessions from the same transport.
|
||||||
type Manager interface {
|
type Manager interface {
|
||||||
// Serve starts the event loop
|
// Serve starts the event loop
|
||||||
|
@ -127,7 +135,7 @@ func (m *manager) registerSession(ctx context.Context, registration *registerSes
|
||||||
func (m *manager) newSession(id uuid.UUID, dstConn io.ReadWriteCloser) *Session {
|
func (m *manager) newSession(id uuid.UUID, dstConn io.ReadWriteCloser) *Session {
|
||||||
logger := m.log.With().
|
logger := m.log.With().
|
||||||
Int(management.EventTypeKey, int(management.UDP)).
|
Int(management.EventTypeKey, int(management.UDP)).
|
||||||
Str("sessionID", id.String()).Logger()
|
Str(LogFieldSessionID, FormatSessionID(id)).Logger()
|
||||||
return &Session{
|
return &Session{
|
||||||
ID: id,
|
ID: id,
|
||||||
sendFunc: m.sendFunc,
|
sendFunc: m.sendFunc,
|
||||||
|
@ -174,7 +182,7 @@ func (m *manager) unregisterSession(unregistration *unregisterSessionEvent) {
|
||||||
func (m *manager) sendToSession(datagram *packet.Session) {
|
func (m *manager) sendToSession(datagram *packet.Session) {
|
||||||
session, ok := m.sessions[datagram.ID]
|
session, ok := m.sessions[datagram.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
m.log.Error().Str("sessionID", datagram.ID.String()).Msg("session not found")
|
m.log.Error().Str(LogFieldSessionID, FormatSessionID(datagram.ID)).Msg("session not found")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// session writes to destination over a connected UDP socket, which should not be blocking, so this call doesn't
|
// session writes to destination over a connected UDP socket, which should not be blocking, so this call doesn't
|
||||||
|
|
|
@ -16,11 +16,14 @@ const (
|
||||||
logFieldLBProbe = "lbProbe"
|
logFieldLBProbe = "lbProbe"
|
||||||
logFieldRule = "ingressRule"
|
logFieldRule = "ingressRule"
|
||||||
logFieldOriginService = "originService"
|
logFieldOriginService = "originService"
|
||||||
logFieldFlowID = "flowID"
|
|
||||||
logFieldConnIndex = "connIndex"
|
logFieldConnIndex = "connIndex"
|
||||||
logFieldDestAddr = "destAddr"
|
logFieldDestAddr = "destAddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
LogFieldFlowID = "flowID"
|
||||||
|
)
|
||||||
|
|
||||||
// newHTTPLogger creates a child zerolog.Logger from the provided with added context from the HTTP request, ingress
|
// newHTTPLogger creates a child zerolog.Logger from the provided with added context from the HTTP request, ingress
|
||||||
// services, and connection index.
|
// services, and connection index.
|
||||||
func newHTTPLogger(logger *zerolog.Logger, connIndex uint8, req *http.Request, rule int, serviceName string) zerolog.Logger {
|
func newHTTPLogger(logger *zerolog.Logger, connIndex uint8, req *http.Request, rule int, serviceName string) zerolog.Logger {
|
||||||
|
@ -47,7 +50,7 @@ func newTCPLogger(logger *zerolog.Logger, req *connection.TCPRequest) zerolog.Lo
|
||||||
Int(management.EventTypeKey, int(management.TCP)).
|
Int(management.EventTypeKey, int(management.TCP)).
|
||||||
Uint8(logFieldConnIndex, req.ConnIndex).
|
Uint8(logFieldConnIndex, req.ConnIndex).
|
||||||
Str(logFieldOriginService, ingress.ServiceWarpRouting).
|
Str(logFieldOriginService, ingress.ServiceWarpRouting).
|
||||||
Str(logFieldFlowID, req.FlowID).
|
Str(LogFieldFlowID, req.FlowID).
|
||||||
Str(logFieldDestAddr, req.Dest).
|
Str(logFieldDestAddr, req.Dest).
|
||||||
Uint8(logFieldConnIndex, req.ConnIndex).
|
Uint8(logFieldConnIndex, req.ConnIndex).
|
||||||
Logger()
|
Logger()
|
||||||
|
|
Loading…
Reference in New Issue