TUN-7252: Remove h2mux connection

This commit is contained in:
Devin Carr 2023-03-07 13:51:37 -08:00
parent 7080b8b2e6
commit 27f88ae209
6 changed files with 5 additions and 763 deletions

View File

@ -25,7 +25,6 @@ import (
"github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/edgediscovery" "github.com/cloudflare/cloudflared/edgediscovery"
"github.com/cloudflare/cloudflared/edgediscovery/allregions" "github.com/cloudflare/cloudflared/edgediscovery/allregions"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/ingress"
"github.com/cloudflare/cloudflared/orchestration" "github.com/cloudflare/cloudflared/orchestration"
"github.com/cloudflare/cloudflared/supervisor" "github.com/cloudflare/cloudflared/supervisor"
@ -272,14 +271,6 @@ func prepareTunnelConfig(
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
muxerConfig := &connection.MuxerConfig{
HeartbeatInterval: c.Duration("heartbeat-interval"),
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
MaxHeartbeats: uint64(c.Int("heartbeat-count")),
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
CompressionSetting: h2mux.CompressionSetting(uint64(c.Int("compression-quality"))),
MetricsUpdateFreq: c.Duration("metrics-update-freq"),
}
edgeIPVersion, err := parseConfigIPVersion(c.String("edge-ip-version")) edgeIPVersion, err := parseConfigIPVersion(c.String("edge-ip-version"))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -328,7 +319,6 @@ func prepareTunnelConfig(
Retries: uint(c.Int("retries")), Retries: uint(c.Int("retries")),
RunFromTerminal: isRunningFromTerminal(), RunFromTerminal: isRunningFromTerminal(),
NamedTunnel: namedTunnel, NamedTunnel: namedTunnel,
MuxerConfig: muxerConfig,
ProtocolSelector: protocolSelector, ProtocolSelector: protocolSelector,
EdgeTLSConfigs: edgeTLSConfigs, EdgeTLSConfigs: edgeTLSConfigs,
NeedPQ: needPQ, NeedPQ: needPQ,

View File

@ -1,46 +1,17 @@
package connection package connection
import ( import (
"context"
"io"
"net"
"net/http"
"time" "time"
"github.com/pkg/errors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/h2mux" "github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/tracing"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/cloudflare/cloudflared/websocket"
) )
const ( const (
muxerTimeout = 5 * time.Second muxerTimeout = 5 * time.Second
openStreamTimeout = 30 * time.Second
) )
type h2muxConnection struct {
orchestrator Orchestrator
gracePeriod time.Duration
muxerConfig *MuxerConfig
muxer *h2mux.Muxer
// connectionID is only used by metrics, and prometheus requires labels to be string
connIndexStr string
connIndex uint8
observer *Observer
gracefulShutdownC <-chan struct{}
stoppedGracefully bool
log *zerolog.Logger
// newRPCClientFunc allows us to mock RPCs during testing
newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
}
type MuxerConfig struct { type MuxerConfig struct {
HeartbeatInterval time.Duration HeartbeatInterval time.Duration
MaxHeartbeats uint64 MaxHeartbeats uint64
@ -59,180 +30,3 @@ func (mc *MuxerConfig) H2MuxerConfig(h h2mux.MuxedStreamHandler, log *zerolog.Lo
CompressionQuality: mc.CompressionSetting, CompressionQuality: mc.CompressionSetting,
} }
} }
// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
func NewH2muxConnection(
orchestrator Orchestrator,
gracePeriod time.Duration,
muxerConfig *MuxerConfig,
edgeConn net.Conn,
connIndex uint8,
observer *Observer,
gracefulShutdownC <-chan struct{},
log *zerolog.Logger,
) (*h2muxConnection, error, bool) {
h := &h2muxConnection{
orchestrator: orchestrator,
gracePeriod: gracePeriod,
muxerConfig: muxerConfig,
connIndexStr: uint8ToString(connIndex),
connIndex: connIndex,
observer: observer,
gracefulShutdownC: gracefulShutdownC,
newRPCClientFunc: newRegistrationRPCClient,
log: log,
}
// Establish a muxed connection with the edge
// Client mux handshake with agent server
muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig.H2MuxerConfig(h, observer.logTransport), h2mux.ActiveStreams)
if err != nil {
recoverable := isHandshakeErrRecoverable(err, connIndex, observer)
return nil, err, recoverable
}
h.muxer = muxer
return h, nil, false
}
func (h *h2muxConnection) ServeNamedTunnel(ctx context.Context, namedTunnel *NamedTunnelProperties, connOptions *tunnelpogs.ConnectionOptions, connectedFuse ConnectedFuse) error {
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return h.serveMuxer(serveCtx)
})
errGroup.Go(func() error {
if err := h.registerNamedTunnel(serveCtx, namedTunnel, connOptions); err != nil {
return err
}
connectedFuse.Connected()
return nil
})
errGroup.Go(func() error {
h.controlLoop(serveCtx, connectedFuse, true)
return nil
})
err := errGroup.Wait()
if err == errMuxerStopped {
if h.stoppedGracefully {
return nil
}
h.observer.log.Info().Uint8(LogFieldConnIndex, h.connIndex).Msg("Unexpected muxer shutdown")
}
return err
}
func (h *h2muxConnection) serveMuxer(ctx context.Context) error {
// All routines should stop when muxer finish serving. When muxer is shutdown
// gracefully, it doesn't return an error, so we need to return errMuxerShutdown
// here to notify other routines to stop
err := h.muxer.Serve(ctx)
if err == nil {
return errMuxerStopped
}
return err
}
func (h *h2muxConnection) controlLoop(ctx context.Context, connectedFuse ConnectedFuse, isNamedTunnel bool) {
updateMetricsTicker := time.NewTicker(h.muxerConfig.MetricsUpdateFreq)
defer updateMetricsTicker.Stop()
var shutdownCompleted <-chan struct{}
for {
select {
case <-h.gracefulShutdownC:
if connectedFuse.IsConnected() {
h.unregister(isNamedTunnel)
}
h.stoppedGracefully = true
h.gracefulShutdownC = nil
shutdownCompleted = h.muxer.Shutdown()
case <-shutdownCompleted:
return
case <-ctx.Done():
// UnregisterTunnel blocks until the RPC call returns
if !h.stoppedGracefully && connectedFuse.IsConnected() {
h.unregister(isNamedTunnel)
}
h.muxer.Shutdown()
// don't wait for shutdown to finish when context is closed, this is the hard termination path
return
case <-updateMetricsTicker.C:
h.observer.metrics.updateMuxerMetrics(h.connIndexStr, h.muxer.Metrics())
}
}
}
func (h *h2muxConnection) newRPCStream(ctx context.Context, rpcName rpcName) (*h2mux.MuxedStream, error) {
openStreamCtx, openStreamCancel := context.WithTimeout(ctx, openStreamTimeout)
defer openStreamCancel()
stream, err := h.muxer.OpenRPCStream(openStreamCtx)
if err != nil {
return nil, err
}
return stream, nil
}
func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error {
respWriter := &h2muxRespWriter{stream}
req, reqErr := h.newRequest(stream)
if reqErr != nil {
respWriter.WriteErrorResponse()
return reqErr
}
var sourceConnectionType = TypeHTTP
if websocket.IsWebSocketUpgrade(req) {
sourceConnectionType = TypeWebsocket
}
originProxy, err := h.orchestrator.GetOriginProxy()
if err != nil {
respWriter.WriteErrorResponse()
return err
}
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, h.connIndex, h.log), sourceConnectionType == TypeWebsocket)
if err != nil {
respWriter.WriteErrorResponse()
}
return err
}
func (h *h2muxConnection) newRequest(stream *h2mux.MuxedStream) (*http.Request, error) {
req, err := http.NewRequest("GET", "http://localhost:8080", h2mux.MuxedStreamReader{MuxedStream: stream})
if err != nil {
return nil, errors.Wrap(err, "Unexpected error from http.NewRequest")
}
err = H2RequestHeadersToH1Request(stream.Headers, req)
if err != nil {
return nil, errors.Wrap(err, "invalid request received")
}
return req, nil
}
type h2muxRespWriter struct {
*h2mux.MuxedStream
}
func (rp *h2muxRespWriter) AddTrailer(trailerName, trailerValue string) {
// do nothing. we don't support trailers over h2mux
}
func (rp *h2muxRespWriter) WriteRespHeaders(status int, header http.Header) error {
headers := H1ResponseToH2ResponseHeaders(status, header)
headers = append(headers, h2mux.Header{Name: ResponseMetaHeader, Value: responseMetaHeaderOrigin})
return rp.WriteHeaders(headers)
}
func (rp *h2muxRespWriter) WriteErrorResponse() {
_ = rp.WriteHeaders([]h2mux.Header{
{Name: ":status", Value: "502"},
{Name: ResponseMetaHeader, Value: responseMetaHeaderCfd},
})
_, _ = rp.Write([]byte("502 Bad Gateway"))
}

View File

@ -1,301 +0,0 @@
package connection
import (
"context"
"fmt"
"io"
"net"
"net/http"
"strconv"
"sync"
"testing"
"time"
"github.com/gobwas/ws/wsutil"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cloudflare/cloudflared/h2mux"
)
var (
testMuxerConfig = &MuxerConfig{
HeartbeatInterval: time.Second * 5,
MaxHeartbeats: 5,
CompressionSetting: 0,
MetricsUpdateFreq: time.Second * 5,
}
)
func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
edgeConn, originConn := net.Pipe()
edgeMuxChan := make(chan *h2mux.Muxer)
go func() {
edgeMuxConfig := h2mux.MuxerConfig{
Log: &log,
Handler: h2mux.MuxedStreamFunc(func(stream *h2mux.MuxedStream) error {
// we only expect RPC traffic in client->edge direction, provide minimal support for mocking
require.True(t, stream.IsRPCStream())
return stream.WriteHeaders([]h2mux.Header{
{Name: ":status", Value: "200"},
})
}),
}
edgeMux, err := h2mux.Handshake(edgeConn, edgeConn, edgeMuxConfig, h2mux.ActiveStreams)
require.NoError(t, err)
edgeMuxChan <- edgeMux
}()
var connIndex = uint8(0)
testObserver := NewObserver(&log, &log)
h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil, &log)
require.NoError(t, err)
return h2muxConn, <-edgeMuxChan
}
func TestServeStreamHTTP(t *testing.T) {
tests := []testRequest{
{
name: "ok",
endpoint: "/ok",
expectedStatus: http.StatusOK,
expectedBody: []byte(http.StatusText(http.StatusOK)),
},
{
name: "large_file",
endpoint: "/large_file",
expectedStatus: http.StatusOK,
expectedBody: testLargeResp,
},
{
name: "Bad request",
endpoint: "/400",
expectedStatus: http.StatusBadRequest,
expectedBody: []byte(http.StatusText(http.StatusBadRequest)),
},
{
name: "Internal server error",
endpoint: "/500",
expectedStatus: http.StatusInternalServerError,
expectedBody: []byte(http.StatusText(http.StatusInternalServerError)),
},
{
name: "Proxy error",
endpoint: "/error",
expectedStatus: http.StatusBadGateway,
expectedBody: nil,
isProxyError: true,
},
}
ctx, cancel := context.WithCancel(context.Background())
h2muxConn, edgeMux := newH2MuxConnection(t)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
_ = edgeMux.Serve(ctx)
}()
go func() {
defer wg.Done()
err := h2muxConn.serveMuxer(ctx)
require.Error(t, err)
}()
for _, test := range tests {
headers := []h2mux.Header{
{
Name: ":path",
Value: test.endpoint,
},
}
stream, err := edgeMux.OpenStream(ctx, headers, nil)
require.NoError(t, err)
require.True(t, hasHeader(stream, ":status", strconv.Itoa(test.expectedStatus)))
if test.isProxyError {
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderCfd))
} else {
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
body := make([]byte, len(test.expectedBody))
_, err = stream.Read(body)
require.NoError(t, err)
require.Equal(t, test.expectedBody, body)
}
}
cancel()
wg.Wait()
}
func TestServeStreamWS(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
h2muxConn, edgeMux := newH2MuxConnection(t)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
edgeMux.Serve(ctx)
}()
go func() {
defer wg.Done()
err := h2muxConn.serveMuxer(ctx)
require.Error(t, err)
}()
headers := []h2mux.Header{
{
Name: ":path",
Value: "/ws/echo",
},
{
Name: "connection",
Value: "upgrade",
},
{
Name: "upgrade",
Value: "websocket",
},
}
readPipe, writePipe := io.Pipe()
stream, err := edgeMux.OpenStream(ctx, headers, readPipe)
require.NoError(t, err)
require.True(t, hasHeader(stream, ":status", strconv.Itoa(http.StatusSwitchingProtocols)))
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
data := []byte("test websocket")
err = wsutil.WriteClientBinary(writePipe, data)
require.NoError(t, err)
respBody, err := wsutil.ReadServerBinary(stream)
require.NoError(t, err)
require.Equal(t, data, respBody, fmt.Sprintf("Expect %s, got %s", string(data), string(respBody)))
cancel()
wg.Wait()
}
func TestGracefulShutdownH2Mux(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h2muxConn, edgeMux := newH2MuxConnection(t)
shutdownC := make(chan struct{})
unregisteredC := make(chan struct{})
h2muxConn.gracefulShutdownC = shutdownC
h2muxConn.newRPCClientFunc = func(_ context.Context, _ io.ReadWriteCloser, _ *zerolog.Logger) NamedTunnelRPCClient {
return &mockNamedTunnelRPCClient{
registered: nil,
unregistered: unregisteredC,
}
}
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
_ = edgeMux.Serve(ctx)
}()
go func() {
defer wg.Done()
_ = h2muxConn.serveMuxer(ctx)
}()
go func() {
defer wg.Done()
h2muxConn.controlLoop(ctx, &mockConnectedFuse{}, true)
}()
time.Sleep(100 * time.Millisecond)
close(shutdownC)
select {
case <-unregisteredC:
break // ok
case <-time.Tick(time.Second):
assert.Fail(t, "timed out waiting for control loop to unregister")
}
cancel()
wg.Wait()
assert.True(t, h2muxConn.stoppedGracefully)
assert.Nil(t, h2muxConn.gracefulShutdownC)
}
func hasHeader(stream *h2mux.MuxedStream, name, val string) bool {
for _, header := range stream.Headers {
if header.Name == name && header.Value == val {
return true
}
}
return false
}
func benchmarkServeStreamHTTPSimple(b *testing.B, test testRequest) {
ctx, cancel := context.WithCancel(context.Background())
h2muxConn, edgeMux := newH2MuxConnection(b)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
edgeMux.Serve(ctx)
}()
go func() {
defer wg.Done()
err := h2muxConn.serveMuxer(ctx)
require.Error(b, err)
}()
headers := []h2mux.Header{
{
Name: ":path",
Value: test.endpoint,
},
}
body := make([]byte, len(test.expectedBody))
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StartTimer()
stream, openstreamErr := edgeMux.OpenStream(ctx, headers, nil)
_, readBodyErr := stream.Read(body)
b.StopTimer()
require.NoError(b, openstreamErr)
assert.True(b, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
require.True(b, hasHeader(stream, ":status", strconv.Itoa(http.StatusOK)))
require.NoError(b, readBodyErr)
require.Equal(b, test.expectedBody, body)
}
cancel()
wg.Wait()
}
func BenchmarkServeStreamHTTPSimple(b *testing.B) {
test := testRequest{
name: "ok",
endpoint: "/ok",
expectedStatus: http.StatusOK,
expectedBody: []byte(http.StatusText(http.StatusOK)),
}
benchmarkServeStreamHTTPSimple(b, test)
}
func BenchmarkServeStreamHTTPLargeFile(b *testing.B) {
test := testRequest{
name: "large_file",
endpoint: "/large_file",
expectedStatus: http.StatusOK,
expectedBody: testLargeResp,
}
benchmarkServeStreamHTTPSimple(b, test)
}

View File

@ -2,7 +2,6 @@ package connection
import ( import (
"context" "context"
"fmt"
"io" "io"
"net" "net"
"time" "time"
@ -152,178 +151,3 @@ const (
unregister rpcName = "unregister" unregister rpcName = "unregister"
authenticate rpcName = " authenticate" authenticate rpcName = " authenticate"
) )
func (h *h2muxConnection) registerTunnel(ctx context.Context, credentialSetter CredentialManager, classicTunnel *ClassicTunnelProperties, registrationOptions *tunnelpogs.RegistrationOptions) error {
h.observer.sendRegisteringEvent(registrationOptions.ConnectionID)
stream, err := h.newRPCStream(ctx, register)
if err != nil {
return err
}
rpcClient := NewTunnelServerClient(ctx, stream, h.observer.log)
defer rpcClient.Close()
_ = h.logServerInfo(ctx, rpcClient)
registration := rpcClient.client.RegisterTunnel(
ctx,
classicTunnel.OriginCert,
classicTunnel.Hostname,
registrationOptions,
)
if registrationErr := registration.DeserializeError(); registrationErr != nil {
// RegisterTunnel RPC failure
return h.processRegisterTunnelError(registrationErr, register)
}
credentialSetter.SetEventDigest(h.connIndex, registration.EventDigest)
return h.processRegistrationSuccess(registration, register, credentialSetter, classicTunnel)
}
type CredentialManager interface {
ReconnectToken() ([]byte, error)
EventDigest(connID uint8) ([]byte, error)
SetEventDigest(connID uint8, digest []byte)
ConnDigest(connID uint8) ([]byte, error)
SetConnDigest(connID uint8, digest []byte)
}
func (h *h2muxConnection) processRegistrationSuccess(
registration *tunnelpogs.TunnelRegistration,
name rpcName,
credentialManager CredentialManager, classicTunnel *ClassicTunnelProperties,
) error {
for _, logLine := range registration.LogLines {
h.observer.log.Info().Msg(logLine)
}
if registration.TunnelID != "" {
h.observer.metrics.tunnelsHA.AddTunnelID(h.connIndex, registration.TunnelID)
h.observer.log.Info().Msgf("Each HA connection's tunnel IDs: %v", h.observer.metrics.tunnelsHA.String())
}
credentialManager.SetConnDigest(h.connIndex, registration.ConnDigest)
h.observer.metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc()
h.observer.log.Info().Msgf("Route propagating, it may take up to 1 minute for your new route to become functional")
h.observer.metrics.regSuccess.WithLabelValues(string(name)).Inc()
return nil
}
func (h *h2muxConnection) processRegisterTunnelError(err tunnelpogs.TunnelRegistrationError, name rpcName) error {
if err.Error() == DuplicateConnectionError {
h.observer.metrics.regFail.WithLabelValues("dup_edge_conn", string(name)).Inc()
return errDuplicationConnection
}
h.observer.metrics.regFail.WithLabelValues("server_error", string(name)).Inc()
return ServerRegisterTunnelError{
Cause: err,
Permanent: err.IsPermanent(),
}
}
func (h *h2muxConnection) reconnectTunnel(ctx context.Context, credentialManager CredentialManager, classicTunnel *ClassicTunnelProperties, registrationOptions *tunnelpogs.RegistrationOptions) error {
token, err := credentialManager.ReconnectToken()
if err != nil {
return err
}
eventDigest, err := credentialManager.EventDigest(h.connIndex)
if err != nil {
return err
}
connDigest, err := credentialManager.ConnDigest(h.connIndex)
if err != nil {
return err
}
h.observer.log.Debug().Msg("initiating RPC stream to reconnect")
stream, err := h.newRPCStream(ctx, register)
if err != nil {
return err
}
rpcClient := NewTunnelServerClient(ctx, stream, h.observer.log)
defer rpcClient.Close()
_ = h.logServerInfo(ctx, rpcClient)
registration := rpcClient.client.ReconnectTunnel(
ctx,
token,
eventDigest,
connDigest,
classicTunnel.Hostname,
registrationOptions,
)
if registrationErr := registration.DeserializeError(); registrationErr != nil {
// ReconnectTunnel RPC failure
return h.processRegisterTunnelError(registrationErr, reconnect)
}
return h.processRegistrationSuccess(registration, reconnect, credentialManager, classicTunnel)
}
func (h *h2muxConnection) logServerInfo(ctx context.Context, rpcClient *tunnelServerClient) error {
// Request server info without blocking tunnel registration; must use capnp library directly.
serverInfoPromise := tunnelrpc.TunnelServer{Client: rpcClient.client.Client}.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error {
return nil
})
serverInfoMessage, err := serverInfoPromise.Result().Struct()
if err != nil {
h.observer.log.Err(err).Msg("Failed to retrieve server information")
return err
}
serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage)
if err != nil {
h.observer.log.Err(err).Msg("Failed to retrieve server information")
return err
}
h.observer.logServerInfo(h.connIndex, serverInfo.LocationName, net.IP{}, "Connection established")
return nil
}
func (h *h2muxConnection) registerNamedTunnel(
ctx context.Context,
namedTunnel *NamedTunnelProperties,
connOptions *tunnelpogs.ConnectionOptions,
) error {
stream, err := h.newRPCStream(ctx, register)
if err != nil {
return err
}
rpcClient := h.newRPCClientFunc(ctx, stream, h.observer.log)
defer rpcClient.Close()
registrationDetails, err := rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, nil, h.observer)
if err != nil {
return err
}
h.observer.logServerInfo(h.connIndex, registrationDetails.Location, nil, fmt.Sprintf("Connection %s registered", registrationDetails.UUID))
h.observer.sendConnectedEvent(h.connIndex, 0, registrationDetails.Location)
return nil
}
func (h *h2muxConnection) unregister(isNamedTunnel bool) {
h.observer.sendUnregisteringEvent(h.connIndex)
unregisterCtx, cancel := context.WithTimeout(context.Background(), h.gracePeriod)
defer cancel()
stream, err := h.newRPCStream(unregisterCtx, unregister)
if err != nil {
return
}
defer stream.Close()
if isNamedTunnel {
rpcClient := h.newRPCClientFunc(unregisterCtx, stream, h.observer.log)
defer rpcClient.Close()
rpcClient.GracefulShutdown(unregisterCtx, h.gracePeriod)
} else {
rpcClient := NewTunnelServerClient(unregisterCtx, stream, h.observer.log)
defer rpcClient.Close()
// gracePeriod is encoded in int64 using capnproto
_ = rpcClient.client.UnregisterTunnel(unregisterCtx, h.gracePeriod.Nanoseconds())
}
h.observer.log.Info().Uint8(LogFieldConnIndex, h.connIndex).Msg("Unregistered tunnel connection")
}

View File

@ -23,7 +23,6 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/cfio" "github.com/cloudflare/cloudflared/cfio"
"github.com/cloudflare/cloudflared/config" "github.com/cloudflare/cloudflared/config"
"github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/hello" "github.com/cloudflare/cloudflared/hello"
@ -34,7 +33,7 @@ import (
) )
var ( var (
testTags = []tunnelpogs.Tag{tunnelpogs.Tag{Name: "Name", Value: "value"}} testTags = []tunnelpogs.Tag{{Name: "Name", Value: "value"}}
noWarpRouting = ingress.WarpRoutingConfig{} noWarpRouting = ingress.WarpRoutingConfig{}
testWarpRouting = ingress.WarpRoutingConfig{ testWarpRouting = ingress.WarpRoutingConfig{
Enabled: true, Enabled: true,

View File

@ -69,7 +69,6 @@ type TunnelConfig struct {
PQKexIdx int PQKexIdx int
NamedTunnel *connection.NamedTunnelProperties NamedTunnel *connection.NamedTunnelProperties
MuxerConfig *connection.MuxerConfig
ProtocolSelector connection.ProtocolSelector ProtocolSelector connection.ProtocolSelector
EdgeTLSConfigs map[connection.Protocol]*tls.Config EdgeTLSConfigs map[connection.Protocol]*tls.Config
PacketConfig *ingress.GlobalRouterConfig PacketConfig *ingress.GlobalRouterConfig
@ -91,7 +90,7 @@ func (c *TunnelConfig) registrationOptions(connectionID uint8, OriginLocalIP str
OriginLocalIP: OriginLocalIP, OriginLocalIP: OriginLocalIP,
IsAutoupdated: c.IsAutoupdated, IsAutoupdated: c.IsAutoupdated,
RunFromTerminal: c.RunFromTerminal, RunFromTerminal: c.RunFromTerminal,
CompressionQuality: uint64(c.MuxerConfig.CompressionSetting), CompressionQuality: 0,
UUID: uuid.String(), UUID: uuid.String(),
Features: c.SupportedFeatures(), Features: c.SupportedFeatures(),
} }
@ -106,7 +105,7 @@ func (c *TunnelConfig) connectionOptions(originLocalAddr string, numPreviousAtte
Client: c.NamedTunnel.Client, Client: c.NamedTunnel.Client,
OriginLocalIP: originIP, OriginLocalIP: originIP,
ReplaceExisting: c.ReplaceExisting, ReplaceExisting: c.ReplaceExisting,
CompressionQuality: uint8(c.MuxerConfig.CompressionSetting), CompressionQuality: 0,
NumPreviousAttempts: numPreviousAttempts, NumPreviousAttempts: numPreviousAttempts,
} }
} }
@ -518,21 +517,7 @@ func (e *EdgeTunnelServer) serveConnection(
} }
default: default:
edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, e.config.EdgeTLSConfigs[protocol], addr.TCP, e.edgeBindAddr) return fmt.Errorf("invalid protocol selected: %s", protocol), false
if err != nil {
connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge")
return err, true
}
if err := e.serveH2mux(
ctx,
connLog,
edgeConn,
connIndex,
connectedFuse,
); err != nil {
return err, false
}
} }
return return
} }
@ -545,55 +530,6 @@ func (r unrecoverableError) Error() string {
return r.err.Error() return r.err.Error()
} }
func (e *EdgeTunnelServer) serveH2mux(
ctx context.Context,
connLog *ConnAwareLogger,
edgeConn net.Conn,
connIndex uint8,
connectedFuse *connectedFuse,
) error {
if e.config.NeedPQ {
return unrecoverableError{errors.New("H2Mux transport does not support post-quantum")}
}
connLog.Logger().Debug().Msgf("Connecting via h2mux")
// Returns error from parsing the origin URL or handshake errors
handler, err, recoverable := connection.NewH2muxConnection(
e.orchestrator,
e.config.GracePeriod,
e.config.MuxerConfig,
edgeConn,
connIndex,
e.config.Observer,
e.gracefulShutdownC,
e.config.Log,
)
if err != nil {
if !recoverable {
return unrecoverableError{err}
}
return err
}
errGroup, serveCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
connOptions := e.config.connectionOptions(edgeConn.LocalAddr().String(), uint8(connectedFuse.backoff.Retries()))
return handler.ServeNamedTunnel(serveCtx, e.config.NamedTunnel, connOptions, connectedFuse)
})
errGroup.Go(func() error {
err := listenReconnect(serveCtx, e.reconnectCh, e.gracefulShutdownC)
if err != nil {
// forcefully break the connection (this is only used for testing)
// errgroup will return context canceled for the handler.ServeClassicTunnel
connLog.Logger().Debug().Msg("Forcefully breaking h2mux connection")
}
return err
})
return errGroup.Wait()
}
func (e *EdgeTunnelServer) serveHTTP2( func (e *EdgeTunnelServer) serveHTTP2(
ctx context.Context, ctx context.Context,
connLog *ConnAwareLogger, connLog *ConnAwareLogger,