2020-10-08 10:12:26 +00:00
package origin
import (
"bufio"
2020-10-20 15:26:55 +00:00
"context"
2020-11-02 11:21:34 +00:00
"fmt"
2020-10-08 10:12:26 +00:00
"io"
"net/http"
"strconv"
"strings"
2021-03-08 16:46:23 +00:00
"github.com/pkg/errors"
"github.com/rs/zerolog"
2020-10-08 10:12:26 +00:00
"github.com/cloudflare/cloudflared/connection"
2020-11-02 11:21:34 +00:00
"github.com/cloudflare/cloudflared/ingress"
2020-10-08 10:12:26 +00:00
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
)
// TagHeaderNamePrefix is prepended to the name of each configured tag to form
// the name of the request header that carries that tag's value.
const TagHeaderNamePrefix = "Cf-Warp-Tag-"
2020-12-09 21:46:53 +00:00
type proxy struct {
2020-11-02 11:21:34 +00:00
ingressRules ingress . Ingress
2021-01-17 20:22:53 +00:00
warpRouting * ingress . WarpRoutingService
2020-11-02 11:21:34 +00:00
tags [ ] tunnelpogs . Tag
2020-11-25 06:55:13 +00:00
log * zerolog . Logger
2021-03-08 16:46:23 +00:00
bufferPool * bufferPool
2020-10-08 10:12:26 +00:00
}
2021-01-17 20:22:53 +00:00
func NewOriginProxy (
ingressRules ingress . Ingress ,
warpRouting * ingress . WarpRoutingService ,
tags [ ] tunnelpogs . Tag ,
log * zerolog . Logger ) connection . OriginProxy {
2020-12-09 21:46:53 +00:00
return & proxy {
2020-11-02 11:21:34 +00:00
ingressRules : ingressRules ,
2021-01-17 20:22:53 +00:00
warpRouting : warpRouting ,
2020-11-02 11:21:34 +00:00
tags : tags ,
2020-11-25 06:55:13 +00:00
log : log ,
2021-03-08 16:46:23 +00:00
bufferPool : newBufferPool ( 512 * 1024 ) ,
2020-10-08 10:12:26 +00:00
}
}
2021-02-05 13:01:53 +00:00
// Caller is responsible for writing any error to ResponseWriter
2021-01-11 19:59:45 +00:00
func ( p * proxy ) Proxy ( w connection . ResponseWriter , req * http . Request , sourceConnectionType connection . Type ) error {
2020-10-08 10:12:26 +00:00
incrementRequests ( )
defer decrementConcurrentRequests ( )
cfRay := findCfRayHeader ( req )
lbProbe := isLBProbeRequest ( req )
2021-02-02 18:27:50 +00:00
serveCtx , cancel := context . WithCancel ( req . Context ( ) )
defer cancel ( )
2020-12-09 21:46:53 +00:00
p . appendTagHeaders ( req )
2021-01-17 20:22:53 +00:00
if sourceConnectionType == connection . TypeTCP {
if p . warpRouting == nil {
2021-02-26 09:50:19 +00:00
err := errors . New ( ` cloudflared received a request from WARP client, but your configuration has disabled ingress from WARP clients. To enable this, set "warp-routing:\n\t enabled: true" in your config.yaml ` )
2021-01-17 20:22:53 +00:00
p . log . Error ( ) . Msg ( err . Error ( ) )
return err
}
2021-02-05 13:01:53 +00:00
logFields := logFields {
cfRay : cfRay ,
lbProbe : lbProbe ,
rule : ingress . ServiceWarpRouting ,
}
if err := p . proxyStreamRequest ( serveCtx , w , req , sourceConnectionType , p . warpRouting . Proxy , logFields ) ; err != nil {
2021-01-17 20:22:53 +00:00
p . logRequestError ( err , cfRay , ingress . ServiceWarpRouting )
return err
}
return nil
}
2020-12-09 21:46:53 +00:00
rule , ruleNum := p . ingressRules . FindMatchingRule ( req . Host , req . URL . Path )
2021-02-05 13:01:53 +00:00
logFields := logFields {
cfRay : cfRay ,
lbProbe : lbProbe ,
rule : ruleNum ,
}
p . logRequest ( req , logFields )
2020-11-02 11:21:34 +00:00
2021-01-11 19:59:45 +00:00
if sourceConnectionType == connection . TypeHTTP {
2021-02-05 13:01:53 +00:00
if err := p . proxyHTTPRequest ( w , req , rule , logFields ) ; err != nil {
p . logRequestError ( err , cfRay , ruleNum )
2021-01-11 19:59:45 +00:00
return err
}
return nil
}
2020-12-09 21:46:53 +00:00
2021-01-17 20:22:53 +00:00
connectionProxy , ok := rule . Service . ( ingress . StreamBasedOriginProxy )
if ! ok {
p . log . Error ( ) . Msgf ( "%s is not a connection-oriented service" , rule . Service )
return fmt . Errorf ( "Not a connection-oriented service" )
}
2021-02-02 18:27:50 +00:00
2021-02-05 13:01:53 +00:00
if err := p . proxyStreamRequest ( serveCtx , w , req , sourceConnectionType , connectionProxy , logFields ) ; err != nil {
p . logRequestError ( err , cfRay , ruleNum )
2020-10-08 10:12:26 +00:00
return err
}
2021-01-17 20:22:53 +00:00
return nil
}
2021-02-05 13:01:53 +00:00
func ( p * proxy ) proxyHTTPRequest ( w connection . ResponseWriter , req * http . Request , rule * ingress . Rule , fields logFields ) error {
2020-10-08 10:12:26 +00:00
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
2020-11-02 11:21:34 +00:00
if rule . Config . DisableChunkedEncoding {
2020-10-08 10:12:26 +00:00
req . TransferEncoding = [ ] string { "gzip" , "deflate" }
cLength , err := strconv . Atoi ( req . Header . Get ( "Content-Length" ) )
if err == nil {
req . ContentLength = int64 ( cLength )
}
}
// Request origin to keep connection alive to improve performance
req . Header . Set ( "Connection" , "keep-alive" )
2020-12-09 21:46:53 +00:00
httpService , ok := rule . Service . ( ingress . HTTPOriginProxy )
if ! ok {
p . log . Error ( ) . Msgf ( "%s is not a http service" , rule . Service )
2021-02-05 13:01:53 +00:00
return fmt . Errorf ( "Not a http service" )
2020-12-09 21:46:53 +00:00
}
resp , err := httpService . RoundTrip ( req )
2020-10-08 10:12:26 +00:00
if err != nil {
2021-02-05 13:01:53 +00:00
return errors . Wrap ( err , "Error proxying request to origin" )
2020-10-08 10:12:26 +00:00
}
defer resp . Body . Close ( )
2020-12-09 21:46:53 +00:00
err = w . WriteRespHeaders ( resp . StatusCode , resp . Header )
2020-10-08 10:12:26 +00:00
if err != nil {
2021-02-05 13:01:53 +00:00
return errors . Wrap ( err , "Error writing response header" )
2020-10-08 10:12:26 +00:00
}
2020-11-18 11:53:59 +00:00
if connection . IsServerSentEvent ( resp . Header ) {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msg ( "Detected Server-Side Events from Origin" )
p . writeEventStream ( w , resp . Body )
2020-10-08 10:12:26 +00:00
} else {
// Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
// compression generates dictionary on first write
2020-12-09 21:46:53 +00:00
buf := p . bufferPool . Get ( )
defer p . bufferPool . Put ( buf )
2020-11-25 06:55:13 +00:00
_ , _ = io . CopyBuffer ( w , resp . Body , buf )
2020-10-08 10:12:26 +00:00
}
2021-02-05 13:01:53 +00:00
p . logOriginResponse ( resp , fields )
return nil
2020-10-08 10:12:26 +00:00
}
2021-02-05 13:01:53 +00:00
// proxyStreamRequest first establish a connection with origin, then it writes the status code and headers, and finally it streams data between
// eyeball and origin.
func ( p * proxy ) proxyStreamRequest (
2021-02-02 18:27:50 +00:00
serveCtx context . Context ,
w connection . ResponseWriter ,
req * http . Request ,
sourceConnectionType connection . Type ,
connectionProxy ingress . StreamBasedOriginProxy ,
2021-02-05 13:01:53 +00:00
fields logFields ,
) error {
originConn , resp , err := connectionProxy . EstablishConnection ( req )
2020-10-08 10:12:26 +00:00
if err != nil {
2021-02-05 13:01:53 +00:00
return err
2021-02-02 18:27:50 +00:00
}
2021-02-05 13:01:53 +00:00
if resp . Body != nil {
defer resp . Body . Close ( )
2021-02-02 18:27:50 +00:00
}
2021-02-05 13:01:53 +00:00
if err = w . WriteRespHeaders ( resp . StatusCode , resp . Header ) ; err != nil {
return err
2021-02-02 18:27:50 +00:00
}
streamCtx , cancel := context . WithCancel ( serveCtx )
defer cancel ( )
2020-10-30 11:41:14 +00:00
2020-12-09 21:46:53 +00:00
go func ( ) {
2021-02-02 18:27:50 +00:00
// streamCtx is done if req is cancelled or if Stream returns
<- streamCtx . Done ( )
originConn . Close ( )
2020-12-09 21:46:53 +00:00
} ( )
2020-10-30 11:41:14 +00:00
2021-02-11 14:36:42 +00:00
eyeballStream := & bidirectionalStream {
writer : w ,
reader : req . Body ,
}
originConn . Stream ( serveCtx , eyeballStream , p . log )
2021-02-05 13:01:53 +00:00
p . logOriginResponse ( resp , fields )
return nil
2020-10-08 10:12:26 +00:00
}
2021-02-11 14:36:42 +00:00
// bidirectionalStream glues an independent reader and writer together into a
// single value that can be both read from and written to.
type bidirectionalStream struct {
	reader io.Reader
	writer io.Writer
}

// Compile-time check that the pair satisfies io.ReadWriter.
var _ io.ReadWriter = (*bidirectionalStream)(nil)

// Read delegates to the wrapped reader.
func (s *bidirectionalStream) Read(p []byte) (int, error) {
	return s.reader.Read(p)
}

// Write delegates to the wrapped writer.
func (s *bidirectionalStream) Write(p []byte) (int, error) {
	return s.writer.Write(p)
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) writeEventStream ( w connection . ResponseWriter , respBody io . ReadCloser ) {
2020-10-08 10:12:26 +00:00
reader := bufio . NewReader ( respBody )
for {
line , err := reader . ReadBytes ( '\n' )
if err != nil {
break
}
2020-11-25 06:55:13 +00:00
_ , _ = w . Write ( line )
2020-10-08 10:12:26 +00:00
}
}
2020-12-09 21:46:53 +00:00
func ( p * proxy ) appendTagHeaders ( r * http . Request ) {
for _ , tag := range p . tags {
2020-10-08 10:12:26 +00:00
r . Header . Add ( TagHeaderNamePrefix + tag . Name , tag . Value )
}
}
2021-02-05 13:01:53 +00:00
// logFields bundles the per-request values that the logging helpers put into
// their messages.
type logFields struct {
	// cfRay is the value of the request's Cf-Ray header; empty when absent.
	cfRay string
	// lbProbe reports whether the request was recognised as a load balancer probe.
	lbProbe bool
	// rule identifies what serves the request: the matched ingress rule number,
	// or ingress.ServiceWarpRouting for WARP routed requests.
	rule interface{}
}
func ( p * proxy ) logRequest ( r * http . Request , fields logFields ) {
if fields . cfRay != "" {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s %s %s %s" , fields . cfRay , r . Method , r . URL , r . Proto )
} else if fields . lbProbe {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Load Balancer health check %s %s %s" , fields . cfRay , r . Method , r . URL , r . Proto )
2020-10-08 10:12:26 +00:00
} else {
2020-12-09 21:46:53 +00:00
p . log . Debug ( ) . Msgf ( "All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s " , r . Method , r . URL , r . Proto )
2020-10-08 10:12:26 +00:00
}
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request Headers %+v" , fields . cfRay , r . Header )
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Serving with ingress rule %v" , fields . cfRay , fields . rule )
2020-10-08 10:12:26 +00:00
if contentLen := r . ContentLength ; contentLen == - 1 {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request Content length unknown" , fields . cfRay )
2020-10-08 10:12:26 +00:00
} else {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Request content length %d" , fields . cfRay , contentLen )
2020-10-08 10:12:26 +00:00
}
}
2021-02-05 13:01:53 +00:00
func ( p * proxy ) logOriginResponse ( resp * http . Response , fields logFields ) {
responseByCode . WithLabelValues ( strconv . Itoa ( resp . StatusCode ) ) . Inc ( )
if fields . cfRay != "" {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Status: %s served by ingress %d" , fields . cfRay , resp . Status , fields . rule )
} else if fields . lbProbe {
p . log . Debug ( ) . Msgf ( "Response to Load Balancer health check %s" , resp . Status )
2020-10-08 10:12:26 +00:00
} else {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "Status: %s served by ingress %v" , resp . Status , fields . rule )
2020-10-08 10:12:26 +00:00
}
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response Headers %+v" , fields . cfRay , resp . Header )
2020-10-08 10:12:26 +00:00
2021-02-05 13:01:53 +00:00
if contentLen := resp . ContentLength ; contentLen == - 1 {
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response content length unknown" , fields . cfRay )
2020-10-08 10:12:26 +00:00
} else {
2021-02-05 13:01:53 +00:00
p . log . Debug ( ) . Msgf ( "CF-RAY: %s Response content length %d" , fields . cfRay , contentLen )
2020-10-08 10:12:26 +00:00
}
}
2021-01-17 20:22:53 +00:00
func ( p * proxy ) logRequestError ( err error , cfRay string , rule interface { } ) {
2020-11-02 11:21:34 +00:00
requestErrors . Inc ( )
if cfRay != "" {
2021-01-17 20:22:53 +00:00
p . log . Error ( ) . Msgf ( "CF-RAY: %s Proxying to ingress %v error: %v" , cfRay , rule , err )
2020-11-02 11:21:34 +00:00
} else {
2021-01-17 20:22:53 +00:00
p . log . Error ( ) . Msgf ( "Proxying to ingress %v error: %v" , rule , err )
2020-11-02 11:21:34 +00:00
}
}
2020-10-08 10:12:26 +00:00
// findCfRayHeader returns the value of req's Cf-Ray header, or the empty
// string when the header is not set.
func findCfRayHeader(req *http.Request) string {
	const headerName = "Cf-Ray"
	return req.Header.Get(headerName)
}
// isLBProbeRequest reports whether req's User-Agent starts with
// lbProbeUserAgentPrefix.
func isLBProbeRequest(req *http.Request) bool {
	userAgent := req.UserAgent()
	return strings.HasPrefix(userAgent, lbProbeUserAgentPrefix)
}