TUN-8646: Allow experimental feature support for datagram v3

Closes TUN-8646
This commit is contained in:
Devin Carr 2024-11-04 13:59:32 -08:00
parent 5891c0d955
commit 589c198d2d
3 changed files with 74 additions and 16 deletions

View File

@ -0,0 +1,48 @@
package connection
import (
"context"
"fmt"
"net"
"time"
"github.com/google/uuid"
"github.com/quic-go/quic-go"
"github.com/rs/zerolog"
cfdquic "github.com/cloudflare/cloudflared/quic/v3"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
type datagramV3Connection struct {
conn quic.Connection
// datagramMuxer mux/demux datagrams from quic connection
datagramMuxer cfdquic.DatagramConn
logger *zerolog.Logger
}
func NewDatagramV3Connection(ctx context.Context,
conn quic.Connection,
sessionManager cfdquic.SessionManager,
logger *zerolog.Logger,
) DatagramSessionHandler {
datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, logger)
return &datagramV3Connection{
conn,
datagramMuxer,
logger,
}
}
func (d *datagramV3Connection) Serve(ctx context.Context) error {
return d.datagramMuxer.Serve(ctx)
}
func (d *datagramV3Connection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
return nil, fmt.Errorf("datagram v3 does not support RegisterUdpSession RPC")
}
func (d *datagramV3Connection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
return fmt.Errorf("datagram v3 does not support UnregisterUdpSession RPC")
}

View File

@ -12,7 +12,9 @@ import (
"github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/edgediscovery" "github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/orchestration" "github.com/cloudflare/cloudflared/orchestration"
v3 "github.com/cloudflare/cloudflared/quic/v3"
"github.com/cloudflare/cloudflared/retry" "github.com/cloudflare/cloudflared/retry"
"github.com/cloudflare/cloudflared/signal" "github.com/cloudflare/cloudflared/signal"
"github.com/cloudflare/cloudflared/tunnelstate" "github.com/cloudflare/cloudflared/tunnelstate"
@ -80,9 +82,12 @@ func NewSupervisor(config *TunnelConfig, orchestrator *orchestration.Orchestrato
edgeAddrHandler := NewIPAddrFallback(config.MaxEdgeAddrRetries) edgeAddrHandler := NewIPAddrFallback(config.MaxEdgeAddrRetries)
edgeBindAddr := config.EdgeBindAddr edgeBindAddr := config.EdgeBindAddr
sessionManager := v3.NewSessionManager(config.Log, ingress.DialUDPAddrPort)
edgeTunnelServer := EdgeTunnelServer{ edgeTunnelServer := EdgeTunnelServer{
config: config, config: config,
orchestrator: orchestrator, orchestrator: orchestrator,
sessionManager: sessionManager,
edgeAddrs: edgeIPs, edgeAddrs: edgeIPs,
edgeAddrHandler: edgeAddrHandler, edgeAddrHandler: edgeAddrHandler,
edgeBindAddr: edgeBindAddr, edgeBindAddr: edgeBindAddr,

View File

@ -7,6 +7,7 @@ import (
"net" "net"
"net/netip" "net/netip"
"runtime/debug" "runtime/debug"
"slices"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -24,6 +25,7 @@ import (
"github.com/cloudflare/cloudflared/management" "github.com/cloudflare/cloudflared/management"
"github.com/cloudflare/cloudflared/orchestration" "github.com/cloudflare/cloudflared/orchestration"
quicpogs "github.com/cloudflare/cloudflared/quic" quicpogs "github.com/cloudflare/cloudflared/quic"
v3 "github.com/cloudflare/cloudflared/quic/v3"
"github.com/cloudflare/cloudflared/retry" "github.com/cloudflare/cloudflared/retry"
"github.com/cloudflare/cloudflared/signal" "github.com/cloudflare/cloudflared/signal"
"github.com/cloudflare/cloudflared/tunnelrpc/pogs" "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
@ -87,14 +89,6 @@ func (c *TunnelConfig) connectionOptions(originLocalAddr string, numPreviousAtte
} }
} }
func (c *TunnelConfig) SupportedFeatures() []string {
supported := []string{features.FeatureSerializedHeaders}
if c.NamedTunnel == nil {
supported = append(supported, features.FeatureQuickReconnects)
}
return supported
}
func StartTunnelDaemon( func StartTunnelDaemon(
ctx context.Context, ctx context.Context,
config *TunnelConfig, config *TunnelConfig,
@ -181,6 +175,7 @@ func (f *ipAddrFallback) ShouldGetNewAddress(connIndex uint8, err error) (needsN
type EdgeTunnelServer struct { type EdgeTunnelServer struct {
config *TunnelConfig config *TunnelConfig
orchestrator *orchestration.Orchestrator orchestrator *orchestration.Orchestrator
sessionManager v3.SessionManager
edgeAddrHandler EdgeAddrHandler edgeAddrHandler EdgeAddrHandler
edgeAddrs *edgediscovery.Edge edgeAddrs *edgediscovery.Edge
edgeBindAddr net.IP edgeBindAddr net.IP
@ -605,14 +600,24 @@ func (e *EdgeTunnelServer) serveQUIC(
return err, true return err, true
} }
datagramSessionManager := connection.NewDatagramV2Connection( var datagramSessionManager connection.DatagramSessionHandler
ctx, if slices.Contains(connOptions.Client.Features, features.FeatureDatagramV3) {
conn, datagramSessionManager = connection.NewDatagramV3Connection(
e.config.PacketConfig, ctx,
e.config.RPCTimeout, conn,
e.config.WriteStreamTimeout, e.sessionManager,
connLogger.Logger(), connLogger.Logger(),
) )
} else {
datagramSessionManager = connection.NewDatagramV2Connection(
ctx,
conn,
e.config.PacketConfig,
e.config.RPCTimeout,
e.config.WriteStreamTimeout,
connLogger.Logger(),
)
}
// Wrap the [quic.Connection] as a TunnelConnection // Wrap the [quic.Connection] as a TunnelConnection
tunnelConn, err := connection.NewTunnelConnection( tunnelConn, err := connection.NewTunnelConnection(