2020-10-08 10:12:26 +00:00
package origin
import (
"bufio"
2020-10-20 15:26:55 +00:00
"context"
2020-11-02 11:21:34 +00:00
"fmt"
2020-10-08 10:12:26 +00:00
"io"
"net/http"
"strconv"
"strings"
"github.com/cloudflare/cloudflared/buffer"
"github.com/cloudflare/cloudflared/connection"
2020-11-02 11:21:34 +00:00
"github.com/cloudflare/cloudflared/ingress"
2020-10-08 10:12:26 +00:00
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/cloudflare/cloudflared/websocket"
"github.com/pkg/errors"
2020-11-25 06:55:13 +00:00
"github.com/rs/zerolog"
2020-10-08 10:12:26 +00:00
)
const (
TagHeaderNamePrefix = "Cf-Warp-Tag-"
)
2020-12-09 21:46:53 +00:00
type proxy struct {
2020-11-02 11:21:34 +00:00
ingressRules ingress . Ingress
2021-01-17 20:22:53 +00:00
warpRouting * ingress . WarpRoutingService
2020-11-02 11:21:34 +00:00
tags [ ] tunnelpogs . Tag
2020-11-25 06:55:13 +00:00
log * zerolog . Logger
2020-11-02 11:21:34 +00:00
bufferPool * buffer . Pool
2020-10-08 10:12:26 +00:00
}
2021-01-17 20:22:53 +00:00
func NewOriginProxy (
ingressRules ingress . Ingress ,
warpRouting * ingress . WarpRoutingService ,
tags [ ] tunnelpogs . Tag ,
log * zerolog . Logger ) connection . OriginProxy {
2020-12-09 21:46:53 +00:00
return & proxy {
2020-11-02 11:21:34 +00:00
ingressRules : ingressRules ,
2021-01-17 20:22:53 +00:00
warpRouting : warpRouting ,
2020-11-02 11:21:34 +00:00
tags : tags ,
2020-11-25 06:55:13 +00:00
log : log ,
2020-11-02 11:21:34 +00:00
bufferPool : buffer . NewPool ( 512 * 1024 ) ,
2020-10-08 10:12:26 +00:00
}
}
2021-01-11 19:59:45 +00:00
func ( p * proxy ) Proxy ( w connection . ResponseWriter , req * http . Request , sourceConnectionType connection . Type ) error {
2020-10-08 10:12:26 +00:00
incrementRequests ( )
defer decrementConcurrentRequests ( )
cfRay := findCfRayHeader ( req )
lbProbe := isLBProbeRequest ( req )
2020-12-09 21:46:53 +00:00
p . appendTagHeaders ( req )
2021-01-17 20:22:53 +00:00
if sourceConnectionType == connection . TypeTCP {
if p . warpRouting == nil {
err := errors . New ( ` cloudflared received a request from Warp client, but your configuration has disabled ingress from Warp clients. To enable this, set "warp-routing:\n\t enabled: true" in your config.yaml ` )
p . log . Error ( ) . Msg ( err . Error ( ) )
return err
}
resp , err := p . handleProxyConn ( w , req , nil , p . warpRouting . Proxy )
if err != nil {
p . logRequestError ( err , cfRay , ingress . ServiceWarpRouting )
w . WriteErrorResponse ( )
return err
}
p . logOriginResponse ( resp , cfRay , lbProbe , ingress . ServiceWarpRouting )
return nil
}
2020-12-09 21:46:53 +00:00
rule , ruleNum := p . ingressRules . FindMatchingRule ( req . Host , req . URL . Path )
p . logRequest ( req , cfRay , lbProbe , ruleNum )
2020-11-02 11:21:34 +00:00
2021-01-11 19:59:45 +00:00
if sourceConnectionType == connection . TypeHTTP {
resp , err := p . proxyHTTP ( w , req , rule )
if err != nil {
p . logErrorAndWriteResponse ( w , err , cfRay , ruleNum )
return err
}
2020-12-09 21:46:53 +00:00
2021-01-11 19:59:45 +00:00
p . logOriginResponse ( resp , cfRay , lbProbe , ruleNum )
return nil
}
2020-12-09 21:46:53 +00:00
2021-01-11 19:59:45 +00:00
respHeader := http . Header { }
if sourceConnectionType == connection . TypeWebsocket {
go websocket . NewConn ( w , p . log ) . Pinger ( req . Context ( ) )
respHeader = websocket . NewResponseHeader ( req )
2020-10-08 10:12:26 +00:00
}
2021-01-11 19:59:45 +00:00
2021-01-17 20:22:53 +00:00
if hostHeader := rule . Config . HTTPHostHeader ; hostHeader != "" {
req . Header . Set ( "Host" , hostHeader )
req . Host = hostHeader
}
connectionProxy , ok := rule . Service . ( ingress . StreamBasedOriginProxy )
if ! ok {
p . log . Error ( ) . Msgf ( "%s is not a connection-oriented service" , rule . Service )
return fmt . Errorf ( "Not a connection-oriented service" )
}
resp , err := p . handleProxyConn ( w , req , respHeader , connectionProxy )
2020-10-08 10:12:26 +00:00
if err != nil {
2021-01-11 19:59:45 +00:00
p . logErrorAndWriteResponse ( w , err , cfRay , ruleNum )
2020-10-08 10:12:26 +00:00
return err
}
2020-12-09 21:46:53 +00:00
2021-01-17 20:22:53 +00:00
p . logOriginResponse ( resp , cfRay , lbProbe , ruleNum )
return nil
}
func ( p * proxy ) handleProxyConn (
w connection . ResponseWriter ,
req * http . Request ,
respHeader http . Header ,
connectionProxy ingress . StreamBasedOriginProxy ) ( * http . Response , error ) {
connClosedChan := make ( chan struct { } )
err := p . proxyConnection ( connClosedChan , w , req , connectionProxy )
if err != nil {
return nil , err
}
2021-01-11 19:59:45 +00:00
status := http . StatusSwitchingProtocols
resp := & http . Response {
Status : http . StatusText ( status ) ,
StatusCode : status ,
Header : respHeader ,
ContentLength : - 1 ,
}
w . WriteRespHeaders ( http . StatusSwitchingProtocols , nil )
<- connClosedChan
2021-01-17 20:22:53 +00:00
return resp , nil
2020-12-09 21:46:53 +00:00
2020-10-08 10:12:26 +00:00
}
2021-01-11 19:59:45 +00:00
func ( p * proxy ) logErrorAndWriteResponse ( w connection . ResponseWriter , err error , cfRay string , ruleNum int ) {
p . logRequestError ( err , cfRay , ruleNum )
w . WriteErrorResponse ( )
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) proxyHTTP ( w connection . ResponseWriter , req * http . Request , rule * ingress . Rule ) ( * http . Response , error ) {
2020-10-08 10:12:26 +00:00
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
2020-11-02 11:21:34 +00:00
if rule . Config . DisableChunkedEncoding {
2020-10-08 10:12:26 +00:00
req . TransferEncoding = [ ] string { "gzip" , "deflate" }
cLength , err := strconv . Atoi ( req . Header . Get ( "Content-Length" ) )
if err == nil {
req . ContentLength = int64 ( cLength )
}
}
// Request origin to keep connection alive to improve performance
req . Header . Set ( "Connection" , "keep-alive" )
2020-11-02 11:21:34 +00:00
if hostHeader := rule . Config . HTTPHostHeader ; hostHeader != "" {
req . Header . Set ( "Host" , hostHeader )
req . Host = hostHeader
}
2020-10-08 10:12:26 +00:00
2020-12-09 21:46:53 +00:00
httpService , ok := rule . Service . ( ingress . HTTPOriginProxy )
if ! ok {
p . log . Error ( ) . Msgf ( "%s is not a http service" , rule . Service )
return nil , fmt . Errorf ( "Not a http service" )
}
resp , err := httpService . RoundTrip ( req )
2020-10-08 10:12:26 +00:00
if err != nil {
return nil , errors . Wrap ( err , "Error proxying request to origin" )
}
defer resp . Body . Close ( )
2020-12-09 21:46:53 +00:00
err = w . WriteRespHeaders ( resp . StatusCode , resp . Header )
2020-10-08 10:12:26 +00:00
if err != nil {
return nil , errors . Wrap ( err , "Error writing response header" )
}
2020-11-18 11:53:59 +00:00
if connection . IsServerSentEvent ( resp . Header ) {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msg ( "Detected Server-Side Events from Origin" )
p . writeEventStream ( w , resp . Body )
2020-10-08 10:12:26 +00:00
} else {
// Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
// compression generates dictionary on first write
2020-12-09 21:46:53 +00:00
buf := p . bufferPool . Get ( )
defer p . bufferPool . Put ( buf )
2020-11-25 06:55:13 +00:00
_ , _ = io . CopyBuffer ( w , resp . Body , buf )
2020-10-08 10:12:26 +00:00
}
return resp , nil
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) proxyConnection ( connClosedChan chan struct { } ,
2021-01-17 20:22:53 +00:00
conn io . ReadWriter , req * http . Request , connectionProxy ingress . StreamBasedOriginProxy ) error {
originConn , err := connectionProxy . EstablishConnection ( req )
2020-10-08 10:12:26 +00:00
if err != nil {
2020-12-09 21:46:53 +00:00
return err
2020-10-08 10:12:26 +00:00
}
2020-10-20 15:26:55 +00:00
serveCtx , cancel := context . WithCancel ( req . Context ( ) )
go func ( ) {
2020-10-30 11:41:14 +00:00
// serveCtx is done if req is cancelled, or streamWebsocket returns
2020-10-20 15:26:55 +00:00
<- serveCtx . Done ( )
2020-12-09 21:46:53 +00:00
originConn . Close ( )
2020-10-30 11:41:14 +00:00
close ( connClosedChan )
2020-10-20 15:26:55 +00:00
} ( )
2020-10-30 11:41:14 +00:00
2020-12-09 21:46:53 +00:00
go func ( ) {
originConn . Stream ( conn )
cancel ( )
} ( )
2020-10-30 11:41:14 +00:00
return nil
2020-10-08 10:12:26 +00:00
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) writeEventStream ( w connection . ResponseWriter , respBody io . ReadCloser ) {
2020-10-08 10:12:26 +00:00
reader := bufio . NewReader ( respBody )
for {
line , err := reader . ReadBytes ( '\n' )
if err != nil {
break
}
2020-11-25 06:55:13 +00:00
_ , _ = w . Write ( line )
2020-10-08 10:12:26 +00:00
}
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) appendTagHeaders ( r * http . Request ) {
for _ , tag := range p . tags {
2020-10-08 10:12:26 +00:00
r . Header . Add ( TagHeaderNamePrefix + tag . Name , tag . Value )
}
}
2021-01-17 20:22:53 +00:00
func ( p * proxy ) logRequest ( r * http . Request , cfRay string , lbProbe bool , rule interface { } ) {
2020-10-08 10:12:26 +00:00
if cfRay != "" {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s %s %s %s" , cfRay , r . Method , r . URL , r . Proto )
2020-10-08 10:12:26 +00:00
} else if lbProbe {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Load Balancer health check %s %s %s" , cfRay , r . Method , r . URL , r . Proto )
2020-10-08 10:12:26 +00:00
} else {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s " , r . Method , r . URL , r . Proto )
2020-10-08 10:12:26 +00:00
}
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request Headers %+v" , cfRay , r . Header )
2021-01-17 20:22:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Serving with ingress rule %v" , cfRay , rule )
2020-10-08 10:12:26 +00:00
if contentLen := r . ContentLength ; contentLen == - 1 {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request Content length unknown" , cfRay )
2020-10-08 10:12:26 +00:00
} else {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request content length %d" , cfRay , contentLen )
2020-10-08 10:12:26 +00:00
}
}
2021-01-17 20:22:53 +00:00
func ( p * proxy ) logOriginResponse ( r * http . Response , cfRay string , lbProbe bool , rule interface { } ) {
2020-11-02 11:21:34 +00:00
responseByCode . WithLabelValues ( strconv . Itoa ( r . StatusCode ) ) . Inc ( )
2020-10-08 10:12:26 +00:00
if cfRay != "" {
2021-01-17 20:22:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Status: %s served by ingress %d" , cfRay , r . Status , rule )
2020-10-08 10:12:26 +00:00
} else if lbProbe {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "Response to Load Balancer health check %s" , r . Status )
2020-10-08 10:12:26 +00:00
} else {
2021-01-17 20:22:53 +00:00
p . log . Debug ( ) . Msgf ( "Status: %s served by ingress %v" , r . Status , rule )
2020-10-08 10:12:26 +00:00
}
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response Headers %+v" , cfRay , r . Header )
2020-10-08 10:12:26 +00:00
if contentLen := r . ContentLength ; contentLen == - 1 {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response content length unknown" , cfRay )
2020-10-08 10:12:26 +00:00
} else {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response content length %d" , cfRay , contentLen )
2020-10-08 10:12:26 +00:00
}
}
2021-01-17 20:22:53 +00:00
func ( p * proxy ) logRequestError ( err error , cfRay string , rule interface { } ) {
2020-11-02 11:21:34 +00:00
requestErrors . Inc ( )
if cfRay != "" {
2021-01-17 20:22:53 +00:00
p . log . Error ( ) . Msgf ( "CF-RAY: %s Proxying to ingress %v error: %v" , cfRay , rule , err )
2020-11-02 11:21:34 +00:00
} else {
2021-01-17 20:22:53 +00:00
p . log . Error ( ) . Msgf ( "Proxying to ingress %v error: %v" , rule , err )
2020-11-02 11:21:34 +00:00
}
}
2020-10-08 10:12:26 +00:00
func findCfRayHeader ( req * http . Request ) string {
return req . Header . Get ( "Cf-Ray" )
}
func isLBProbeRequest ( req * http . Request ) bool {
return strings . HasPrefix ( req . UserAgent ( ) , lbProbeUserAgentPrefix )
}