2020-10-08 10:12:26 +00:00
package origin
import (
"bufio"
2020-10-20 15:26:55 +00:00
"context"
2020-11-02 11:21:34 +00:00
"fmt"
2020-10-08 10:12:26 +00:00
"io"
"net/http"
"strconv"
"strings"
2021-03-08 16:46:23 +00:00
"github.com/pkg/errors"
"github.com/rs/zerolog"
2021-07-01 18:30:26 +00:00
"github.com/cloudflare/cloudflared/carrier"
2020-10-08 10:12:26 +00:00
"github.com/cloudflare/cloudflared/connection"
2020-11-02 11:21:34 +00:00
"github.com/cloudflare/cloudflared/ingress"
2020-10-08 10:12:26 +00:00
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
2021-07-01 09:29:53 +00:00
"github.com/cloudflare/cloudflared/websocket"
2020-10-08 10:12:26 +00:00
)
const (
2021-05-15 04:49:34 +00:00
TagHeaderNamePrefix = "Cf-Warp-Tag-"
LogFieldCFRay = "cfRay"
LogFieldRule = "ingressRule"
LogFieldOriginService = "originService"
2020-10-08 10:12:26 +00:00
)
2020-12-09 21:46:53 +00:00
type proxy struct {
2020-11-02 11:21:34 +00:00
ingressRules ingress . Ingress
2021-01-17 20:22:53 +00:00
warpRouting * ingress . WarpRoutingService
2020-11-02 11:21:34 +00:00
tags [ ] tunnelpogs . Tag
2020-11-25 06:55:13 +00:00
log * zerolog . Logger
2021-03-08 16:46:23 +00:00
bufferPool * bufferPool
2020-10-08 10:12:26 +00:00
}
2021-07-01 18:30:26 +00:00
var switchingProtocolText = fmt . Sprintf ( "%d %s" , http . StatusSwitchingProtocols , http . StatusText ( http . StatusSwitchingProtocols ) )
2021-01-17 20:22:53 +00:00
func NewOriginProxy (
ingressRules ingress . Ingress ,
warpRouting * ingress . WarpRoutingService ,
tags [ ] tunnelpogs . Tag ,
log * zerolog . Logger ) connection . OriginProxy {
2020-12-09 21:46:53 +00:00
return & proxy {
2020-11-02 11:21:34 +00:00
ingressRules : ingressRules ,
2021-01-17 20:22:53 +00:00
warpRouting : warpRouting ,
2020-11-02 11:21:34 +00:00
tags : tags ,
2020-11-25 06:55:13 +00:00
log : log ,
2021-03-08 16:46:23 +00:00
bufferPool : newBufferPool ( 512 * 1024 ) ,
2020-10-08 10:12:26 +00:00
}
}
2021-02-05 13:01:53 +00:00
// Caller is responsible for writing any error to ResponseWriter
2021-01-11 19:59:45 +00:00
func ( p * proxy ) Proxy ( w connection . ResponseWriter , req * http . Request , sourceConnectionType connection . Type ) error {
2020-10-08 10:12:26 +00:00
incrementRequests ( )
defer decrementConcurrentRequests ( )
cfRay := findCfRayHeader ( req )
lbProbe := isLBProbeRequest ( req )
2021-02-02 18:27:50 +00:00
serveCtx , cancel := context . WithCancel ( req . Context ( ) )
defer cancel ( )
2020-12-09 21:46:53 +00:00
p . appendTagHeaders ( req )
2021-01-17 20:22:53 +00:00
if sourceConnectionType == connection . TypeTCP {
if p . warpRouting == nil {
2021-02-26 09:50:19 +00:00
err := errors . New ( ` cloudflared received a request from WARP client, but your configuration has disabled ingress from WARP clients. To enable this, set "warp-routing:\n\t enabled: true" in your config.yaml ` )
2021-01-17 20:22:53 +00:00
p . log . Error ( ) . Msg ( err . Error ( ) )
return err
}
2021-02-05 13:01:53 +00:00
logFields := logFields {
cfRay : cfRay ,
lbProbe : lbProbe ,
rule : ingress . ServiceWarpRouting ,
}
2021-07-01 18:30:26 +00:00
host , err := getRequestHost ( req )
if err != nil {
err = fmt . Errorf ( ` cloudflared recieved a warp-routing request with an empty host value: %v ` , err )
return err
}
if err := p . proxyStreamRequest ( serveCtx , w , host , req , p . warpRouting . Proxy , logFields ) ; err != nil {
2021-05-15 04:49:34 +00:00
p . logRequestError ( err , cfRay , "" , ingress . ServiceWarpRouting )
2021-01-17 20:22:53 +00:00
return err
}
return nil
}
2020-12-09 21:46:53 +00:00
rule , ruleNum := p . ingressRules . FindMatchingRule ( req . Host , req . URL . Path )
2021-02-05 13:01:53 +00:00
logFields := logFields {
cfRay : cfRay ,
lbProbe : lbProbe ,
rule : ruleNum ,
}
p . logRequest ( req , logFields )
2020-11-02 11:21:34 +00:00
2021-07-01 09:29:53 +00:00
switch originProxy := rule . Service . ( type ) {
case ingress . HTTPOriginProxy :
if err := p . proxyHTTPRequest ( w , req , originProxy , sourceConnectionType == connection . TypeWebsocket ,
rule . Config . DisableChunkedEncoding , logFields ) ; err != nil {
2021-05-15 04:49:34 +00:00
rule , srv := ruleField ( p . ingressRules , ruleNum )
p . logRequestError ( err , cfRay , rule , srv )
2021-01-11 19:59:45 +00:00
return err
}
return nil
2020-12-09 21:46:53 +00:00
2021-07-01 09:29:53 +00:00
case ingress . StreamBasedOriginProxy :
2021-07-01 18:30:26 +00:00
dest , err := getDestFromRule ( rule , req )
if err != nil {
return err
}
if err := p . proxyStreamRequest ( serveCtx , w , dest , req , originProxy , logFields ) ; err != nil {
2021-07-01 09:29:53 +00:00
rule , srv := ruleField ( p . ingressRules , ruleNum )
p . logRequestError ( err , cfRay , rule , srv )
return err
}
return nil
default :
return fmt . Errorf ( "Unrecognized service: %s, %t" , rule . Service , originProxy )
2021-07-01 18:30:26 +00:00
}
}
2021-02-02 18:27:50 +00:00
2021-07-01 18:30:26 +00:00
func getDestFromRule ( rule * ingress . Rule , req * http . Request ) ( string , error ) {
switch rule . Service . String ( ) {
case ingress . ServiceBastion :
return carrier . ResolveBastionDest ( req )
default :
return rule . Service . String ( ) , nil
}
}
// getRequestHost returns the host of the http.Request.
func getRequestHost ( r * http . Request ) ( string , error ) {
if r . Host != "" {
return r . Host , nil
2020-10-08 10:12:26 +00:00
}
2021-07-01 18:30:26 +00:00
if r . URL != nil {
return r . URL . Host , nil
}
return "" , errors . New ( "host not set in incoming request" )
2021-01-17 20:22:53 +00:00
}
2021-05-15 04:49:34 +00:00
func ruleField ( ing ingress . Ingress , ruleNum int ) ( ruleID string , srv string ) {
srv = ing . Rules [ ruleNum ] . Service . String ( )
2021-04-09 21:30:14 +00:00
if ing . IsSingleRule ( ) {
2021-05-15 04:49:34 +00:00
return "" , srv
2021-04-09 21:30:14 +00:00
}
2021-05-15 04:49:34 +00:00
return fmt . Sprintf ( "%d" , ruleNum ) , srv
2021-04-09 21:30:14 +00:00
}
2021-07-01 09:29:53 +00:00
func ( p * proxy ) proxyHTTPRequest (
w connection . ResponseWriter ,
req * http . Request ,
httpService ingress . HTTPOriginProxy ,
isWebsocket bool ,
disableChunkedEncoding bool ,
fields logFields ) error {
roundTripReq := req
if isWebsocket {
roundTripReq = req . Clone ( req . Context ( ) )
roundTripReq . Header . Set ( "Connection" , "Upgrade" )
roundTripReq . Header . Set ( "Upgrade" , "websocket" )
roundTripReq . Header . Set ( "Sec-Websocket-Version" , "13" )
roundTripReq . ContentLength = 0
roundTripReq . Body = nil
} else {
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
if disableChunkedEncoding {
roundTripReq . TransferEncoding = [ ] string { "gzip" , "deflate" }
cLength , err := strconv . Atoi ( req . Header . Get ( "Content-Length" ) )
if err == nil {
roundTripReq . ContentLength = int64 ( cLength )
}
2020-10-08 10:12:26 +00:00
}
2021-07-01 09:29:53 +00:00
// Request origin to keep connection alive to improve performance
roundTripReq . Header . Set ( "Connection" , "keep-alive" )
2020-10-08 10:12:26 +00:00
}
2021-07-01 09:29:53 +00:00
resp , err := httpService . RoundTrip ( roundTripReq )
2020-10-08 10:12:26 +00:00
if err != nil {
2021-05-15 04:49:34 +00:00
return errors . Wrap ( err , "Unable to reach the origin service. The service may be down or it may not be responding to traffic from cloudflared" )
2020-10-08 10:12:26 +00:00
}
defer resp . Body . Close ( )
2020-12-09 21:46:53 +00:00
err = w . WriteRespHeaders ( resp . StatusCode , resp . Header )
2020-10-08 10:12:26 +00:00
if err != nil {
2021-02-05 13:01:53 +00:00
return errors . Wrap ( err , "Error writing response header" )
2020-10-08 10:12:26 +00:00
}
2021-07-01 09:29:53 +00:00
if resp . StatusCode == http . StatusSwitchingProtocols {
rwc , ok := resp . Body . ( io . ReadWriteCloser )
if ! ok {
return errors . New ( "internal error: unsupported connection type" )
}
defer rwc . Close ( )
eyeballStream := & bidirectionalStream {
writer : w ,
reader : req . Body ,
}
websocket . Stream ( eyeballStream , rwc , p . log )
return nil
}
2020-11-18 11:53:59 +00:00
if connection . IsServerSentEvent ( resp . Header ) {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msg ( "Detected Server-Side Events from Origin" )
p . writeEventStream ( w , resp . Body )
2020-10-08 10:12:26 +00:00
} else {
// Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
// compression generates dictionary on first write
2020-12-09 21:46:53 +00:00
buf := p . bufferPool . Get ( )
defer p . bufferPool . Put ( buf )
2020-11-25 06:55:13 +00:00
_ , _ = io . CopyBuffer ( w , resp . Body , buf )
2020-10-08 10:12:26 +00:00
}
2021-02-05 13:01:53 +00:00
p . logOriginResponse ( resp , fields )
return nil
2020-10-08 10:12:26 +00:00
}
2021-02-05 13:01:53 +00:00
// proxyStreamRequest first establish a connection with origin, then it writes the status code and headers, and finally it streams data between
// eyeball and origin.
func ( p * proxy ) proxyStreamRequest (
2021-02-02 18:27:50 +00:00
serveCtx context . Context ,
w connection . ResponseWriter ,
2021-07-01 18:30:26 +00:00
dest string ,
2021-02-02 18:27:50 +00:00
req * http . Request ,
connectionProxy ingress . StreamBasedOriginProxy ,
2021-02-05 13:01:53 +00:00
fields logFields ,
) error {
2021-07-01 18:30:26 +00:00
originConn , err := connectionProxy . EstablishConnection ( dest )
2020-10-08 10:12:26 +00:00
if err != nil {
2021-02-05 13:01:53 +00:00
return err
2021-02-02 18:27:50 +00:00
}
2021-07-01 18:30:26 +00:00
resp := & http . Response {
Status : switchingProtocolText ,
StatusCode : http . StatusSwitchingProtocols ,
ContentLength : - 1 ,
}
if secWebsocketKey := req . Header . Get ( "Sec-WebSocket-Key" ) ; secWebsocketKey != "" {
resp . Header = websocket . NewResponseHeader ( req )
2021-02-02 18:27:50 +00:00
}
2021-02-05 13:01:53 +00:00
if err = w . WriteRespHeaders ( resp . StatusCode , resp . Header ) ; err != nil {
return err
2021-02-02 18:27:50 +00:00
}
streamCtx , cancel := context . WithCancel ( serveCtx )
defer cancel ( )
2020-10-30 11:41:14 +00:00
2020-12-09 21:46:53 +00:00
go func ( ) {
2021-02-02 18:27:50 +00:00
// streamCtx is done if req is cancelled or if Stream returns
<- streamCtx . Done ( )
originConn . Close ( )
2020-12-09 21:46:53 +00:00
} ( )
2020-10-30 11:41:14 +00:00
2021-02-11 14:36:42 +00:00
eyeballStream := & bidirectionalStream {
writer : w ,
reader : req . Body ,
}
originConn . Stream ( serveCtx , eyeballStream , p . log )
2021-02-05 13:01:53 +00:00
p . logOriginResponse ( resp , fields )
return nil
2020-10-08 10:12:26 +00:00
}
2021-02-11 14:36:42 +00:00
type bidirectionalStream struct {
reader io . Reader
writer io . Writer
}
func ( wr * bidirectionalStream ) Read ( p [ ] byte ) ( n int , err error ) {
return wr . reader . Read ( p )
}
func ( wr * bidirectionalStream ) Write ( p [ ] byte ) ( n int , err error ) {
return wr . writer . Write ( p )
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) writeEventStream ( w connection . ResponseWriter , respBody io . ReadCloser ) {
2020-10-08 10:12:26 +00:00
reader := bufio . NewReader ( respBody )
for {
line , err := reader . ReadBytes ( '\n' )
if err != nil {
break
}
2020-11-25 06:55:13 +00:00
_ , _ = w . Write ( line )
2020-10-08 10:12:26 +00:00
}
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) appendTagHeaders ( r * http . Request ) {
for _ , tag := range p . tags {
2020-10-08 10:12:26 +00:00
r . Header . Add ( TagHeaderNamePrefix + tag . Name , tag . Value )
}
}
2021-02-05 13:01:53 +00:00
type logFields struct {
cfRay string
lbProbe bool
rule interface { }
}
func ( p * proxy ) logRequest ( r * http . Request , fields logFields ) {
if fields . cfRay != "" {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s %s %s %s" , fields . cfRay , r . Method , r . URL , r . Proto )
} else if fields . lbProbe {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Load Balancer health check %s %s %s" , fields . cfRay , r . Method , r . URL , r . Proto )
2020-10-08 10:12:26 +00:00
} else {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s " , r . Method , r . URL , r . Proto )
2020-10-08 10:12:26 +00:00
}
2021-04-30 23:39:15 +00:00
p . log . Debug ( ) .
Str ( "CF-RAY" , fields . cfRay ) .
2021-05-03 21:46:43 +00:00
Str ( "Header" , fmt . Sprintf ( "%+v" , r . Header ) ) .
2021-04-30 23:39:15 +00:00
Str ( "host" , r . Host ) .
Str ( "path" , r . URL . Path ) .
Interface ( "rule" , fields . rule ) .
Msg ( "Inbound request" )
2020-10-08 10:12:26 +00:00
if contentLen := r . ContentLength ; contentLen == - 1 {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request Content length unknown" , fields . cfRay )
2020-10-08 10:12:26 +00:00
} else {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request content length %d" , fields . cfRay , contentLen )
2020-10-08 10:12:26 +00:00
}
}
2021-02-05 13:01:53 +00:00
func ( p * proxy ) logOriginResponse ( resp * http . Response , fields logFields ) {
responseByCode . WithLabelValues ( strconv . Itoa ( resp . StatusCode ) ) . Inc ( )
if fields . cfRay != "" {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Status: %s served by ingress %d" , fields . cfRay , resp . Status , fields . rule )
} else if fields . lbProbe {
p . log . Debug ( ) . Msgf ( "Response to Load Balancer health check %s" , resp . Status )
2020-10-08 10:12:26 +00:00
} else {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "Status: %s served by ingress %v" , resp . Status , fields . rule )
2020-10-08 10:12:26 +00:00
}
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response Headers %+v" , fields . cfRay , resp . Header )
2020-10-08 10:12:26 +00:00
2021-02-05 13:01:53 +00:00
if contentLen := resp . ContentLength ; contentLen == - 1 {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response content length unknown" , fields . cfRay )
2020-10-08 10:12:26 +00:00
} else {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response content length %d" , fields . cfRay , contentLen )
2020-10-08 10:12:26 +00:00
}
}
2021-05-15 04:49:34 +00:00
func ( p * proxy ) logRequestError ( err error , cfRay string , rule , service string ) {
2020-11-02 11:21:34 +00:00
requestErrors . Inc ( )
2021-04-09 21:30:14 +00:00
log := p . log . Error ( ) . Err ( err )
2020-11-02 11:21:34 +00:00
if cfRay != "" {
2021-04-09 21:30:14 +00:00
log = log . Str ( LogFieldCFRay , cfRay )
}
if rule != "" {
log = log . Str ( LogFieldRule , rule )
2020-11-02 11:21:34 +00:00
}
2021-05-15 04:49:34 +00:00
if service != "" {
log = log . Str ( LogFieldOriginService , service )
}
2021-04-09 21:30:14 +00:00
log . Msg ( "" )
2020-11-02 11:21:34 +00:00
}
2020-10-08 10:12:26 +00:00
func findCfRayHeader ( req * http . Request ) string {
return req . Header . Get ( "Cf-Ray" )
}
func isLBProbeRequest ( req * http . Request ) bool {
return strings . HasPrefix ( req . UserAgent ( ) , lbProbeUserAgentPrefix )
}