cloudflared-mirror/ingress/origin_proxy_test.go

331 lines
8.7 KiB
Go
Raw Permalink Normal View History

package ingress
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"github.com/cloudflare/cloudflared/h2mux"
"github.com/cloudflare/cloudflared/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestEstablishConnectionResponse ensures each implementation of StreamBasedOriginProxy returns
// the expected response
func assertEstablishConnectionResponse(t *testing.T,
originProxy StreamBasedOriginProxy,
req *http.Request,
expectHeader http.Header,
) {
_, resp, err := originProxy.EstablishConnection(req)
assert.NoError(t, err)
assert.Equal(t, switchingProtocolText, resp.Status)
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
assert.Equal(t, expectHeader, resp.Header)
}
func TestHTTPServiceEstablishConnection(t *testing.T) {
origin := echoWSOrigin(t)
defer origin.Close()
originURL, err := url.Parse(origin.URL)
require.NoError(t, err)
httpService := &httpService{
url: originURL,
hostHeader: origin.URL,
transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
req, err := http.NewRequest(http.MethodGet, origin.URL, nil)
require.NoError(t, err)
req.Header.Set("Sec-Websocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
req.Header.Set("Test-Cloudflared-Echo", t.Name())
expectHeader := http.Header{
"Connection": {"Upgrade"},
"Sec-Websocket-Accept": {"s3pPLMBiTxaQ9kYGzzhZRbK+xOo="},
"Upgrade": {"websocket"},
"Test-Cloudflared-Echo": {t.Name()},
}
assertEstablishConnectionResponse(t, httpService, req, expectHeader)
}
func TestHelloWorldEstablishConnection(t *testing.T) {
var wg sync.WaitGroup
shutdownC := make(chan struct{})
errC := make(chan error)
helloWorldSerivce := &helloWorld{}
helloWorldSerivce.start(&wg, testLogger, shutdownC, errC, OriginRequestConfig{})
// Scheme and Host of URL will be override by the Scheme and Host of the helloWorld service
req, err := http.NewRequest(http.MethodGet, "https://place-holder/ws", nil)
require.NoError(t, err)
expectHeader := http.Header{
"Connection": {"Upgrade"},
// Accept key when Sec-Websocket-Key is not specified
"Sec-Websocket-Accept": {"Kfh9QIsMVZcl6xEPYxPHzW8SZ8w="},
"Upgrade": {"websocket"},
}
assertEstablishConnectionResponse(t, helloWorldSerivce, req, expectHeader)
close(shutdownC)
}
func TestRawTCPServiceEstablishConnection(t *testing.T) {
originListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
listenerClosed := make(chan struct{})
tcpListenRoutine(originListener, listenerClosed)
rawTCPService := &rawTCPService{name: ServiceWarpRouting}
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", originListener.Addr()), nil)
require.NoError(t, err)
assertEstablishConnectionResponse(t, rawTCPService, req, nil)
originListener.Close()
<-listenerClosed
req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", originListener.Addr()), nil)
require.NoError(t, err)
// Origin not listening for new connection, should return an error
_, resp, err := rawTCPService.EstablishConnection(req)
require.Error(t, err)
require.Nil(t, resp)
}
func TestTCPOverWSServiceEstablishConnection(t *testing.T) {
originListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
listenerClosed := make(chan struct{})
tcpListenRoutine(originListener, listenerClosed)
originURL := &url.URL{
Scheme: "tcp",
Host: originListener.Addr().String(),
}
baseReq, err := http.NewRequest(http.MethodGet, "https://place-holder", nil)
require.NoError(t, err)
baseReq.Header.Set("Sec-Websocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
bastionReq := baseReq.Clone(context.Background())
bastionReq.Header.Set(h2mux.CFJumpDestinationHeader, originListener.Addr().String())
expectHeader := http.Header{
"Connection": {"Upgrade"},
"Sec-Websocket-Accept": {"s3pPLMBiTxaQ9kYGzzhZRbK+xOo="},
"Upgrade": {"websocket"},
}
tests := []struct {
service *tcpOverWSService
req *http.Request
expectErr bool
}{
{
service: newTCPOverWSService(originURL),
req: baseReq,
},
{
service: newBastionService(),
req: bastionReq,
},
{
service: newBastionService(),
req: baseReq,
expectErr: true,
},
}
for _, test := range tests {
if test.expectErr {
_, resp, err := test.service.EstablishConnection(test.req)
assert.Error(t, err)
assert.Nil(t, resp)
} else {
assertEstablishConnectionResponse(t, test.service, test.req, expectHeader)
}
}
originListener.Close()
<-listenerClosed
for _, service := range []*tcpOverWSService{newTCPOverWSService(originURL), newBastionService()} {
// Origin not listening for new connection, should return an error
_, resp, err := service.EstablishConnection(bastionReq)
assert.Error(t, err)
assert.Nil(t, resp)
}
}
func TestBastionDestination(t *testing.T) {
canonicalJumpDestHeader := http.CanonicalHeaderKey(h2mux.CFJumpDestinationHeader)
tests := []struct {
name string
header http.Header
expectedDest string
wantErr bool
}{
{
name: "hostname destination",
header: http.Header{
canonicalJumpDestHeader: []string{"localhost"},
},
expectedDest: "localhost",
},
{
name: "hostname destination with port",
header: http.Header{
canonicalJumpDestHeader: []string{"localhost:9000"},
},
expectedDest: "localhost:9000",
},
{
name: "hostname destination with scheme and port",
header: http.Header{
canonicalJumpDestHeader: []string{"ssh://localhost:9000"},
},
expectedDest: "localhost:9000",
},
{
name: "full hostname url",
header: http.Header{
canonicalJumpDestHeader: []string{"ssh://localhost:9000/metrics"},
},
expectedDest: "localhost:9000",
},
{
name: "hostname destination with port and path",
header: http.Header{
canonicalJumpDestHeader: []string{"localhost:9000/metrics"},
},
expectedDest: "localhost:9000",
},
{
name: "ip destination",
header: http.Header{
canonicalJumpDestHeader: []string{"127.0.0.1"},
},
expectedDest: "127.0.0.1",
},
{
name: "ip destination with port",
header: http.Header{
canonicalJumpDestHeader: []string{"127.0.0.1:9000"},
},
expectedDest: "127.0.0.1:9000",
},
{
name: "ip destination with port and path",
header: http.Header{
canonicalJumpDestHeader: []string{"127.0.0.1:9000/metrics"},
},
expectedDest: "127.0.0.1:9000",
},
{
name: "ip destination with schem and port",
header: http.Header{
canonicalJumpDestHeader: []string{"tcp://127.0.0.1:9000"},
},
expectedDest: "127.0.0.1:9000",
},
{
name: "full ip url",
header: http.Header{
canonicalJumpDestHeader: []string{"ssh://127.0.0.1:9000/metrics"},
},
expectedDest: "127.0.0.1:9000",
},
{
name: "no destination",
wantErr: true,
},
}
s := newBastionService()
for _, test := range tests {
r := &http.Request{
Header: test.header,
}
dest, err := s.bastionDest(r)
if test.wantErr {
assert.Error(t, err, "Test %s expects error", test.name)
} else {
assert.NoError(t, err, "Test %s expects no error, got error %v", test.name, err)
assert.Equal(t, test.expectedDest, dest, "Test %s expect dest %s, got %s", test.name, test.expectedDest, dest)
}
}
}
func TestHTTPServiceHostHeaderOverride(t *testing.T) {
cfg := OriginRequestConfig{
HTTPHostHeader: t.Name(),
}
handler := func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Host, t.Name())
if websocket.IsWebSocketUpgrade(r) {
respHeaders := websocket.NewResponseHeader(r)
for k, v := range respHeaders {
w.Header().Set(k, v[0])
}
w.WriteHeader(http.StatusSwitchingProtocols)
return
}
w.Write([]byte("ok"))
}
origin := httptest.NewServer(http.HandlerFunc(handler))
defer origin.Close()
originURL, err := url.Parse(origin.URL)
require.NoError(t, err)
httpService := &httpService{
url: originURL,
}
var wg sync.WaitGroup
shutdownC := make(chan struct{})
errC := make(chan error)
require.NoError(t, httpService.start(&wg, testLogger, shutdownC, errC, cfg))
req, err := http.NewRequest(http.MethodGet, originURL.String(), nil)
require.NoError(t, err)
resp, err := httpService.RoundTrip(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
req = req.Clone(context.Background())
_, resp, err = httpService.EstablishConnection(req)
require.NoError(t, err)
require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
}
func tcpListenRoutine(listener net.Listener, closeChan chan struct{}) {
go func() {
for {
conn, err := listener.Accept()
if err != nil {
close(closeChan)
return
}
// Close immediately, this test is not about testing read/write on connection
conn.Close()
}
}()
}