302 lines
7.0 KiB
Go
302 lines
7.0 KiB
Go
package connection
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gobwas/ws/wsutil"
|
|
"github.com/rs/zerolog"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/cloudflare/cloudflared/h2mux"
|
|
)
|
|
|
|
var (
|
|
testMuxerConfig = &MuxerConfig{
|
|
HeartbeatInterval: time.Second * 5,
|
|
MaxHeartbeats: 5,
|
|
CompressionSetting: 0,
|
|
MetricsUpdateFreq: time.Second * 5,
|
|
}
|
|
)
|
|
|
|
func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
|
|
edgeConn, originConn := net.Pipe()
|
|
edgeMuxChan := make(chan *h2mux.Muxer)
|
|
go func() {
|
|
edgeMuxConfig := h2mux.MuxerConfig{
|
|
Log: &log,
|
|
Handler: h2mux.MuxedStreamFunc(func(stream *h2mux.MuxedStream) error {
|
|
// we only expect RPC traffic in client->edge direction, provide minimal support for mocking
|
|
require.True(t, stream.IsRPCStream())
|
|
return stream.WriteHeaders([]h2mux.Header{
|
|
{Name: ":status", Value: "200"},
|
|
})
|
|
}),
|
|
}
|
|
edgeMux, err := h2mux.Handshake(edgeConn, edgeConn, edgeMuxConfig, h2mux.ActiveStreams)
|
|
require.NoError(t, err)
|
|
edgeMuxChan <- edgeMux
|
|
}()
|
|
var connIndex = uint8(0)
|
|
testObserver := NewObserver(&log, &log)
|
|
h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil)
|
|
require.NoError(t, err)
|
|
return h2muxConn, <-edgeMuxChan
|
|
}
|
|
|
|
func TestServeStreamHTTP(t *testing.T) {
|
|
tests := []testRequest{
|
|
{
|
|
name: "ok",
|
|
endpoint: "/ok",
|
|
expectedStatus: http.StatusOK,
|
|
expectedBody: []byte(http.StatusText(http.StatusOK)),
|
|
},
|
|
{
|
|
name: "large_file",
|
|
endpoint: "/large_file",
|
|
expectedStatus: http.StatusOK,
|
|
expectedBody: testLargeResp,
|
|
},
|
|
{
|
|
name: "Bad request",
|
|
endpoint: "/400",
|
|
expectedStatus: http.StatusBadRequest,
|
|
expectedBody: []byte(http.StatusText(http.StatusBadRequest)),
|
|
},
|
|
{
|
|
name: "Internal server error",
|
|
endpoint: "/500",
|
|
expectedStatus: http.StatusInternalServerError,
|
|
expectedBody: []byte(http.StatusText(http.StatusInternalServerError)),
|
|
},
|
|
{
|
|
name: "Proxy error",
|
|
endpoint: "/error",
|
|
expectedStatus: http.StatusBadGateway,
|
|
expectedBody: nil,
|
|
isProxyError: true,
|
|
},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
h2muxConn, edgeMux := newH2MuxConnection(t)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
_ = edgeMux.Serve(ctx)
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
err := h2muxConn.serveMuxer(ctx)
|
|
require.Error(t, err)
|
|
}()
|
|
|
|
for _, test := range tests {
|
|
headers := []h2mux.Header{
|
|
{
|
|
Name: ":path",
|
|
Value: test.endpoint,
|
|
},
|
|
}
|
|
stream, err := edgeMux.OpenStream(ctx, headers, nil)
|
|
require.NoError(t, err)
|
|
require.True(t, hasHeader(stream, ":status", strconv.Itoa(test.expectedStatus)))
|
|
|
|
if test.isProxyError {
|
|
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderCfd))
|
|
} else {
|
|
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
|
|
body := make([]byte, len(test.expectedBody))
|
|
_, err = stream.Read(body)
|
|
require.NoError(t, err)
|
|
require.Equal(t, test.expectedBody, body)
|
|
}
|
|
}
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestServeStreamWS(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
h2muxConn, edgeMux := newH2MuxConnection(t)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
edgeMux.Serve(ctx)
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
err := h2muxConn.serveMuxer(ctx)
|
|
require.Error(t, err)
|
|
}()
|
|
|
|
headers := []h2mux.Header{
|
|
{
|
|
Name: ":path",
|
|
Value: "/ws/echo",
|
|
},
|
|
{
|
|
Name: "connection",
|
|
Value: "upgrade",
|
|
},
|
|
{
|
|
Name: "upgrade",
|
|
Value: "websocket",
|
|
},
|
|
}
|
|
|
|
readPipe, writePipe := io.Pipe()
|
|
stream, err := edgeMux.OpenStream(ctx, headers, readPipe)
|
|
require.NoError(t, err)
|
|
|
|
require.True(t, hasHeader(stream, ":status", strconv.Itoa(http.StatusSwitchingProtocols)))
|
|
assert.True(t, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
|
|
|
|
data := []byte("test websocket")
|
|
err = wsutil.WriteClientBinary(writePipe, data)
|
|
require.NoError(t, err)
|
|
|
|
respBody, err := wsutil.ReadServerBinary(stream)
|
|
require.NoError(t, err)
|
|
require.Equal(t, data, respBody, fmt.Sprintf("Expect %s, got %s", string(data), string(respBody)))
|
|
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestGracefulShutdownH2Mux(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
h2muxConn, edgeMux := newH2MuxConnection(t)
|
|
|
|
shutdownC := make(chan struct{})
|
|
unregisteredC := make(chan struct{})
|
|
h2muxConn.gracefulShutdownC = shutdownC
|
|
h2muxConn.newRPCClientFunc = func(_ context.Context, _ io.ReadWriteCloser, _ *zerolog.Logger) NamedTunnelRPCClient {
|
|
return &mockNamedTunnelRPCClient{
|
|
registered: nil,
|
|
unregistered: unregisteredC,
|
|
}
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(3)
|
|
go func() {
|
|
defer wg.Done()
|
|
_ = edgeMux.Serve(ctx)
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
_ = h2muxConn.serveMuxer(ctx)
|
|
}()
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
h2muxConn.controlLoop(ctx, &mockConnectedFuse{}, true)
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
close(shutdownC)
|
|
|
|
select {
|
|
case <-unregisteredC:
|
|
break // ok
|
|
case <-time.Tick(time.Second):
|
|
assert.Fail(t, "timed out waiting for control loop to unregister")
|
|
}
|
|
|
|
cancel()
|
|
wg.Wait()
|
|
|
|
assert.True(t, h2muxConn.stoppedGracefully)
|
|
assert.Nil(t, h2muxConn.gracefulShutdownC)
|
|
}
|
|
|
|
func hasHeader(stream *h2mux.MuxedStream, name, val string) bool {
|
|
for _, header := range stream.Headers {
|
|
if header.Name == name && header.Value == val {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func benchmarkServeStreamHTTPSimple(b *testing.B, test testRequest) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
h2muxConn, edgeMux := newH2MuxConnection(b)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
edgeMux.Serve(ctx)
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
err := h2muxConn.serveMuxer(ctx)
|
|
require.Error(b, err)
|
|
}()
|
|
|
|
headers := []h2mux.Header{
|
|
{
|
|
Name: ":path",
|
|
Value: test.endpoint,
|
|
},
|
|
}
|
|
|
|
body := make([]byte, len(test.expectedBody))
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
b.StartTimer()
|
|
stream, openstreamErr := edgeMux.OpenStream(ctx, headers, nil)
|
|
_, readBodyErr := stream.Read(body)
|
|
b.StopTimer()
|
|
|
|
require.NoError(b, openstreamErr)
|
|
assert.True(b, hasHeader(stream, ResponseMetaHeader, responseMetaHeaderOrigin))
|
|
require.True(b, hasHeader(stream, ":status", strconv.Itoa(http.StatusOK)))
|
|
require.NoError(b, readBodyErr)
|
|
require.Equal(b, test.expectedBody, body)
|
|
}
|
|
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
|
|
func BenchmarkServeStreamHTTPSimple(b *testing.B) {
|
|
test := testRequest{
|
|
name: "ok",
|
|
endpoint: "/ok",
|
|
expectedStatus: http.StatusOK,
|
|
expectedBody: []byte(http.StatusText(http.StatusOK)),
|
|
}
|
|
|
|
benchmarkServeStreamHTTPSimple(b, test)
|
|
}
|
|
|
|
func BenchmarkServeStreamHTTPLargeFile(b *testing.B) {
|
|
test := testRequest{
|
|
name: "large_file",
|
|
endpoint: "/large_file",
|
|
expectedStatus: http.StatusOK,
|
|
expectedBody: testLargeResp,
|
|
}
|
|
|
|
benchmarkServeStreamHTTPSimple(b, test)
|
|
}
|