diff --git a/connection/quic_datagram_v3.go b/connection/quic_datagram_v3.go new file mode 100644 index 00000000..3921d66c --- /dev/null +++ b/connection/quic_datagram_v3.go @@ -0,0 +1,48 @@ +package connection + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/google/uuid" + "github.com/quic-go/quic-go" + "github.com/rs/zerolog" + + cfdquic "github.com/cloudflare/cloudflared/quic/v3" + "github.com/cloudflare/cloudflared/tunnelrpc/pogs" +) + +type datagramV3Connection struct { + conn quic.Connection + // datagramMuxer mux/demux datagrams from quic connection + datagramMuxer cfdquic.DatagramConn + logger *zerolog.Logger +} + +func NewDatagramV3Connection(ctx context.Context, + conn quic.Connection, + sessionManager cfdquic.SessionManager, + logger *zerolog.Logger, +) DatagramSessionHandler { + datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, logger) + + return &datagramV3Connection{ + conn, + datagramMuxer, + logger, + } +} + +func (d *datagramV3Connection) Serve(ctx context.Context) error { + return d.datagramMuxer.Serve(ctx) +} + +func (d *datagramV3Connection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) { + return nil, fmt.Errorf("datagram v3 does not support RegisterUdpSession RPC") +} + +func (d *datagramV3Connection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error { + return fmt.Errorf("datagram v3 does not support UnregisterUdpSession RPC") +} diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 8dde4e76..8d68f6b7 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -12,7 +12,9 @@ import ( "github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/edgediscovery" + "github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/orchestration" + v3 "github.com/cloudflare/cloudflared/quic/v3" "github.com/cloudflare/cloudflared/retry" "github.com/cloudflare/cloudflared/signal" "github.com/cloudflare/cloudflared/tunnelstate" @@ -80,9 +82,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato edgeAddrHandler := NewIPAddrFallback(config.MaxEdgeAddrRetries) edgeBindAddr := config.EdgeBindAddr + sessionManager := v3.NewSessionManager(config.Log, ingress.DialUDPAddrPort) + edgeTunnelServer := EdgeTunnelServer{ config: config, orchestrator: orchestrator, + sessionManager: sessionManager, edgeAddrs: edgeIPs, edgeAddrHandler: edgeAddrHandler, edgeBindAddr: edgeBindAddr, diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 7de2cbd0..3e6ca86a 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -7,6 +7,7 @@ import ( "net" "net/netip" "runtime/debug" + "slices" "strings" "sync" "time" @@ -24,6 +25,7 @@ import ( "github.com/cloudflare/cloudflared/management" "github.com/cloudflare/cloudflared/orchestration" quicpogs "github.com/cloudflare/cloudflared/quic" + v3 "github.com/cloudflare/cloudflared/quic/v3" "github.com/cloudflare/cloudflared/retry" "github.com/cloudflare/cloudflared/signal" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" @@ -87,14 +89,6 @@ func (c *TunnelConfig) connectionOptions(originLocalAddr string, numPreviousAtte } } -func (c *TunnelConfig) SupportedFeatures() []string { - supported := []string{features.FeatureSerializedHeaders} - if c.NamedTunnel == nil { - supported = append(supported, features.FeatureQuickReconnects) - } - return supported -} - func StartTunnelDaemon( ctx context.Context, config *TunnelConfig, @@ -181,6 +175,7 @@ func (f *ipAddrFallback) ShouldGetNewAddress(connIndex uint8, err error) (needsN type EdgeTunnelServer struct { config *TunnelConfig orchestrator *orchestration.Orchestrator + sessionManager v3.SessionManager edgeAddrHandler EdgeAddrHandler edgeAddrs *edgediscovery.Edge edgeBindAddr net.IP @@ -605,14 +600,24 @@ func (e *EdgeTunnelServer) serveQUIC( return err, true } - datagramSessionManager := connection.NewDatagramV2Connection( - ctx, - conn, - e.config.PacketConfig, - e.config.RPCTimeout, - e.config.WriteStreamTimeout, - connLogger.Logger(), - ) + var datagramSessionManager connection.DatagramSessionHandler + if slices.Contains(connOptions.Client.Features, features.FeatureDatagramV3) { + datagramSessionManager = connection.NewDatagramV3Connection( + ctx, + conn, + e.sessionManager, + connLogger.Logger(), + ) + } else { + datagramSessionManager = connection.NewDatagramV2Connection( + ctx, + conn, + e.config.PacketConfig, + e.config.RPCTimeout, + e.config.WriteStreamTimeout, + connLogger.Logger(), + ) + } // Wrap the [quic.Connection] as a TunnelConnection tunnelConn, err := connection.NewTunnelConnection(