TUN-5696: HTTP/2 Configuration Update
This commit is contained in:
parent
7220c2c214
commit
5c6207debc
|
@ -80,6 +80,7 @@ const (
|
||||||
TypeTCP
|
TypeTCP
|
||||||
TypeControlStream
|
TypeControlStream
|
||||||
TypeHTTP
|
TypeHTTP
|
||||||
|
TypeConfiguration
|
||||||
)
|
)
|
||||||
|
|
||||||
// ShouldFlush returns whether this kind of connection should actively flush data
|
// ShouldFlush returns whether this kind of connection should actively flush data
|
||||||
|
|
|
@ -2,6 +2,7 @@ package connection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
gojson "encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
@ -23,6 +24,7 @@ const (
|
||||||
InternalTCPProxySrcHeader = "Cf-Cloudflared-Proxy-Src"
|
InternalTCPProxySrcHeader = "Cf-Cloudflared-Proxy-Src"
|
||||||
WebsocketUpgrade = "websocket"
|
WebsocketUpgrade = "websocket"
|
||||||
ControlStreamUpgrade = "control-stream"
|
ControlStreamUpgrade = "control-stream"
|
||||||
|
ConfigurationUpdate = "update-configuration"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
|
var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
|
||||||
|
@ -120,6 +122,13 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
respWriter.WriteErrorResponse()
|
respWriter.WriteErrorResponse()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case TypeConfiguration:
|
||||||
|
fmt.Println("TYPE CONFIGURATION?")
|
||||||
|
if err := c.handleConfigurationUpdate(respWriter, r); err != nil {
|
||||||
|
c.log.Error().Err(err)
|
||||||
|
respWriter.WriteErrorResponse()
|
||||||
|
}
|
||||||
|
|
||||||
case TypeWebsocket, TypeHTTP:
|
case TypeWebsocket, TypeHTTP:
|
||||||
stripWebsocketUpgradeHeader(r)
|
stripWebsocketUpgradeHeader(r)
|
||||||
if err := originProxy.ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil {
|
if err := originProxy.ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil {
|
||||||
|
@ -152,6 +161,26 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConfigurationUpdateBody is the representation followed by the edge to send updates to cloudflared.
|
||||||
|
type ConfigurationUpdateBody struct {
|
||||||
|
Version int32 `json:"version"`
|
||||||
|
Config gojson.RawMessage `json:"config"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *HTTP2Connection) handleConfigurationUpdate(respWriter *http2RespWriter, r *http.Request) error {
|
||||||
|
var configBody ConfigurationUpdateBody
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&configBody); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp := c.orchestrator.UpdateConfig(configBody.Version, configBody.Config)
|
||||||
|
bdy, err := json.Marshal(resp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = respWriter.Write(bdy)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *HTTP2Connection) close() {
|
func (c *HTTP2Connection) close() {
|
||||||
// Wait for all serve HTTP handlers to return
|
// Wait for all serve HTTP handlers to return
|
||||||
c.activeRequestsWG.Wait()
|
c.activeRequestsWG.Wait()
|
||||||
|
@ -258,6 +287,8 @@ func (rp *http2RespWriter) Close() error {
|
||||||
|
|
||||||
func determineHTTP2Type(r *http.Request) Type {
|
func determineHTTP2Type(r *http.Request) Type {
|
||||||
switch {
|
switch {
|
||||||
|
case isConfigurationUpdate(r):
|
||||||
|
return TypeConfiguration
|
||||||
case isWebsocketUpgrade(r):
|
case isWebsocketUpgrade(r):
|
||||||
return TypeWebsocket
|
return TypeWebsocket
|
||||||
case IsTCPStream(r):
|
case IsTCPStream(r):
|
||||||
|
@ -291,6 +322,10 @@ func isWebsocketUpgrade(r *http.Request) bool {
|
||||||
return r.Header.Get(InternalUpgradeHeader) == WebsocketUpgrade
|
return r.Header.Get(InternalUpgradeHeader) == WebsocketUpgrade
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isConfigurationUpdate(r *http.Request) bool {
|
||||||
|
return r.Header.Get(InternalUpgradeHeader) == ConfigurationUpdate
|
||||||
|
}
|
||||||
|
|
||||||
// IsTCPStream discerns if the connection request needs a tcp stream proxy.
|
// IsTCPStream discerns if the connection request needs a tcp stream proxy.
|
||||||
func IsTCPStream(r *http.Request) bool {
|
func IsTCPStream(r *http.Request) bool {
|
||||||
return r.Header.Get(InternalTCPProxySrcHeader) != ""
|
return r.Header.Get(InternalTCPProxySrcHeader) != ""
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package connection
|
package connection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -53,6 +54,41 @@ func newTestHTTP2Connection() (*HTTP2Connection, net.Conn) {
|
||||||
), edgeConn
|
), edgeConn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHTTP2ConfigurationSet(t *testing.T) {
|
||||||
|
http2Conn, edgeConn := newTestHTTP2Connection()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
http2Conn.Serve(ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
edgeHTTP2Conn, err := testTransport.NewClientConn(edgeConn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
endpoint := fmt.Sprintf("http://localhost:8080/ok")
|
||||||
|
reqBody := []byte(`{
|
||||||
|
"version": 2,
|
||||||
|
"config": {"warp-routing": {"enabled": true}, "originRequest" : {"connectTimeout": 10}, "ingress" : [ {"hostname": "test", "service": "https://localhost:8000" } , {"service": "http_status:404"} ]}}
|
||||||
|
`)
|
||||||
|
reader := bytes.NewReader(reqBody)
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set(InternalUpgradeHeader, ConfigurationUpdate)
|
||||||
|
|
||||||
|
resp, err := edgeHTTP2Conn.RoundTrip(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||||
|
bdy, err := ioutil.ReadAll(resp.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, `{"lastAppliedVersion":2,"err":null}`, string(bdy))
|
||||||
|
cancel()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func TestServeHTTP(t *testing.T) {
|
func TestServeHTTP(t *testing.T) {
|
||||||
tests := []testRequest{
|
tests := []testRequest{
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue