cloudflared-mirror/origin/proxy.go

281 lines
8.1 KiB
Go

package origin
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/cloudflare/cloudflared/buffer"
"github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/ingress"
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/cloudflare/cloudflared/websocket"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
const (
TagHeaderNamePrefix = "Cf-Warp-Tag-"
)
type proxy struct {
ingressRules ingress.Ingress
warpRouting *ingress.WarpRoutingService
tags []tunnelpogs.Tag
log *zerolog.Logger
bufferPool *buffer.Pool
}
func NewOriginProxy(
ingressRules ingress.Ingress,
warpRouting *ingress.WarpRoutingService,
tags []tunnelpogs.Tag,
log *zerolog.Logger) connection.OriginProxy {
return &proxy{
ingressRules: ingressRules,
warpRouting: warpRouting,
tags: tags,
log: log,
bufferPool: buffer.NewPool(512 * 1024),
}
}
func (p *proxy) Proxy(w connection.ResponseWriter, req *http.Request, sourceConnectionType connection.Type) error {
incrementRequests()
defer decrementConcurrentRequests()
cfRay := findCfRayHeader(req)
lbProbe := isLBProbeRequest(req)
p.appendTagHeaders(req)
if sourceConnectionType == connection.TypeTCP {
if p.warpRouting == nil {
err := errors.New(`cloudflared received a request from Warp client, but your configuration has disabled ingress from Warp clients. To enable this, set "warp-routing:\n\t enabled: true" in your config.yaml`)
p.log.Error().Msg(err.Error())
return err
}
resp, err := p.handleProxyConn(w, req, nil, p.warpRouting.Proxy)
if err != nil {
p.logRequestError(err, cfRay, ingress.ServiceWarpRouting)
w.WriteErrorResponse()
return err
}
p.logOriginResponse(resp, cfRay, lbProbe, ingress.ServiceWarpRouting)
return nil
}
rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
p.logRequest(req, cfRay, lbProbe, ruleNum)
if sourceConnectionType == connection.TypeHTTP {
resp, err := p.proxyHTTP(w, req, rule)
if err != nil {
p.logErrorAndWriteResponse(w, err, cfRay, ruleNum)
return err
}
p.logOriginResponse(resp, cfRay, lbProbe, ruleNum)
return nil
}
respHeader := http.Header{}
if sourceConnectionType == connection.TypeWebsocket {
go websocket.NewConn(w, p.log).Pinger(req.Context())
respHeader = websocket.NewResponseHeader(req)
}
if hostHeader := rule.Config.HTTPHostHeader; hostHeader != "" {
req.Header.Set("Host", hostHeader)
req.Host = hostHeader
}
connectionProxy, ok := rule.Service.(ingress.StreamBasedOriginProxy)
if !ok {
p.log.Error().Msgf("%s is not a connection-oriented service", rule.Service)
return fmt.Errorf("Not a connection-oriented service")
}
resp, err := p.handleProxyConn(w, req, respHeader, connectionProxy)
if err != nil {
p.logErrorAndWriteResponse(w, err, cfRay, ruleNum)
return err
}
p.logOriginResponse(resp, cfRay, lbProbe, ruleNum)
return nil
}
func (p *proxy) handleProxyConn(
w connection.ResponseWriter,
req *http.Request,
respHeader http.Header,
connectionProxy ingress.StreamBasedOriginProxy) (*http.Response, error) {
connClosedChan := make(chan struct{})
err := p.proxyConnection(connClosedChan, w, req, connectionProxy)
if err != nil {
return nil, err
}
status := http.StatusSwitchingProtocols
resp := &http.Response{
Status: http.StatusText(status),
StatusCode: status,
Header: respHeader,
ContentLength: -1,
}
w.WriteRespHeaders(http.StatusSwitchingProtocols, nil)
<-connClosedChan
return resp, nil
}
func (p *proxy) logErrorAndWriteResponse(w connection.ResponseWriter, err error, cfRay string, ruleNum int) {
p.logRequestError(err, cfRay, ruleNum)
w.WriteErrorResponse()
}
func (p *proxy) proxyHTTP(w connection.ResponseWriter, req *http.Request, rule *ingress.Rule) (*http.Response, error) {
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
if rule.Config.DisableChunkedEncoding {
req.TransferEncoding = []string{"gzip", "deflate"}
cLength, err := strconv.Atoi(req.Header.Get("Content-Length"))
if err == nil {
req.ContentLength = int64(cLength)
}
}
// Request origin to keep connection alive to improve performance
req.Header.Set("Connection", "keep-alive")
if hostHeader := rule.Config.HTTPHostHeader; hostHeader != "" {
req.Header.Set("Host", hostHeader)
req.Host = hostHeader
}
httpService, ok := rule.Service.(ingress.HTTPOriginProxy)
if !ok {
p.log.Error().Msgf("%s is not a http service", rule.Service)
return nil, fmt.Errorf("Not a http service")
}
resp, err := httpService.RoundTrip(req)
if err != nil {
return nil, errors.Wrap(err, "Error proxying request to origin")
}
defer resp.Body.Close()
err = w.WriteRespHeaders(resp.StatusCode, resp.Header)
if err != nil {
return nil, errors.Wrap(err, "Error writing response header")
}
if connection.IsServerSentEvent(resp.Header) {
p.log.Debug().Msg("Detected Server-Side Events from Origin")
p.writeEventStream(w, resp.Body)
} else {
// Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
// compression generates dictionary on first write
buf := p.bufferPool.Get()
defer p.bufferPool.Put(buf)
_, _ = io.CopyBuffer(w, resp.Body, buf)
}
return resp, nil
}
func (p *proxy) proxyConnection(connClosedChan chan struct{},
conn io.ReadWriter, req *http.Request, connectionProxy ingress.StreamBasedOriginProxy) error {
originConn, err := connectionProxy.EstablishConnection(req)
if err != nil {
return err
}
serveCtx, cancel := context.WithCancel(req.Context())
go func() {
// serveCtx is done if req is cancelled, or streamWebsocket returns
<-serveCtx.Done()
originConn.Close()
close(connClosedChan)
}()
go func() {
originConn.Stream(conn)
cancel()
}()
return nil
}
func (p *proxy) writeEventStream(w connection.ResponseWriter, respBody io.ReadCloser) {
reader := bufio.NewReader(respBody)
for {
line, err := reader.ReadBytes('\n')
if err != nil {
break
}
_, _ = w.Write(line)
}
}
func (p *proxy) appendTagHeaders(r *http.Request) {
for _, tag := range p.tags {
r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
}
}
func (p *proxy) logRequest(r *http.Request, cfRay string, lbProbe bool, rule interface{}) {
if cfRay != "" {
p.log.Debug().Msgf("CF-RAY: %s %s %s %s", cfRay, r.Method, r.URL, r.Proto)
} else if lbProbe {
p.log.Debug().Msgf("CF-RAY: %s Load Balancer health check %s %s %s", cfRay, r.Method, r.URL, r.Proto)
} else {
p.log.Debug().Msgf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", r.Method, r.URL, r.Proto)
}
p.log.Debug().Msgf("CF-RAY: %s Request Headers %+v", cfRay, r.Header)
p.log.Debug().Msgf("CF-RAY: %s Serving with ingress rule %v", cfRay, rule)
if contentLen := r.ContentLength; contentLen == -1 {
p.log.Debug().Msgf("CF-RAY: %s Request Content length unknown", cfRay)
} else {
p.log.Debug().Msgf("CF-RAY: %s Request content length %d", cfRay, contentLen)
}
}
func (p *proxy) logOriginResponse(r *http.Response, cfRay string, lbProbe bool, rule interface{}) {
responseByCode.WithLabelValues(strconv.Itoa(r.StatusCode)).Inc()
if cfRay != "" {
p.log.Debug().Msgf("CF-RAY: %s Status: %s served by ingress %d", cfRay, r.Status, rule)
} else if lbProbe {
p.log.Debug().Msgf("Response to Load Balancer health check %s", r.Status)
} else {
p.log.Debug().Msgf("Status: %s served by ingress %v", r.Status, rule)
}
p.log.Debug().Msgf("CF-RAY: %s Response Headers %+v", cfRay, r.Header)
if contentLen := r.ContentLength; contentLen == -1 {
p.log.Debug().Msgf("CF-RAY: %s Response content length unknown", cfRay)
} else {
p.log.Debug().Msgf("CF-RAY: %s Response content length %d", cfRay, contentLen)
}
}
func (p *proxy) logRequestError(err error, cfRay string, rule interface{}) {
requestErrors.Inc()
if cfRay != "" {
p.log.Error().Msgf("CF-RAY: %s Proxying to ingress %v error: %v", cfRay, rule, err)
} else {
p.log.Error().Msgf("Proxying to ingress %v error: %v", rule, err)
}
}
func findCfRayHeader(req *http.Request) string {
return req.Header.Get("Cf-Ray")
}
func isLBProbeRequest(req *http.Request) bool {
return strings.HasPrefix(req.UserAgent(), lbProbeUserAgentPrefix)
}