TUN-4597: Added HTTPProxy for QUIC

This commit is contained in:
Sudarsan Reddy 2021-08-03 08:09:56 +01:00
parent 5749425671
commit e49a7a4389
3 changed files with 221 additions and 36 deletions

View File

@ -3,7 +3,12 @@ package connection
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt"
"io"
"net" "net"
"net/http"
"strconv"
"strings"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -12,10 +17,20 @@ import (
quicpogs "github.com/cloudflare/cloudflared/quic" quicpogs "github.com/cloudflare/cloudflared/quic"
) )
const (
// HTTPHeaderKey is used to get or set http headers in QUIC ALPN if the underlying proxy connection type is HTTP.
HTTPHeaderKey = "HttpHeader"
// HTTPMethodKey is used to get or set http method in QUIC ALPN if the underlying proxy connection type is HTTP.
HTTPMethodKey = "HttpMethod"
// HTTPHostKey is used to get or set http Method in QUIC ALPN if the underlying proxy connection type is HTTP.
HTTPHostKey = "HttpHost"
)
// QUICConnection represents the type that facilitates Proxying via QUIC streams. // QUICConnection represents the type that facilitates Proxying via QUIC streams.
type QUICConnection struct { type QUICConnection struct {
session quic.Session session quic.Session
logger zerolog.Logger logger zerolog.Logger
httpProxy OriginProxy
} }
// NewQUICConnection returns a new instance of QUICConnection. // NewQUICConnection returns a new instance of QUICConnection.
@ -24,6 +39,7 @@ func NewQUICConnection(
quicConfig *quic.Config, quicConfig *quic.Config,
edgeAddr net.Addr, edgeAddr net.Addr,
tlsConfig *tls.Config, tlsConfig *tls.Config,
httpProxy OriginProxy,
logger zerolog.Logger, logger zerolog.Logger,
) (*QUICConnection, error) { ) (*QUICConnection, error) {
session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig) session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig)
@ -35,6 +51,7 @@ func NewQUICConnection(
return &QUICConnection{ return &QUICConnection{
session: session, session: session,
httpProxy: httpProxy,
logger: logger, logger: logger,
}, nil }, nil
} }
@ -58,7 +75,7 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
} }
} }
// Close calls this to close the QuicConnection stream. // Close closes the session with no errors specified.
func (q *QUICConnection) Close() { func (q *QUICConnection) Close() {
q.session.CloseWithError(0, "") q.session.CloseWithError(0, "")
} }
@ -71,14 +88,66 @@ func (q *QUICConnection) handleStream(stream quic.Stream) error {
switch connectRequest.Type { switch connectRequest.Type {
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket: case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
// Temporary dummy code for the unit test. req, err := buildHTTPRequest(connectRequest, stream)
if err := quicpogs.WriteConnectResponseData(stream, nil, quicpogs.Metadata{Key: "HTTPStatus", Val: "200"}); err != nil { if err != nil {
return err return err
} }
stream.Write([]byte("OK")) w := newHTTPResponseAdapter(stream)
return q.httpProxy.ProxyHTTP(w, req, connectRequest.Type == quicpogs.ConnectionTypeWebsocket)
case quicpogs.ConnectionTypeTCP: case quicpogs.ConnectionTypeTCP:
return errors.New("not implemented")
} }
return nil return nil
} }
// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
type httpResponseAdapter struct {
io.Writer
}
func newHTTPResponseAdapter(w io.Writer) httpResponseAdapter {
return httpResponseAdapter{w}
}
func (hrw httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
metadata := make([]quicpogs.Metadata, 0)
metadata = append(metadata, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
for k, vv := range header {
for _, v := range vv {
httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k)
metadata = append(metadata, quicpogs.Metadata{Key: httpHeaderKey, Val: v})
}
}
return quicpogs.WriteConnectResponseData(hrw, nil, metadata...)
}
func (hrw httpResponseAdapter) WriteErrorResponse(err error) {
quicpogs.WriteConnectResponseData(hrw, err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
}
func buildHTTPRequest(connectRequest *quicpogs.ConnectRequest, body io.Reader) (*http.Request, error) {
metadata := connectRequest.MetadataMap()
dest := connectRequest.Dest
method := metadata[HTTPMethodKey]
host := metadata[HTTPHostKey]
req, err := http.NewRequest(method, dest, body)
if err != nil {
return nil, err
}
req.Host = host
for _, metadata := range connectRequest.Metadata {
if strings.Contains(metadata.Key, HTTPHeaderKey) {
// metadata.Key is off the format httpHeaderKey:<HTTPHeader>
httpHeaderKey := strings.Split(metadata.Key, ":")
if len(httpHeaderKey) != 2 {
return nil, fmt.Errorf("Header Key: %s malformed", metadata.Key)
}
req.Header.Add(httpHeaderKey[1], metadata.Val)
}
}
stripWebsocketUpgradeHeader(req)
return req, err
}

View File

@ -1,20 +1,25 @@
package connection package connection
import ( import (
"bytes"
"context" "context"
"crypto/rand" "crypto/rand"
"crypto/rsa" "crypto/rsa"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
"io/ioutil" "fmt"
"io"
"math/big" "math/big"
"net" "net"
"net/http"
"os" "os"
"sync" "sync"
"testing" "testing"
"github.com/gobwas/ws/wsutil"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"github.com/pkg/errors"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -29,65 +34,127 @@ func TestQUICServer(t *testing.T) {
KeepAlive: true, KeepAlive: true,
} }
// Setup test.
log := zerolog.New(os.Stdout) log := zerolog.New(os.Stdout)
// Start a UDP Listener for QUIC.
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
udpListener, err := net.ListenUDP(udpAddr.Network(), udpAddr) udpListener, err := net.ListenUDP(udpAddr.Network(), udpAddr)
require.NoError(t, err) require.NoError(t, err)
defer udpListener.Close() defer udpListener.Close()
// Create a simple tls config.
tlsConfig := generateTLSConfig() tlsConfig := generateTLSConfig()
tlsConfClient := &tls.Config{
// Create a client config
tlsClientConfig := &tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
NextProtos: []string{"argotunnel"}, NextProtos: []string{"argotunnel"},
} }
// Start a mock httpProxy
originProxy := &mockOriginProxyWithRequest{}
// This is simply a sample websocket frame message.
wsBuf := &bytes.Buffer{}
wsutil.WriteClientText(wsBuf, []byte("Hello"))
var tests = []struct { var tests = []struct {
desc string desc string
dest string dest string
connectionType quicpogs.ConnectionType connectionType quicpogs.ConnectionType
metadata []quicpogs.Metadata metadata []quicpogs.Metadata
message []byte message []byte
expectedMessage []byte expectedResponse []byte
}{ }{
{ {
desc: "", desc: "test http proxy",
dest: "somehost.com", dest: "/ok",
connectionType: quicpogs.ConnectionTypeHTTP,
metadata: []quicpogs.Metadata{
quicpogs.Metadata{
Key: "HttpHeader:Cf-Ray",
Val: "123123123",
},
quicpogs.Metadata{
Key: "HttpHost",
Val: "cf.host",
},
quicpogs.Metadata{
Key: "HttpMethod",
Val: "GET",
},
},
expectedResponse: []byte("OK"),
},
{
desc: "test http body request streaming",
dest: "/echo_body",
connectionType: quicpogs.ConnectionTypeHTTP,
metadata: []quicpogs.Metadata{
quicpogs.Metadata{
Key: "HttpHeader:Cf-Ray",
Val: "123123123",
},
quicpogs.Metadata{
Key: "HttpHost",
Val: "cf.host",
},
quicpogs.Metadata{
Key: "HttpMethod",
Val: "POST",
},
},
message: []byte("This is the message body"),
expectedResponse: []byte("This is the message body"),
},
{
desc: "test ws proxy",
dest: "/ok",
connectionType: quicpogs.ConnectionTypeWebsocket, connectionType: quicpogs.ConnectionTypeWebsocket,
metadata: []quicpogs.Metadata{ metadata: []quicpogs.Metadata{
quicpogs.Metadata{ quicpogs.Metadata{
Key: "key", Key: "HttpHeader:Cf-Cloudflared-Proxy-Connection-Upgrade",
Val: "value", Val: "Websocket",
},
quicpogs.Metadata{
Key: "HttpHeader:Another-Header",
Val: "Misc",
},
quicpogs.Metadata{
Key: "HttpHost",
Val: "cf.host",
},
quicpogs.Metadata{
Key: "HttpMethod",
Val: "get",
}, },
}, },
expectedMessage: []byte("OK"), message: wsBuf.Bytes(),
expectedResponse: []byte{0x81, 0x5, 0x48, 0x65, 0x6c, 0x6c, 0x6f},
}, },
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.desc, func(t *testing.T) { t.Run(test.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup var wg sync.WaitGroup
go func() { go func() {
wg.Add(1) wg.Add(1)
defer wg.Done()
quicServer( quicServer(
t, udpListener, tlsConfig, quicConfig, t, udpListener, tlsConfig, quicConfig,
test.dest, test.connectionType, test.metadata, test.message, test.expectedMessage, test.dest, test.connectionType, test.metadata, test.message, test.expectedResponse,
) )
wg.Done()
}() }()
qC, err := NewQUICConnection(context.Background(), quicConfig, udpListener.LocalAddr(), tlsConfClient, log) qC, err := NewQUICConnection(ctx, quicConfig, udpListener.LocalAddr(), tlsClientConfig, originProxy, log)
require.NoError(t, err) require.NoError(t, err)
go qC.Serve(ctx)
go func() {
wg.Wait() wg.Wait()
cancel() cancel()
}()
qC.Serve(ctx)
}) })
} }
@ -125,11 +192,12 @@ func quicServer(
if message != nil { if message != nil {
// ALPN successful. Write data. // ALPN successful. Write data.
_, err = stream.Write([]byte(message)) _, err := stream.Write([]byte(message))
require.NoError(t, err) require.NoError(t, err)
} }
response, err := ioutil.ReadAll(stream) response := make([]byte, len(expectedResponse))
stream.Read(response)
require.NoError(t, err) require.NoError(t, err)
// For now it is an echo server. Verify if the same data is returned. // For now it is an echo server. Verify if the same data is returned.
@ -159,3 +227,42 @@ func generateTLSConfig() *tls.Config {
NextProtos: []string{"argotunnel"}, NextProtos: []string{"argotunnel"},
} }
} }
type mockOriginProxyWithRequest struct{}
func (moc *mockOriginProxyWithRequest) ProxyHTTP(w ResponseWriter, r *http.Request, isWebsocket bool) error {
// These are a series of crude tests to ensure the headers and http related data is transferred from
// metadata.
if r.Method == "" {
return errors.New("method not sent")
}
if r.Host == "" {
return errors.New("host not sent")
}
if len(r.Header) == 0 {
return errors.New("headers not set")
}
if isWebsocket {
return wsEndpoint(w, r)
}
switch r.URL.Path {
case "/ok":
originRespEndpoint(w, http.StatusOK, []byte(http.StatusText(http.StatusOK)))
case "/echo_body":
resp := &http.Response{
StatusCode: http.StatusOK,
}
_ = w.WriteRespHeaders(resp.StatusCode, resp.Header)
io.Copy(w, r.Body)
case "/error":
return fmt.Errorf("Failed to proxy to origin")
default:
originRespEndpoint(w, http.StatusNotFound, []byte("page not found"))
}
return nil
}
func (moc *mockOriginProxyWithRequest) ProxyTCP(ctx context.Context, rwa ReadWriteAcker, tcpRequest *TCPRequest) error {
return nil
}

View File

@ -29,6 +29,15 @@ type Metadata struct {
Val string `capnp:"val"` Val string `capnp:"val"`
} }
// MetadataMap returns a map format of []Metadata.
func (r *ConnectRequest) MetadataMap() map[string]string {
metadataMap := make(map[string]string)
for _, metadata := range r.Metadata {
metadataMap[metadata.Key] = metadata.Val
}
return metadataMap
}
func (r *ConnectRequest) fromPogs(msg *capnp.Message) error { func (r *ConnectRequest) fromPogs(msg *capnp.Message) error {
metadata, err := schema.ReadRootConnectRequest(msg) metadata, err := schema.ReadRootConnectRequest(msg)
if err != nil { if err != nil {