From 5c6207debc89ada56a63dfa4e73bf875a91e132d Mon Sep 17 00:00:00 2001 From: Sudarsan Reddy Date: Fri, 4 Mar 2022 11:35:57 +0000 Subject: [PATCH] TUN-5696: HTTP/2 Configuration Update --- connection/connection.go | 1 + connection/http2.go | 35 +++++++++++++++++++++++++++++++++++ connection/http2_test.go | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/connection/connection.go b/connection/connection.go index 525c1a6e..ad14a85c 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -80,6 +80,7 @@ const ( TypeTCP TypeControlStream TypeHTTP + TypeConfiguration ) // ShouldFlush returns whether this kind of connection should actively flush data diff --git a/connection/http2.go b/connection/http2.go index b015117c..55e73ad3 100644 --- a/connection/http2.go +++ b/connection/http2.go @@ -2,6 +2,7 @@ package connection import ( "context" + gojson "encoding/json" "fmt" "io" "net" @@ -23,6 +24,7 @@ const ( InternalTCPProxySrcHeader = "Cf-Cloudflared-Proxy-Src" WebsocketUpgrade = "websocket" ControlStreamUpgrade = "control-stream" + ConfigurationUpdate = "update-configuration" ) var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed") @@ -120,6 +122,13 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) { respWriter.WriteErrorResponse() } + case TypeConfiguration: + fmt.Println("TYPE CONFIGURATION?") + if err := c.handleConfigurationUpdate(respWriter, r); err != nil { + c.log.Error().Err(err) + respWriter.WriteErrorResponse() + } + case TypeWebsocket, TypeHTTP: stripWebsocketUpgradeHeader(r) if err := originProxy.ProxyHTTP(respWriter, r, connType == TypeWebsocket); err != nil { @@ -152,6 +161,26 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +// ConfigurationUpdateBody is the representation followed by the edge to send updates to cloudflared. +type ConfigurationUpdateBody struct { + Version int32 `json:"version"` + Config gojson.RawMessage `json:"config"` +} + +func (c *HTTP2Connection) handleConfigurationUpdate(respWriter *http2RespWriter, r *http.Request) error { + var configBody ConfigurationUpdateBody + if err := json.NewDecoder(r.Body).Decode(&configBody); err != nil { + return err + } + resp := c.orchestrator.UpdateConfig(configBody.Version, configBody.Config) + bdy, err := json.Marshal(resp) + if err != nil { + return err + } + _, err = respWriter.Write(bdy) + return err +} + func (c *HTTP2Connection) close() { // Wait for all serve HTTP handlers to return c.activeRequestsWG.Wait() @@ -258,6 +287,8 @@ func (rp *http2RespWriter) Close() error { func determineHTTP2Type(r *http.Request) Type { switch { + case isConfigurationUpdate(r): + return TypeConfiguration case isWebsocketUpgrade(r): return TypeWebsocket case IsTCPStream(r): @@ -291,6 +322,10 @@ func isWebsocketUpgrade(r *http.Request) bool { return r.Header.Get(InternalUpgradeHeader) == WebsocketUpgrade } +func isConfigurationUpdate(r *http.Request) bool { + return r.Header.Get(InternalUpgradeHeader) == ConfigurationUpdate +} + // IsTCPStream discerns if the connection request needs a tcp stream proxy. func IsTCPStream(r *http.Request) bool { return r.Header.Get(InternalTCPProxySrcHeader) != "" diff --git a/connection/http2_test.go b/connection/http2_test.go index c067229c..384d29fb 100644 --- a/connection/http2_test.go +++ b/connection/http2_test.go @@ -1,6 +1,7 @@ package connection import ( + "bytes" "context" "errors" "fmt" @@ -53,6 +54,41 @@ func newTestHTTP2Connection() (*HTTP2Connection, net.Conn) { ), edgeConn } +func TestHTTP2ConfigurationSet(t *testing.T) { + http2Conn, edgeConn := newTestHTTP2Connection() + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + http2Conn.Serve(ctx) + }() + + edgeHTTP2Conn, err := testTransport.NewClientConn(edgeConn) + require.NoError(t, err) + + endpoint := fmt.Sprintf("http://localhost:8080/ok") + reqBody := []byte(`{ +"version": 2, +"config": {"warp-routing": {"enabled": true}, "originRequest" : {"connectTimeout": 10}, "ingress" : [ {"hostname": "test", "service": "https://localhost:8000" } , {"service": "http_status:404"} ]}} +`) + reader := bytes.NewReader(reqBody) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, reader) + require.NoError(t, err) + req.Header.Set(InternalUpgradeHeader, ConfigurationUpdate) + + resp, err := edgeHTTP2Conn.RoundTrip(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + bdy, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, `{"lastAppliedVersion":2,"err":null}`, string(bdy)) + cancel() + wg.Wait() + +} + func TestServeHTTP(t *testing.T) { tests := []testRequest{ {