From e49a7a438912d39b917980ba293100cc28726567 Mon Sep 17 00:00:00 2001 From: Sudarsan Reddy Date: Tue, 3 Aug 2021 08:09:56 +0100 Subject: [PATCH] TUN-4597: Added HTTPProxy for QUIC --- connection/quic.go | 87 +++++++++++++++++++--- connection/quic_test.go | 161 +++++++++++++++++++++++++++++++++------- quic/pogs.go | 9 +++ 3 files changed, 221 insertions(+), 36 deletions(-) diff --git a/connection/quic.go b/connection/quic.go index e9a34746..494dcaa0 100644 --- a/connection/quic.go +++ b/connection/quic.go @@ -3,7 +3,12 @@ package connection import ( "context" "crypto/tls" + "fmt" + "io" "net" + "net/http" + "strconv" + "strings" "github.com/lucas-clemente/quic-go" "github.com/pkg/errors" @@ -12,10 +17,20 @@ import ( quicpogs "github.com/cloudflare/cloudflared/quic" ) +const ( + // HTTPHeaderKey is used to get or set http headers in QUIC ALPN if the underlying proxy connection type is HTTP. + HTTPHeaderKey = "HttpHeader" + // HTTPMethodKey is used to get or set http method in QUIC ALPN if the underlying proxy connection type is HTTP. + HTTPMethodKey = "HttpMethod" + // HTTPHostKey is used to get or set http Method in QUIC ALPN if the underlying proxy connection type is HTTP. + HTTPHostKey = "HttpHost" +) + // QUICConnection represents the type that facilitates Proxying via QUIC streams. type QUICConnection struct { - session quic.Session - logger zerolog.Logger + session quic.Session + logger zerolog.Logger + httpProxy OriginProxy } // NewQUICConnection returns a new instance of QUICConnection. @@ -24,6 +39,7 @@ func NewQUICConnection( quicConfig *quic.Config, edgeAddr net.Addr, tlsConfig *tls.Config, + httpProxy OriginProxy, logger zerolog.Logger, ) (*QUICConnection, error) { session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig) @@ -34,8 +50,9 @@ func NewQUICConnection( //TODO: RegisterConnectionRPC here. return &QUICConnection{ - session: session, - logger: logger, + session: session, + httpProxy: httpProxy, + logger: logger, }, nil } @@ -58,7 +75,7 @@ func (q *QUICConnection) Serve(ctx context.Context) error { } } -// Close calls this to close the QuicConnection stream. +// Close closes the session with no errors specified. func (q *QUICConnection) Close() { q.session.CloseWithError(0, "") } @@ -71,14 +88,66 @@ func (q *QUICConnection) handleStream(stream quic.Stream) error { switch connectRequest.Type { case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket: - // Temporary dummy code for the unit test. - if err := quicpogs.WriteConnectResponseData(stream, nil, quicpogs.Metadata{Key: "HTTPStatus", Val: "200"}); err != nil { + req, err := buildHTTPRequest(connectRequest, stream) + if err != nil { return err } - stream.Write([]byte("OK")) + w := newHTTPResponseAdapter(stream) + return q.httpProxy.ProxyHTTP(w, req, connectRequest.Type == quicpogs.ConnectionTypeWebsocket) case quicpogs.ConnectionTypeTCP: - + return errors.New("not implemented") } return nil } + +// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC. +type httpResponseAdapter struct { + io.Writer +} + +func newHTTPResponseAdapter(w io.Writer) httpResponseAdapter { + return httpResponseAdapter{w} +} + +func (hrw httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error { + metadata := make([]quicpogs.Metadata, 0) + metadata = append(metadata, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)}) + for k, vv := range header { + for _, v := range vv { + httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k) + metadata = append(metadata, quicpogs.Metadata{Key: httpHeaderKey, Val: v}) + } + } + return quicpogs.WriteConnectResponseData(hrw, nil, metadata...) +} + +func (hrw httpResponseAdapter) WriteErrorResponse(err error) { + quicpogs.WriteConnectResponseData(hrw, err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)}) +} + +func buildHTTPRequest(connectRequest *quicpogs.ConnectRequest, body io.Reader) (*http.Request, error) { + metadata := connectRequest.MetadataMap() + dest := connectRequest.Dest + method := metadata[HTTPMethodKey] + host := metadata[HTTPHostKey] + + req, err := http.NewRequest(method, dest, body) + if err != nil { + return nil, err + } + + req.Host = host + for _, metadata := range connectRequest.Metadata { + if strings.Contains(metadata.Key, HTTPHeaderKey) { + // metadata.Key is off the format httpHeaderKey: + httpHeaderKey := strings.Split(metadata.Key, ":") + if len(httpHeaderKey) != 2 { + return nil, fmt.Errorf("Header Key: %s malformed", metadata.Key) + } + req.Header.Add(httpHeaderKey[1], metadata.Val) + } + } + stripWebsocketUpgradeHeader(req) + return req, err +} diff --git a/connection/quic_test.go b/connection/quic_test.go index 0f1b7964..f07ae3f3 100644 --- a/connection/quic_test.go +++ b/connection/quic_test.go @@ -1,20 +1,25 @@ package connection import ( + "bytes" "context" "crypto/rand" "crypto/rsa" "crypto/tls" "crypto/x509" "encoding/pem" - "io/ioutil" + "fmt" + "io" "math/big" "net" + "net/http" "os" "sync" "testing" + "github.com/gobwas/ws/wsutil" "github.com/lucas-clemente/quic-go" + "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,65 +34,127 @@ func TestQUICServer(t *testing.T) { KeepAlive: true, } + // Setup test. log := zerolog.New(os.Stdout) + // Start a UDP Listener for QUIC. udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") - require.NoError(t, err) udpListener, err := net.ListenUDP(udpAddr.Network(), udpAddr) require.NoError(t, err) defer udpListener.Close() + + // Create a simple tls config. tlsConfig := generateTLSConfig() - tlsConfClient := &tls.Config{ + + // Create a client config + tlsClientConfig := &tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"argotunnel"}, } + + // Start a mock httpProxy + originProxy := &mockOriginProxyWithRequest{} + + // This is simply a sample websocket frame message. + wsBuf := &bytes.Buffer{} + wsutil.WriteClientText(wsBuf, []byte("Hello")) + var tests = []struct { - desc string - dest string - connectionType quicpogs.ConnectionType - metadata []quicpogs.Metadata - message []byte - expectedMessage []byte + desc string + dest string + connectionType quicpogs.ConnectionType + metadata []quicpogs.Metadata + message []byte + expectedResponse []byte }{ { - desc: "", - dest: "somehost.com", + desc: "test http proxy", + dest: "/ok", + connectionType: quicpogs.ConnectionTypeHTTP, + metadata: []quicpogs.Metadata{ + quicpogs.Metadata{ + Key: "HttpHeader:Cf-Ray", + Val: "123123123", + }, + quicpogs.Metadata{ + Key: "HttpHost", + Val: "cf.host", + }, + quicpogs.Metadata{ + Key: "HttpMethod", + Val: "GET", + }, + }, + expectedResponse: []byte("OK"), + }, + { + desc: "test http body request streaming", + dest: "/echo_body", + connectionType: quicpogs.ConnectionTypeHTTP, + metadata: []quicpogs.Metadata{ + quicpogs.Metadata{ + Key: "HttpHeader:Cf-Ray", + Val: "123123123", + }, + quicpogs.Metadata{ + Key: "HttpHost", + Val: "cf.host", + }, + quicpogs.Metadata{ + Key: "HttpMethod", + Val: "POST", + }, + }, + message: []byte("This is the message body"), + expectedResponse: []byte("This is the message body"), + }, + { + desc: "test ws proxy", + dest: "/ok", connectionType: quicpogs.ConnectionTypeWebsocket, metadata: []quicpogs.Metadata{ quicpogs.Metadata{ - Key: "key", - Val: "value", + Key: "HttpHeader:Cf-Cloudflared-Proxy-Connection-Upgrade", + Val: "Websocket", + }, + quicpogs.Metadata{ + Key: "HttpHeader:Another-Header", + Val: "Misc", + }, + quicpogs.Metadata{ + Key: "HttpHost", + Val: "cf.host", + }, + quicpogs.Metadata{ + Key: "HttpMethod", + Val: "get", }, }, - expectedMessage: []byte("OK"), + message: wsBuf.Bytes(), + expectedResponse: []byte{0x81, 0x5, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, }, } for _, test := range tests { t.Run(test.desc, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup go func() { wg.Add(1) + defer wg.Done() quicServer( t, udpListener, tlsConfig, quicConfig, - test.dest, test.connectionType, test.metadata, test.message, test.expectedMessage, + test.dest, test.connectionType, test.metadata, test.message, test.expectedResponse, ) - wg.Done() }() - qC, err := NewQUICConnection(context.Background(), quicConfig, udpListener.LocalAddr(), tlsConfClient, log) + qC, err := NewQUICConnection(ctx, quicConfig, udpListener.LocalAddr(), tlsClientConfig, originProxy, log) require.NoError(t, err) + go qC.Serve(ctx) - go func() { - wg.Wait() - cancel() - }() - - qC.Serve(ctx) - + wg.Wait() + cancel() }) } @@ -125,11 +192,12 @@ func quicServer( if message != nil { // ALPN successful. Write data. - _, err = stream.Write([]byte(message)) + _, err := stream.Write([]byte(message)) require.NoError(t, err) } - response, err := ioutil.ReadAll(stream) + response := make([]byte, len(expectedResponse)) + stream.Read(response) require.NoError(t, err) // For now it is an echo server. Verify if the same data is returned. @@ -159,3 +227,42 @@ func generateTLSConfig() *tls.Config { NextProtos: []string{"argotunnel"}, } } + +type mockOriginProxyWithRequest struct{} + +func (moc *mockOriginProxyWithRequest) ProxyHTTP(w ResponseWriter, r *http.Request, isWebsocket bool) error { + // These are a series of crude tests to ensure the headers and http related data is transferred from + // metadata. + if r.Method == "" { + return errors.New("method not sent") + } + if r.Host == "" { + return errors.New("host not sent") + } + if len(r.Header) == 0 { + return errors.New("headers not set") + } + + if isWebsocket { + return wsEndpoint(w, r) + } + switch r.URL.Path { + case "/ok": + originRespEndpoint(w, http.StatusOK, []byte(http.StatusText(http.StatusOK))) + case "/echo_body": + resp := &http.Response{ + StatusCode: http.StatusOK, + } + _ = w.WriteRespHeaders(resp.StatusCode, resp.Header) + io.Copy(w, r.Body) + case "/error": + return fmt.Errorf("Failed to proxy to origin") + default: + originRespEndpoint(w, http.StatusNotFound, []byte("page not found")) + } + return nil +} + +func (moc *mockOriginProxyWithRequest) ProxyTCP(ctx context.Context, rwa ReadWriteAcker, tcpRequest *TCPRequest) error { + return nil +} diff --git a/quic/pogs.go b/quic/pogs.go index 4a11402e..e82fbe47 100644 --- a/quic/pogs.go +++ b/quic/pogs.go @@ -29,6 +29,15 @@ type Metadata struct { Val string `capnp:"val"` } +// MetadataMap returns a map format of []Metadata. +func (r *ConnectRequest) MetadataMap() map[string]string { + metadataMap := make(map[string]string) + for _, metadata := range r.Metadata { + metadataMap[metadata.Key] = metadata.Val + } + return metadataMap +} + func (r *ConnectRequest) fromPogs(msg *capnp.Message) error { metadata, err := schema.ReadRootConnectRequest(msg) if err != nil {