From 27f88ae2097dcc647de596a4e751dc533e36f4f4 Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Tue, 7 Mar 2023 13:51:37 -0800 Subject: [PATCH] TUN-7252: Remove h2mux connection --- cmd/cloudflared/tunnel/configuration.go | 10 - connection/h2mux.go | 208 +--------------- connection/h2mux_test.go | 301 ------------------------ connection/rpc.go | 176 -------------- proxy/proxy_test.go | 3 +- supervisor/tunnel.go | 70 +----- 6 files changed, 5 insertions(+), 763 deletions(-) delete mode 100644 connection/h2mux_test.go diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index 4f6919c6..dec85fc5 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -25,7 +25,6 @@ import ( "github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/edgediscovery" "github.com/cloudflare/cloudflared/edgediscovery/allregions" - "github.com/cloudflare/cloudflared/h2mux" "github.com/cloudflare/cloudflared/ingress" "github.com/cloudflare/cloudflared/orchestration" "github.com/cloudflare/cloudflared/supervisor" @@ -272,14 +271,6 @@ func prepareTunnelConfig( if err != nil { return nil, nil, err } - muxerConfig := &connection.MuxerConfig{ - HeartbeatInterval: c.Duration("heartbeat-interval"), - // Note TUN-3758 , we use Int because UInt is not supported with altsrc - MaxHeartbeats: uint64(c.Int("heartbeat-count")), - // Note TUN-3758 , we use Int because UInt is not supported with altsrc - CompressionSetting: h2mux.CompressionSetting(uint64(c.Int("compression-quality"))), - MetricsUpdateFreq: c.Duration("metrics-update-freq"), - } edgeIPVersion, err := parseConfigIPVersion(c.String("edge-ip-version")) if err != nil { return nil, nil, err @@ -328,7 +319,6 @@ func prepareTunnelConfig( Retries: uint(c.Int("retries")), RunFromTerminal: isRunningFromTerminal(), NamedTunnel: namedTunnel, - MuxerConfig: muxerConfig, ProtocolSelector: protocolSelector, EdgeTLSConfigs: edgeTLSConfigs, NeedPQ: needPQ, diff --git a/connection/h2mux.go b/connection/h2mux.go index 75170cf0..4de983bc 100644 --- a/connection/h2mux.go +++ b/connection/h2mux.go @@ -1,46 +1,17 @@ package connection import ( - "context" - "io" - "net" - "net/http" "time" - "github.com/pkg/errors" "github.com/rs/zerolog" - "golang.org/x/sync/errgroup" "github.com/cloudflare/cloudflared/h2mux" - "github.com/cloudflare/cloudflared/tracing" - tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" - "github.com/cloudflare/cloudflared/websocket" ) const ( - muxerTimeout = 5 * time.Second - openStreamTimeout = 30 * time.Second + muxerTimeout = 5 * time.Second ) -type h2muxConnection struct { - orchestrator Orchestrator - gracePeriod time.Duration - muxerConfig *MuxerConfig - muxer *h2mux.Muxer - // connectionID is only used by metrics, and prometheus requires labels to be string - connIndexStr string - connIndex uint8 - - observer *Observer - gracefulShutdownC <-chan struct{} - stoppedGracefully bool - - log *zerolog.Logger - - // newRPCClientFunc allows us to mock RPCs during testing - newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient -} - type MuxerConfig struct { HeartbeatInterval time.Duration MaxHeartbeats uint64 @@ -59,180 +30,3 @@ func (mc *MuxerConfig) H2MuxerConfig(h h2mux.MuxedStreamHandler, log *zerolog.Lo CompressionQuality: mc.CompressionSetting, } } - -// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error -func NewH2muxConnection( - orchestrator Orchestrator, - gracePeriod time.Duration, - muxerConfig *MuxerConfig, - edgeConn net.Conn, - connIndex uint8, - observer *Observer, - gracefulShutdownC <-chan struct{}, - log *zerolog.Logger, -) (*h2muxConnection, error, bool) { - h := &h2muxConnection{ - orchestrator: orchestrator, - gracePeriod: gracePeriod, - muxerConfig: muxerConfig, - connIndexStr: uint8ToString(connIndex), - connIndex: connIndex, - observer: observer, - gracefulShutdownC: gracefulShutdownC, - newRPCClientFunc: newRegistrationRPCClient, - log: log, - } - - // Establish a muxed connection with the edge - // Client mux handshake with agent server - muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig.H2MuxerConfig(h, observer.logTransport), h2mux.ActiveStreams) - if err != nil { - recoverable := isHandshakeErrRecoverable(err, connIndex, observer) - return nil, err, recoverable - } - h.muxer = muxer - return h, nil, false -} - -func (h *h2muxConnection) ServeNamedTunnel(ctx context.Context, namedTunnel *NamedTunnelProperties, connOptions *tunnelpogs.ConnectionOptions, connectedFuse ConnectedFuse) error { - errGroup, serveCtx := errgroup.WithContext(ctx) - errGroup.Go(func() error { - return h.serveMuxer(serveCtx) - }) - - errGroup.Go(func() error { - if err := h.registerNamedTunnel(serveCtx, namedTunnel, connOptions); err != nil { - return err - } - connectedFuse.Connected() - return nil - }) - - errGroup.Go(func() error { - h.controlLoop(serveCtx, connectedFuse, true) - return nil - }) - - err := errGroup.Wait() - if err == errMuxerStopped { - if h.stoppedGracefully { - return nil - } - h.observer.log.Info().Uint8(LogFieldConnIndex, h.connIndex).Msg("Unexpected muxer shutdown") - } - return err -} - -func (h *h2muxConnection) serveMuxer(ctx context.Context) error { - // All routines should stop when muxer finish serving. When muxer is shutdown - // gracefully, it doesn't return an error, so we need to return errMuxerShutdown - // here to notify other routines to stop - err := h.muxer.Serve(ctx) - if err == nil { - return errMuxerStopped - } - return err -} - -func (h *h2muxConnection) controlLoop(ctx context.Context, connectedFuse ConnectedFuse, isNamedTunnel bool) { - updateMetricsTicker := time.NewTicker(h.muxerConfig.MetricsUpdateFreq) - defer updateMetricsTicker.Stop() - var shutdownCompleted <-chan struct{} - for { - select { - case <-h.gracefulShutdownC: - if connectedFuse.IsConnected() { - h.unregister(isNamedTunnel) - } - h.stoppedGracefully = true - h.gracefulShutdownC = nil - shutdownCompleted = h.muxer.Shutdown() - - case <-shutdownCompleted: - return - - case <-ctx.Done(): - // UnregisterTunnel blocks until the RPC call returns - if !h.stoppedGracefully && connectedFuse.IsConnected() { - h.unregister(isNamedTunnel) - } - h.muxer.Shutdown() - // don't wait for shutdown to finish when context is closed, this is the hard termination path - return - - case <-updateMetricsTicker.C: - h.observer.metrics.updateMuxerMetrics(h.connIndexStr, h.muxer.Metrics()) - } - } -} - -func (h *h2muxConnection) newRPCStream(ctx context.Context, rpcName rpcName) (*h2mux.MuxedStream, error) { - openStreamCtx, openStreamCancel := context.WithTimeout(ctx, openStreamTimeout) - defer openStreamCancel() - stream, err := h.muxer.OpenRPCStream(openStreamCtx) - if err != nil { - return nil, err - } - return stream, nil -} - -func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error { - respWriter := &h2muxRespWriter{stream} - - req, reqErr := h.newRequest(stream) - if reqErr != nil { - respWriter.WriteErrorResponse() - return reqErr - } - - var sourceConnectionType = TypeHTTP - if websocket.IsWebSocketUpgrade(req) { - sourceConnectionType = TypeWebsocket - } - - originProxy, err := h.orchestrator.GetOriginProxy() - if err != nil { - respWriter.WriteErrorResponse() - return err - } - - err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, h.connIndex, h.log), sourceConnectionType == TypeWebsocket) - if err != nil { - respWriter.WriteErrorResponse() - } - return err -} - -func (h *h2muxConnection) newRequest(stream *h2mux.MuxedStream) (*http.Request, error) { - req, err := http.NewRequest("GET", "http://localhost:8080", h2mux.MuxedStreamReader{MuxedStream: stream}) - if err != nil { - return nil, errors.Wrap(err, "Unexpected error from http.NewRequest") - } - err = H2RequestHeadersToH1Request(stream.Headers, req) - if err != nil { - return nil, errors.Wrap(err, "invalid request received") - } - return req, nil -} - -type h2muxRespWriter struct { - *h2mux.MuxedStream -} - -func (rp *h2muxRespWriter) AddTrailer(trailerName, trailerValue string) { - // do nothing. we don't support trailers over h2mux -} - -func (rp *h2muxRespWriter) WriteRespHeaders(status int, header http.Header) error { - headers := H1ResponseToH2ResponseHeaders(status, header) - headers = append(headers, h2mux.Header{Name: ResponseMetaHeader, Value: responseMetaHeaderOrigin}) - return rp.WriteHeaders(headers) -} - -func (rp *h2muxRespWriter) WriteErrorResponse() { - _ = rp.WriteHeaders([]h2mux.Header{ - {Name: ":status", Value: "502"}, - {Name: ResponseMetaHeader, Value: responseMetaHeaderCfd}, - }) - _, _ = rp.Write([]byte("502 Bad Gateway")) -} diff --git a/connection/h2mux_test.go b/connection/h2mux_test.go deleted file mode 100644 index 53c28227..00000000 --- a/connection/h2mux_test.go +++ /dev/null @@ -1,301 +0,0 @@ -package connection - -import ( - "context" - "fmt" - "io" - "net" - "net/http" - "strconv" - "sync" - "testing" - "time" - - "github.com/gobwas/ws/wsutil" - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/cloudflare/cloudflared/h2mux" -) - -var ( - testMuxerConfig = &MuxerConfig{ - HeartbeatInterval: time.Second * 5, - MaxHeartbeats: 5, - CompressionSetting: 0, - MetricsUpdateFreq: time.Second * 5, - } -) - -func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) { - edgeConn, originConn := net.Pipe() - edgeMuxChan := make(chan *h2mux.Muxer) - go func() { - edgeMuxConfig := h2mux.MuxerConfig{ - Log: &log, - Handler: h2mux.MuxedStreamFunc(func(stream *h2mux.MuxedStream) error { - // we only expect RPC traffic in client->edge direction, provide minimal support for mocking - require.True(t, stream.IsRPCStream()) - return stream.WriteHeaders([]h2mux.Header{ - {Name: ":status", Value: "200"}, - }) - }), - } - edgeMux, err := h2mux.Handshake(edgeConn, edgeConn, edgeMuxConfig, h2mux.ActiveStreams) - require.NoError(t, err) - edgeMuxChan <- edgeMux - }() - var connIndex = uint8(0) - testObserver := NewObserver(&log, &log) - h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil, &log) - require.NoError(t, err) - return h2muxConn, <-edgeMuxChan -} - -func TestServeStreamHTTP(t *testing.T) { - tests := []testRequest{ - { - name: "ok", - endpoint: "/ok", - expectedStatus: http.StatusOK, - expectedBody: []byte(http.StatusText(http.StatusOK)), - }, - { - name: "large_file", - endpoint: "/large_file", - expectedStatus: http.StatusOK, - expectedBody: testLargeResp, - }, - { - name: "Bad request", - endpoint: "/400", - expectedStatus: http.StatusBadRequest, - expectedBody: []byte(http.StatusText(http.StatusBadRequest)), - }, - { - name: "Internal server error", - endpoint: "/500", - expectedStatus: http.StatusInternalServerError, - expectedBody: []byte(http.StatusText(http.StatusInternalServerError)), - }, - { - name: "Proxy error", - endpoint: "/error", - expectedStatus: http.StatusBadGateway, - expectedBody: nil, - isProxyError: true, - }, - } - - ctx, cancel := context.WithCancel(context.Background()) - h2muxConn, edgeMux := newH2MuxConnection(t) - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - _ = edgeMux.Serve(ctx) - }() - go func() { - defer wg.Done() - err := h2muxConn.serveMuxer(ctx) - require.Error(t, err) - }() - - for _, test := range tests { - headers := []h2mux.Header{ - { - Name: ":path", - Value: test.endpoint, - }, - } - stream, err := edgeMux.OpenStream(ctx, headers, nil) - require.NoError(t, err) - require.True(t, hasHeader(stream, ":status", strconv.Itoa(test.expectedStatus))) - - if test.isProxyError { - assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderCfd)) - } else { - assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin)) - body := make([]byte, len(test.expectedBody)) - _, err = stream.Read(body) - require.NoError(t, err) - require.Equal(t, test.expectedBody, body) - } - } - cancel() - wg.Wait() -} - -func TestServeStreamWS(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - h2muxConn, edgeMux := newH2MuxConnection(t) - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - edgeMux.Serve(ctx) - }() - go func() { - defer wg.Done() - err := h2muxConn.serveMuxer(ctx) - require.Error(t, err) - }() - - headers := []h2mux.Header{ - { - Name: ":path", - Value: "/ws/echo", - }, - { - Name: "connection", - Value: "upgrade", - }, - { - Name: "upgrade", - Value: "websocket", - }, - } - - readPipe, writePipe := io.Pipe() - stream, err := edgeMux.OpenStream(ctx, headers, readPipe) - require.NoError(t, err) - - require.True(t, hasHeader(stream, ":status", strconv.Itoa(http.StatusSwitchingProtocols))) - assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin)) - - data := []byte("test websocket") - err = wsutil.WriteClientBinary(writePipe, data) - require.NoError(t, err) - - respBody, err := wsutil.ReadServerBinary(stream) - require.NoError(t, err) - require.Equal(t, data, respBody, fmt.Sprintf("Expect %s, got %s", string(data), string(respBody))) - - cancel() - wg.Wait() -} - -func TestGracefulShutdownH2Mux(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - h2muxConn, edgeMux := newH2MuxConnection(t) - - shutdownC := make(chan struct{}) - unregisteredC := make(chan struct{}) - h2muxConn.gracefulShutdownC = shutdownC - h2muxConn.newRPCClientFunc = func(_ context.Context, _ io.ReadWriteCloser, _ *zerolog.Logger) NamedTunnelRPCClient { - return &mockNamedTunnelRPCClient{ - registered: nil, - unregistered: unregisteredC, - } - } - - var wg sync.WaitGroup - wg.Add(3) - go func() { - defer wg.Done() - _ = edgeMux.Serve(ctx) - }() - go func() { - defer wg.Done() - _ = h2muxConn.serveMuxer(ctx) - }() - - go func() { - defer wg.Done() - h2muxConn.controlLoop(ctx, &mockConnectedFuse{}, true) - }() - - time.Sleep(100 * time.Millisecond) - close(shutdownC) - - select { - case <-unregisteredC: - break // ok - case <-time.Tick(time.Second): - assert.Fail(t, "timed out waiting for control loop to unregister") - } - - cancel() - wg.Wait() - - assert.True(t, h2muxConn.stoppedGracefully) - assert.Nil(t, h2muxConn.gracefulShutdownC) -} - -func hasHeader(stream *h2mux.MuxedStream, name, val string) bool { - for _, header := range stream.Headers { - if header.Name == name && header.Value == val { - return true - } - } - return false -} - -func benchmarkServeStreamHTTPSimple(b *testing.B, test testRequest) { - ctx, cancel := context.WithCancel(context.Background()) - h2muxConn, edgeMux := newH2MuxConnection(b) - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - edgeMux.Serve(ctx) - }() - go func() { - defer wg.Done() - err := h2muxConn.serveMuxer(ctx) - require.Error(b, err) - }() - - headers := []h2mux.Header{ - { - Name: ":path", - Value: test.endpoint, - }, - } - - body := make([]byte, len(test.expectedBody)) - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StartTimer() - stream, openstreamErr := edgeMux.OpenStream(ctx, headers, nil) - _, readBodyErr := stream.Read(body) - b.StopTimer() - - require.NoError(b, openstreamErr) - assert.True(b, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin)) - require.True(b, hasHeader(stream, ":status", strconv.Itoa(http.StatusOK))) - require.NoError(b, readBodyErr) - require.Equal(b, test.expectedBody, body) - } - - cancel() - wg.Wait() -} - -func BenchmarkServeStreamHTTPSimple(b *testing.B) { - test := testRequest{ - name: "ok", - endpoint: "/ok", - expectedStatus: http.StatusOK, - expectedBody: []byte(http.StatusText(http.StatusOK)), - } - - benchmarkServeStreamHTTPSimple(b, test) -} - -func BenchmarkServeStreamHTTPLargeFile(b *testing.B) { - test := testRequest{ - name: "large_file", - endpoint: "/large_file", - expectedStatus: http.StatusOK, - expectedBody: testLargeResp, - } - - benchmarkServeStreamHTTPSimple(b, test) -} diff --git a/connection/rpc.go b/connection/rpc.go index a4a13e82..e01031f9 100644 --- a/connection/rpc.go +++ b/connection/rpc.go @@ -2,7 +2,6 @@ package connection import ( "context" - "fmt" "io" "net" "time" @@ -152,178 +151,3 @@ const ( unregister rpcName = "unregister" authenticate rpcName = " authenticate" ) - -func (h *h2muxConnection) registerTunnel(ctx context.Context, credentialSetter CredentialManager, classicTunnel *ClassicTunnelProperties, registrationOptions *tunnelpogs.RegistrationOptions) error { - h.observer.sendRegisteringEvent(registrationOptions.ConnectionID) - - stream, err := h.newRPCStream(ctx, register) - if err != nil { - return err - } - rpcClient := NewTunnelServerClient(ctx, stream, h.observer.log) - defer rpcClient.Close() - - _ = h.logServerInfo(ctx, rpcClient) - registration := rpcClient.client.RegisterTunnel( - ctx, - classicTunnel.OriginCert, - classicTunnel.Hostname, - registrationOptions, - ) - if registrationErr := registration.DeserializeError(); registrationErr != nil { - // RegisterTunnel RPC failure - return h.processRegisterTunnelError(registrationErr, register) - } - - credentialSetter.SetEventDigest(h.connIndex, registration.EventDigest) - return h.processRegistrationSuccess(registration, register, credentialSetter, classicTunnel) -} - -type CredentialManager interface { - ReconnectToken() ([]byte, error) - EventDigest(connID uint8) ([]byte, error) - SetEventDigest(connID uint8, digest []byte) - ConnDigest(connID uint8) ([]byte, error) - SetConnDigest(connID uint8, digest []byte) -} - -func (h *h2muxConnection) processRegistrationSuccess( - registration *tunnelpogs.TunnelRegistration, - name rpcName, - credentialManager CredentialManager, classicTunnel *ClassicTunnelProperties, -) error { - for _, logLine := range registration.LogLines { - h.observer.log.Info().Msg(logLine) - } - - if registration.TunnelID != "" { - h.observer.metrics.tunnelsHA.AddTunnelID(h.connIndex, registration.TunnelID) - h.observer.log.Info().Msgf("Each HA connection's tunnel IDs: %v", h.observer.metrics.tunnelsHA.String()) - } - - credentialManager.SetConnDigest(h.connIndex, registration.ConnDigest) - h.observer.metrics.userHostnamesCounts.WithLabelValues(registration.Url).Inc() - - h.observer.log.Info().Msgf("Route propagating, it may take up to 1 minute for your new route to become functional") - h.observer.metrics.regSuccess.WithLabelValues(string(name)).Inc() - return nil -} - -func (h *h2muxConnection) processRegisterTunnelError(err tunnelpogs.TunnelRegistrationError, name rpcName) error { - if err.Error() == DuplicateConnectionError { - h.observer.metrics.regFail.WithLabelValues("dup_edge_conn", string(name)).Inc() - return errDuplicationConnection - } - h.observer.metrics.regFail.WithLabelValues("server_error", string(name)).Inc() - return ServerRegisterTunnelError{ - Cause: err, - Permanent: err.IsPermanent(), - } -} - -func (h *h2muxConnection) reconnectTunnel(ctx context.Context, credentialManager CredentialManager, classicTunnel *ClassicTunnelProperties, registrationOptions *tunnelpogs.RegistrationOptions) error { - token, err := credentialManager.ReconnectToken() - if err != nil { - return err - } - eventDigest, err := credentialManager.EventDigest(h.connIndex) - if err != nil { - return err - } - connDigest, err := credentialManager.ConnDigest(h.connIndex) - if err != nil { - return err - } - - h.observer.log.Debug().Msg("initiating RPC stream to reconnect") - stream, err := h.newRPCStream(ctx, register) - if err != nil { - return err - } - rpcClient := NewTunnelServerClient(ctx, stream, h.observer.log) - defer rpcClient.Close() - - _ = h.logServerInfo(ctx, rpcClient) - registration := rpcClient.client.ReconnectTunnel( - ctx, - token, - eventDigest, - connDigest, - classicTunnel.Hostname, - registrationOptions, - ) - if registrationErr := registration.DeserializeError(); registrationErr != nil { - // ReconnectTunnel RPC failure - return h.processRegisterTunnelError(registrationErr, reconnect) - } - return h.processRegistrationSuccess(registration, reconnect, credentialManager, classicTunnel) -} - -func (h *h2muxConnection) logServerInfo(ctx context.Context, rpcClient *tunnelServerClient) error { - // Request server info without blocking tunnel registration; must use capnp library directly. - serverInfoPromise := tunnelrpc.TunnelServer{Client: rpcClient.client.Client}.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error { - return nil - }) - serverInfoMessage, err := serverInfoPromise.Result().Struct() - if err != nil { - h.observer.log.Err(err).Msg("Failed to retrieve server information") - return err - } - serverInfo, err := tunnelpogs.UnmarshalServerInfo(serverInfoMessage) - if err != nil { - h.observer.log.Err(err).Msg("Failed to retrieve server information") - return err - } - h.observer.logServerInfo(h.connIndex, serverInfo.LocationName, net.IP{}, "Connection established") - return nil -} - -func (h *h2muxConnection) registerNamedTunnel( - ctx context.Context, - namedTunnel *NamedTunnelProperties, - connOptions *tunnelpogs.ConnectionOptions, -) error { - stream, err := h.newRPCStream(ctx, register) - if err != nil { - return err - } - rpcClient := h.newRPCClientFunc(ctx, stream, h.observer.log) - defer rpcClient.Close() - - registrationDetails, err := rpcClient.RegisterConnection(ctx, namedTunnel, connOptions, h.connIndex, nil, h.observer) - if err != nil { - return err - } - h.observer.logServerInfo(h.connIndex, registrationDetails.Location, nil, fmt.Sprintf("Connection %s registered", registrationDetails.UUID)) - h.observer.sendConnectedEvent(h.connIndex, 0, registrationDetails.Location) - - return nil -} - -func (h *h2muxConnection) unregister(isNamedTunnel bool) { - h.observer.sendUnregisteringEvent(h.connIndex) - - unregisterCtx, cancel := context.WithTimeout(context.Background(), h.gracePeriod) - defer cancel() - - stream, err := h.newRPCStream(unregisterCtx, unregister) - if err != nil { - return - } - defer stream.Close() - - if isNamedTunnel { - rpcClient := h.newRPCClientFunc(unregisterCtx, stream, h.observer.log) - defer rpcClient.Close() - - rpcClient.GracefulShutdown(unregisterCtx, h.gracePeriod) - } else { - rpcClient := NewTunnelServerClient(unregisterCtx, stream, h.observer.log) - defer rpcClient.Close() - - // gracePeriod is encoded in int64 using capnproto - _ = rpcClient.client.UnregisterTunnel(unregisterCtx, h.gracePeriod.Nanoseconds()) - } - - h.observer.log.Info().Uint8(LogFieldConnIndex, h.connIndex).Msg("Unregistered tunnel connection") -} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 437d3bd2..8277eeaa 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -23,7 +23,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/cloudflare/cloudflared/cfio" - "github.com/cloudflare/cloudflared/config" "github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/hello" @@ -34,7 +33,7 @@ import ( ) var ( - testTags = []tunnelpogs.Tag{tunnelpogs.Tag{Name: "Name", Value: "value"}} + testTags = []tunnelpogs.Tag{{Name: "Name", Value: "value"}} noWarpRouting = ingress.WarpRoutingConfig{} testWarpRouting = ingress.WarpRoutingConfig{ Enabled: true, diff --git a/supervisor/tunnel.go b/supervisor/tunnel.go index 636bb3ef..1a2b9775 100644 --- a/supervisor/tunnel.go +++ b/supervisor/tunnel.go @@ -69,7 +69,6 @@ type TunnelConfig struct { PQKexIdx int NamedTunnel *connection.NamedTunnelProperties - MuxerConfig *connection.MuxerConfig ProtocolSelector connection.ProtocolSelector EdgeTLSConfigs map[connection.Protocol]*tls.Config PacketConfig *ingress.GlobalRouterConfig @@ -91,7 +90,7 @@ func (c *TunnelConfig) registrationOptions(connectionID uint8, OriginLocalIP str OriginLocalIP: OriginLocalIP, IsAutoupdated: c.IsAutoupdated, RunFromTerminal: c.RunFromTerminal, - CompressionQuality: uint64(c.MuxerConfig.CompressionSetting), + CompressionQuality: 0, UUID: uuid.String(), Features: c.SupportedFeatures(), } @@ -106,7 +105,7 @@ func (c *TunnelConfig) connectionOptions(originLocalAddr string, numPreviousAtte Client: c.NamedTunnel.Client, OriginLocalIP: originIP, ReplaceExisting: c.ReplaceExisting, - CompressionQuality: uint8(c.MuxerConfig.CompressionSetting), + CompressionQuality: 0, NumPreviousAttempts: numPreviousAttempts, } } @@ -518,21 +517,7 @@ func (e *EdgeTunnelServer) serveConnection( } default: - edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, e.config.EdgeTLSConfigs[protocol], addr.TCP, e.edgeBindAddr) - if err != nil { - connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge") - return err, true - } - - if err := e.serveH2mux( - ctx, - connLog, - edgeConn, - connIndex, - connectedFuse, - ); err != nil { - return err, false - } + return fmt.Errorf("invalid protocol selected: %s", protocol), false } return } @@ -545,55 +530,6 @@ func (r unrecoverableError) Error() string { return r.err.Error() } -func (e *EdgeTunnelServer) serveH2mux( - ctx context.Context, - connLog *ConnAwareLogger, - edgeConn net.Conn, - connIndex uint8, - connectedFuse *connectedFuse, -) error { - if e.config.NeedPQ { - return unrecoverableError{errors.New("H2Mux transport does not support post-quantum")} - } - connLog.Logger().Debug().Msgf("Connecting via h2mux") - // Returns error from parsing the origin URL or handshake errors - handler, err, recoverable := connection.NewH2muxConnection( - e.orchestrator, - e.config.GracePeriod, - e.config.MuxerConfig, - edgeConn, - connIndex, - e.config.Observer, - e.gracefulShutdownC, - e.config.Log, - ) - if err != nil { - if !recoverable { - return unrecoverableError{err} - } - return err - } - - errGroup, serveCtx := errgroup.WithContext(ctx) - - errGroup.Go(func() error { - connOptions := e.config.connectionOptions(edgeConn.LocalAddr().String(), uint8(connectedFuse.backoff.Retries())) - return handler.ServeNamedTunnel(serveCtx, e.config.NamedTunnel, connOptions, connectedFuse) - }) - - errGroup.Go(func() error { - err := listenReconnect(serveCtx, e.reconnectCh, e.gracefulShutdownC) - if err != nil { - // forcefully break the connection (this is only used for testing) - // errgroup will return context canceled for the handler.ServeClassicTunnel - connLog.Logger().Debug().Msg("Forcefully breaking h2mux connection") - } - return err - }) - - return errGroup.Wait() -} - func (e *EdgeTunnelServer) serveHTTP2( ctx context.Context, connLog *ConnAwareLogger,