2018-05-01 23:45:06 +00:00
package origin
import (
"bufio"
2019-01-10 20:55:44 +00:00
"context"
2018-05-01 23:45:06 +00:00
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
2019-02-19 17:40:49 +00:00
"sync"
2018-05-01 23:45:06 +00:00
"time"
2019-11-21 17:03:13 +00:00
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
2020-02-24 17:06:19 +00:00
"github.com/cloudflare/cloudflared/buffer"
2019-06-17 21:18:47 +00:00
"github.com/cloudflare/cloudflared/cmd/cloudflared/buildinfo"
2020-07-29 22:48:27 +00:00
"github.com/cloudflare/cloudflared/cmd/cloudflared/ui"
2019-11-21 18:10:44 +00:00
"github.com/cloudflare/cloudflared/connection"
2018-05-01 23:45:06 +00:00
"github.com/cloudflare/cloudflared/h2mux"
2020-10-09 00:12:29 +00:00
"github.com/cloudflare/cloudflared/ingress"
2020-04-29 20:51:32 +00:00
"github.com/cloudflare/cloudflared/logger"
2019-03-04 19:48:56 +00:00
"github.com/cloudflare/cloudflared/signal"
2018-05-01 23:45:06 +00:00
"github.com/cloudflare/cloudflared/tunnelrpc"
2020-06-17 18:33:55 +00:00
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
2018-05-01 23:45:06 +00:00
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
"github.com/cloudflare/cloudflared/validation"
"github.com/cloudflare/cloudflared/websocket"
)
// Timeouts used while talking to the edge.
const (
	// dialTimeout is passed to connection.DialEdge when opening the edge connection.
	dialTimeout = 15 * time.Second
	// openStreamTimeout is not referenced in this part of the file.
	// NOTE(review): presumably bounds opening an RPC stream — confirm at its use site.
	openStreamTimeout = 30 * time.Second
	// muxerTimeout is the Timeout placed in every h2mux.MuxerConfig built by muxerConfig.
	muxerTimeout = 5 * time.Second
)

// Header and user-agent prefixes.
const (
	lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
	// TagHeaderNamePrefix is prepended to each tag name by AppendTagHeaders.
	TagHeaderNamePrefix = "Cf-Warp-Tag-"
)

// DuplicateConnectionError is the error text that registration code compares
// against to detect a duplicate connection.
const DuplicateConnectionError = "EDUPCONN"

// Feature names reported by TunnelConfig.SupportedFeatures.
const (
	FeatureSerializedHeaders = "serialized_headers"
	FeatureQuickReconnects   = "quick_reconnects"
)
2020-09-28 09:10:30 +00:00
// rpcName names a tunnel RPC. Its string form is used as a Prometheus label
// value (see newRPCError and processRegistrationSuccess).
type rpcName string

// Known RPC names. The values must not carry stray whitespace because they
// end up verbatim in metric labels.
const (
	register     rpcName = "register"
	reconnect    rpcName = "reconnect"
	unregister   rpcName = "unregister"
	authenticate rpcName = "authenticate"
)
2018-05-01 23:45:06 +00:00
type TunnelConfig struct {
2020-06-25 18:25:39 +00:00
BuildInfo * buildinfo . BuildInfo
ClientID string
ClientTlsConfig * tls . Config
CloseConnOnce * sync . Once // Used to close connectedSignal no more than once
CompressionQuality uint64
EdgeAddrs [ ] string
GracePeriod time . Duration
HAConnections int
HTTPTransport http . RoundTripper
HeartbeatInterval time . Duration
Hostname string
HTTPHostHeader string
IncidentLookup IncidentLookup
IsAutoupdated bool
IsFreeTunnel bool
LBPool string
Logger logger . Service
TransportLogger logger . Service
MaxHeartbeats uint64
Metrics * TunnelMetrics
MetricsUpdateFreq time . Duration
NoChunkedEncoding bool
OriginCert [ ] byte
ReportedVersion string
Retries uint
RunFromTerminal bool
Tags [ ] tunnelpogs . Tag
TlsConfig * tls . Config
WSGI bool
2019-02-14 10:40:54 +00:00
// OriginUrl may not be used if a user specifies a unix socket.
OriginUrl string
2019-12-04 17:22:08 +00:00
// feature-flag to use new edge reconnect tokens
UseReconnectToken bool
2020-06-25 18:25:39 +00:00
NamedTunnel * NamedTunnelConfig
ReplaceExisting bool
2020-07-24 22:17:17 +00:00
TunnelEventChan chan <- ui . TunnelEvent
2020-10-15 17:41:50 +00:00
IngressRules ingress . Ingress
2018-05-01 23:45:06 +00:00
}
// dupConnRegisterTunnelError reports that the edge rejected a registration
// because this client is already connected to that server.
type dupConnRegisterTunnelError struct{}

// errDuplicationConnection is the shared instance returned by the
// registration code; callers match it as *dupConnRegisterTunnelError.
var errDuplicationConnection = new(dupConnRegisterTunnelError)

// Error implements the error interface.
func (dupConnRegisterTunnelError) Error() string {
	return "already connected to this server, trying another address"
}
// muxerShutdownError is returned by ServeTunnel's serving goroutine when the
// muxer stops without reporting an error of its own.
type muxerShutdownError struct{}

// Error implements the error interface.
func (muxerShutdownError) Error() string {
	return "muxer shutdown"
}
// serverRegisterTunnelError is a registration failure reported by the server.
// permanent records whether retrying is pointless; ServeTunnel reports the
// error as recoverable only when it is false.
type serverRegisterTunnelError struct {
	cause     error
	permanent bool
}

// Error returns the message of the underlying cause.
func (s serverRegisterTunnelError) Error() string {
	underlying := s.cause
	return underlying.Error()
}
// clientRegisterTunnelError is a registration failure that originated on the
// client side; newRPCError builds it around the underlying cause.
type clientRegisterTunnelError struct {
	cause error
}
2020-09-28 09:10:30 +00:00
func newRPCError ( cause error , counter * prometheus . CounterVec , name rpcName ) clientRegisterTunnelError {
2020-01-28 17:29:33 +00:00
counter . WithLabelValues ( cause . Error ( ) , string ( name ) ) . Inc ( )
2019-03-15 23:46:53 +00:00
return clientRegisterTunnelError { cause : cause }
}
2018-05-01 23:45:06 +00:00
// Error returns the message of the underlying cause.
func (c clientRegisterTunnelError) Error() string {
	underlying := c.cause
	return underlying.Error()
}
2019-12-04 17:22:08 +00:00
func ( c * TunnelConfig ) muxerConfig ( handler h2mux . MuxedStreamHandler ) h2mux . MuxerConfig {
return h2mux . MuxerConfig {
Timeout : muxerTimeout ,
Handler : handler ,
IsClient : true ,
HeartbeatInterval : c . HeartbeatInterval ,
MaxHeartbeats : c . MaxHeartbeats ,
2020-04-29 20:51:32 +00:00
Logger : c . TransportLogger ,
2019-12-04 17:22:08 +00:00
CompressionQuality : h2mux . CompressionSetting ( c . CompressionQuality ) ,
}
}
2018-10-08 19:20:28 +00:00
func ( c * TunnelConfig ) RegistrationOptions ( connectionID uint8 , OriginLocalIP string , uuid uuid . UUID ) * tunnelpogs . RegistrationOptions {
2018-05-01 23:45:06 +00:00
policy := tunnelrpc . ExistingTunnelPolicy_balance
if c . HAConnections <= 1 && c . LBPool == "" {
policy = tunnelrpc . ExistingTunnelPolicy_disconnect
}
return & tunnelpogs . RegistrationOptions {
ClientID : c . ClientID ,
Version : c . ReportedVersion ,
OS : fmt . Sprintf ( "%s_%s" , c . BuildInfo . GoOS , c . BuildInfo . GoArch ) ,
ExistingTunnelPolicy : policy ,
PoolName : c . LBPool ,
Tags : c . Tags ,
ConnectionID : connectionID ,
OriginLocalIP : OriginLocalIP ,
IsAutoupdated : c . IsAutoupdated ,
RunFromTerminal : c . RunFromTerminal ,
CompressionQuality : c . CompressionQuality ,
2018-10-08 19:20:28 +00:00
UUID : uuid . String ( ) ,
2020-03-31 13:59:00 +00:00
Features : c . SupportedFeatures ( ) ,
2018-05-01 23:45:06 +00:00
}
}
2020-07-31 15:22:23 +00:00
func ( c * TunnelConfig ) ConnectionOptions ( originLocalAddr string , numPreviousAttempts uint8 ) * tunnelpogs . ConnectionOptions {
2020-06-25 18:25:39 +00:00
// attempt to parse out origin IP, but don't fail since it's informational field
host , _ , _ := net . SplitHostPort ( originLocalAddr )
originIP := net . ParseIP ( host )
return & tunnelpogs . ConnectionOptions {
2020-07-31 15:22:23 +00:00
Client : c . NamedTunnel . Client ,
OriginLocalIP : originIP ,
ReplaceExisting : c . ReplaceExisting ,
CompressionQuality : uint8 ( c . CompressionQuality ) ,
NumPreviousAttempts : numPreviousAttempts ,
2020-06-25 18:25:39 +00:00
}
}
2020-03-31 13:59:00 +00:00
func ( c * TunnelConfig ) SupportedFeatures ( ) [ ] string {
2020-06-25 18:25:39 +00:00
features := [ ] string { FeatureSerializedHeaders }
if c . NamedTunnel == nil {
features = append ( features , FeatureQuickReconnects )
2020-03-31 13:59:00 +00:00
}
2020-06-25 18:25:39 +00:00
return features
2020-03-31 13:59:00 +00:00
}
2020-09-01 16:06:00 +00:00
// IsTrialTunnel reports whether no hostname has been configured.
func (c *TunnelConfig) IsTrialTunnel() bool {
	return len(c.Hostname) == 0
}
2020-06-17 18:33:55 +00:00
type NamedTunnelConfig struct {
2020-06-25 18:25:39 +00:00
Auth pogs . TunnelAuth
ID uuid . UUID
Client pogs . ClientInfo
2020-06-17 18:33:55 +00:00
}
2020-06-25 18:25:39 +00:00
func StartTunnelDaemon ( ctx context . Context , config * TunnelConfig , connectedSignal * signal . Signal , cloudflaredID uuid . UUID , reconnectCh chan ReconnectSignal ) error {
s , err := NewSupervisor ( config , cloudflaredID )
2019-12-13 23:05:21 +00:00
if err != nil {
return err
}
2020-03-19 15:38:28 +00:00
return s . Run ( ctx , connectedSignal , reconnectCh )
2018-05-01 23:45:06 +00:00
}
func ServeTunnelLoop ( ctx context . Context ,
2020-08-18 10:14:14 +00:00
credentialManager * reconnectCredentialManager ,
2018-05-01 23:45:06 +00:00
config * TunnelConfig ,
addr * net . TCPAddr ,
2020-06-25 18:25:39 +00:00
connectionIndex uint8 ,
2019-03-04 19:48:56 +00:00
connectedSignal * signal . Signal ,
2020-06-17 18:33:55 +00:00
cloudflaredUUID uuid . UUID ,
2020-02-24 17:06:19 +00:00
bufferPool * buffer . Pool ,
2020-04-30 05:02:08 +00:00
reconnectCh chan ReconnectSignal ,
2018-05-01 23:45:06 +00:00
) error {
config . Metrics . incrementHaConnections ( )
defer config . Metrics . decrementHaConnections ( )
backoff := BackoffHandler { MaxRetries : config . Retries }
connectedFuse := h2mux . NewBooleanFuse ( )
go func ( ) {
if connectedFuse . Await ( ) {
2019-03-04 19:48:56 +00:00
connectedSignal . Notify ( )
2018-05-01 23:45:06 +00:00
}
} ( )
// Ensure the above goroutine will terminate if we return without connecting
defer connectedFuse . Fuse ( false )
for {
2019-11-04 11:11:54 +00:00
err , recoverable := ServeTunnel (
ctx ,
2019-12-06 21:32:15 +00:00
credentialManager ,
2019-11-04 11:11:54 +00:00
config ,
2020-04-29 20:51:32 +00:00
config . Logger ,
2020-06-25 18:25:39 +00:00
addr , connectionIndex ,
2019-11-04 11:11:54 +00:00
connectedFuse ,
& backoff ,
2020-06-17 18:33:55 +00:00
cloudflaredUUID ,
2020-02-24 17:06:19 +00:00
bufferPool ,
2020-03-19 15:38:28 +00:00
reconnectCh ,
2019-11-04 11:11:54 +00:00
)
2018-05-01 23:45:06 +00:00
if recoverable {
if duration , ok := backoff . GetBackoffDuration ( ctx ) ; ok {
2020-07-24 22:17:17 +00:00
if config . TunnelEventChan != nil {
config . TunnelEventChan <- ui . TunnelEvent { Index : connectionIndex , EventType : ui . Reconnecting }
2020-08-10 23:09:02 +00:00
}
2020-06-25 18:25:39 +00:00
config . Logger . Infof ( "Retrying connection %d in %s seconds" , connectionIndex , duration )
2018-05-01 23:45:06 +00:00
backoff . Backoff ( ctx )
continue
}
}
return err
}
}
func ServeTunnel (
ctx context . Context ,
2020-08-18 10:14:14 +00:00
credentialManager * reconnectCredentialManager ,
2018-05-01 23:45:06 +00:00
config * TunnelConfig ,
2020-04-29 20:51:32 +00:00
logger logger . Service ,
2018-05-01 23:45:06 +00:00
addr * net . TCPAddr ,
2020-06-25 18:25:39 +00:00
connectionIndex uint8 ,
2018-05-01 23:45:06 +00:00
connectedFuse * h2mux . BooleanFuse ,
backoff * BackoffHandler ,
2020-06-17 18:33:55 +00:00
cloudflaredUUID uuid . UUID ,
2020-02-24 17:06:19 +00:00
bufferPool * buffer . Pool ,
2020-04-30 05:02:08 +00:00
reconnectCh chan ReconnectSignal ,
2018-05-01 23:45:06 +00:00
) ( err error , recoverable bool ) {
// Treat panics as recoverable errors
defer func ( ) {
if r := recover ( ) ; r != nil {
var ok bool
err , ok = r . ( error )
if ! ok {
err = fmt . Errorf ( "ServeTunnel: %v" , r )
}
recoverable = true
}
} ( )
2020-07-29 22:48:27 +00:00
// If launch-ui flag is set, send disconnect msg
2020-07-24 22:17:17 +00:00
if config . TunnelEventChan != nil {
2020-07-29 22:48:27 +00:00
defer func ( ) {
2020-07-24 22:17:17 +00:00
config . TunnelEventChan <- ui . TunnelEvent { Index : connectionIndex , EventType : ui . Disconnected }
2020-07-29 22:48:27 +00:00
} ( )
}
2020-06-25 18:25:39 +00:00
connectionTag := uint8ToString ( connectionIndex )
2018-05-01 23:45:06 +00:00
// Returns error from parsing the origin URL or handshake errors
2020-06-25 18:25:39 +00:00
handler , originLocalAddr , err := NewTunnelHandler ( ctx , config , addr , connectionIndex , bufferPool )
2018-05-01 23:45:06 +00:00
if err != nil {
switch err . ( type ) {
2019-11-21 18:10:44 +00:00
case connection . DialError :
2020-06-25 18:25:39 +00:00
logger . Errorf ( "Connection %d unable to dial edge: %s" , connectionIndex , err )
2018-05-01 23:45:06 +00:00
case h2mux . MuxerHandshakeError :
2020-06-25 18:25:39 +00:00
logger . Errorf ( "Connection %d handshake with edge server failed: %s" , connectionIndex , err )
2018-05-01 23:45:06 +00:00
default :
2020-06-25 18:25:39 +00:00
logger . Errorf ( "Connection %d failed: %s" , connectionIndex , err )
2018-05-01 23:45:06 +00:00
return err , false
}
return err , true
}
errGroup , serveCtx := errgroup . WithContext ( ctx )
2019-12-06 21:32:15 +00:00
errGroup . Go ( func ( ) ( err error ) {
defer func ( ) {
if err == nil {
connectedFuse . Fuse ( true )
backoff . SetGracePeriod ( )
}
} ( )
2020-06-25 18:25:39 +00:00
if config . NamedTunnel != nil {
2020-07-31 15:22:23 +00:00
return RegisterConnection ( ctx , handler . muxer , config , connectionIndex , originLocalAddr , uint8 ( backoff . retries ) )
2020-06-25 18:25:39 +00:00
}
2019-12-06 21:32:15 +00:00
if config . UseReconnectToken && connectedFuse . Value ( ) {
2020-08-18 10:14:14 +00:00
err := ReconnectTunnel ( serveCtx , handler . muxer , config , logger , connectionIndex , originLocalAddr , cloudflaredUUID , credentialManager )
if err == nil {
return nil
2019-12-06 21:32:15 +00:00
}
// log errors and proceed to RegisterTunnel
2020-08-18 10:14:14 +00:00
logger . Errorf ( "Couldn't reconnect connection %d. Reregistering it instead. Error was: %v" , connectionIndex , err )
2018-05-01 23:45:06 +00:00
}
2020-06-25 18:25:39 +00:00
return RegisterTunnel ( serveCtx , credentialManager , handler . muxer , config , logger , connectionIndex , originLocalAddr , cloudflaredUUID )
2018-05-01 23:45:06 +00:00
} )
errGroup . Go ( func ( ) error {
updateMetricsTickC := time . Tick ( config . MetricsUpdateFreq )
for {
select {
case <- serveCtx . Done ( ) :
// UnregisterTunnel blocks until the RPC call returns
2020-05-05 22:56:39 +00:00
if connectedFuse . Value ( ) {
2020-06-25 18:25:39 +00:00
if config . NamedTunnel != nil {
_ = UnregisterConnection ( ctx , handler . muxer , config )
} else {
2020-09-28 09:10:30 +00:00
_ = UnregisterTunnel ( handler . muxer , config )
2020-06-25 18:25:39 +00:00
}
2020-05-05 22:56:39 +00:00
}
2018-05-01 23:45:06 +00:00
handler . muxer . Shutdown ( )
2020-06-25 18:25:39 +00:00
return nil
2018-05-01 23:45:06 +00:00
case <- updateMetricsTickC :
handler . UpdateMetrics ( connectionTag )
}
}
} )
2020-03-19 15:38:28 +00:00
errGroup . Go ( func ( ) error {
2020-04-30 05:02:08 +00:00
for {
select {
case reconnect := <- reconnectCh :
return & reconnect
case <- serveCtx . Done ( ) :
return nil
}
2020-03-19 22:48:42 +00:00
}
2020-03-19 15:38:28 +00:00
} )
2018-05-01 23:45:06 +00:00
errGroup . Go ( func ( ) error {
// All routines should stop when muxer finish serving. When muxer is shutdown
// gracefully, it doesn't return an error, so we need to return errMuxerShutdown
// here to notify other routines to stop
err := handler . muxer . Serve ( serveCtx )
if err == nil {
return muxerShutdownError { }
}
return err
} )
err = errGroup . Wait ( )
if err != nil {
2020-07-07 21:35:44 +00:00
switch err := err . ( type ) {
case * dupConnRegisterTunnelError :
// don't retry this connection anymore, let supervisor pick new a address
return err , false
case * serverRegisterTunnelError :
logger . Errorf ( "Register tunnel error from server side: %s" , err . cause )
2018-05-01 23:45:06 +00:00
// Don't send registration error return from server to Sentry. They are
// logged on server side
2019-01-10 20:55:44 +00:00
if incidents := config . IncidentLookup . ActiveIncidents ( ) ; len ( incidents ) > 0 {
logger . Error ( activeIncidentsMsg ( incidents ) )
}
2020-07-07 21:35:44 +00:00
return err . cause , ! err . permanent
case * clientRegisterTunnelError :
logger . Errorf ( "Register tunnel error on client side: %s" , err . cause )
2018-05-01 23:45:06 +00:00
return err , true
2020-07-07 21:35:44 +00:00
case * muxerShutdownError :
2020-04-30 05:02:08 +00:00
logger . Info ( "Muxer shutdown" )
return err , true
case * ReconnectSignal :
2020-07-07 21:35:44 +00:00
logger . Infof ( "Restarting connection %d due to reconnect signal in %d seconds" , connectionIndex , err . Delay )
err . DelayBeforeReconnect ( )
2018-05-01 23:45:06 +00:00
return err , true
default :
2020-07-07 21:35:44 +00:00
if err == context . Canceled {
logger . Debugf ( "Serve tunnel error: %s" , err )
return err , false
}
2020-04-29 20:51:32 +00:00
logger . Errorf ( "Serve tunnel error: %s" , err )
2018-05-01 23:45:06 +00:00
return err , true
}
}
return nil , true
}
2020-06-25 18:25:39 +00:00
func RegisterConnection (
ctx context . Context ,
muxer * h2mux . Muxer ,
config * TunnelConfig ,
connectionIndex uint8 ,
originLocalAddr string ,
2020-07-31 15:22:23 +00:00
numPreviousAttempts uint8 ,
2020-06-25 18:25:39 +00:00
) error {
const registerConnection = "registerConnection"
config . TransportLogger . Debug ( "initiating RPC stream for RegisterConnection" )
2020-09-28 09:10:30 +00:00
rpcClient , err := newTunnelRPCClient ( ctx , muxer , config , registerConnection )
2020-06-25 18:25:39 +00:00
if err != nil {
2020-09-28 09:10:30 +00:00
return err
2020-06-25 18:25:39 +00:00
}
2020-09-28 09:10:30 +00:00
defer rpcClient . Close ( )
2020-06-25 18:25:39 +00:00
2020-09-28 09:10:30 +00:00
conn , err := rpcClient . RegisterConnection (
2020-06-25 18:25:39 +00:00
ctx ,
config . NamedTunnel . Auth ,
config . NamedTunnel . ID ,
connectionIndex ,
2020-07-31 15:22:23 +00:00
config . ConnectionOptions ( originLocalAddr , numPreviousAttempts ) ,
2020-06-25 18:25:39 +00:00
)
if err != nil {
if err . Error ( ) == DuplicateConnectionError {
config . Metrics . regFail . WithLabelValues ( "dup_edge_conn" , registerConnection ) . Inc ( )
return errDuplicationConnection
}
config . Metrics . regFail . WithLabelValues ( "server_error" , registerConnection ) . Inc ( )
return serverRegistrationErrorFromRPC ( err )
}
config . Metrics . regSuccess . WithLabelValues ( registerConnection ) . Inc ( )
config . Logger . Infof ( "Connection %d registered with %s using ID %s" , connectionIndex , conn . Location , conn . UUID )
2020-07-29 22:48:27 +00:00
// If launch-ui flag is set, send connect msg
2020-07-24 22:17:17 +00:00
if config . TunnelEventChan != nil {
config . TunnelEventChan <- ui . TunnelEvent { Index : connectionIndex , EventType : ui . Connected , Location : conn . Location }
2020-07-29 22:48:27 +00:00
}
2020-06-25 18:25:39 +00:00
return nil
}
// serverRegistrationErrorFromRPC converts an RPC error into a
// serverRegisterTunnelError. A *tunnelpogs.RetryableError is unwrapped and
// marked non-permanent; every other error is kept as the cause and marked
// permanent.
func serverRegistrationErrorFromRPC(err error) *serverRegisterTunnelError {
	result := &serverRegisterTunnelError{cause: err, permanent: true}
	if retryable, ok := err.(*tunnelpogs.RetryableError); ok {
		result.cause = retryable.Unwrap()
		result.permanent = false
	}
	return result
}
func UnregisterConnection (
ctx context . Context ,
muxer * h2mux . Muxer ,
config * TunnelConfig ,
) error {
config . TransportLogger . Debug ( "initiating RPC stream for UnregisterConnection" )
2020-09-28 09:10:30 +00:00
rpcClient , err := newTunnelRPCClient ( ctx , muxer , config , register )
2020-06-25 18:25:39 +00:00
if err != nil {
// RPC stream open error
2020-09-28 09:10:30 +00:00
return err
2020-06-25 18:25:39 +00:00
}
2020-09-28 09:10:30 +00:00
defer rpcClient . Close ( )
2020-06-25 18:25:39 +00:00
2020-09-28 09:10:30 +00:00
return rpcClient . UnregisterConnection ( ctx )
2020-06-25 18:25:39 +00:00
}
2018-11-26 17:32:27 +00:00
func RegisterTunnel (
ctx context . Context ,
2020-08-18 10:14:14 +00:00
credentialManager * reconnectCredentialManager ,
2018-11-26 17:32:27 +00:00
muxer * h2mux . Muxer ,
config * TunnelConfig ,
2020-04-29 20:51:32 +00:00
logger logger . Service ,
2018-11-26 17:32:27 +00:00
connectionID uint8 ,
originLocalIP string ,
2019-02-01 20:11:12 +00:00
uuid uuid . UUID ,
2018-11-26 17:32:27 +00:00
) error {
2019-01-28 20:11:56 +00:00
config . TransportLogger . Debug ( "initiating RPC stream to register" )
2020-07-24 22:17:17 +00:00
if config . TunnelEventChan != nil {
config . TunnelEventChan <- ui . TunnelEvent { EventType : ui . RegisteringTunnel }
}
2020-09-28 09:10:30 +00:00
rpcClient , err := newTunnelRPCClient ( ctx , muxer , config , register )
2018-05-01 23:45:06 +00:00
if err != nil {
2020-09-28 09:10:30 +00:00
return err
2018-05-01 23:45:06 +00:00
}
2020-09-28 09:10:30 +00:00
defer rpcClient . Close ( )
2018-05-01 23:45:06 +00:00
// Request server info without blocking tunnel registration; must use capnp library directly.
2020-09-28 09:10:30 +00:00
serverInfoPromise := tunnelrpc . TunnelServer { Client : rpcClient . Client } . GetServerInfo ( ctx , func ( tunnelrpc . TunnelServer_getServerInfo_Params ) error {
2018-05-01 23:45:06 +00:00
return nil
} )
2020-08-17 22:43:20 +00:00
LogServerInfo ( serverInfoPromise . Result ( ) , connectionID , config . Metrics , logger , config . TunnelEventChan )
2020-09-28 09:10:30 +00:00
registration := rpcClient . RegisterTunnel (
2018-05-01 23:45:06 +00:00
ctx ,
config . OriginCert ,
config . Hostname ,
2018-10-08 19:20:28 +00:00
config . RegistrationOptions ( connectionID , originLocalIP , uuid ) ,
2018-05-01 23:45:06 +00:00
)
2019-11-21 16:56:04 +00:00
if registrationErr := registration . DeserializeError ( ) ; registrationErr != nil {
2018-05-01 23:45:06 +00:00
// RegisterTunnel RPC failure
2020-01-28 16:43:37 +00:00
return processRegisterTunnelError ( registrationErr , config . Metrics , register )
2018-05-01 23:45:06 +00:00
}
2020-07-24 22:17:17 +00:00
// Send free tunnel URL to UI
if config . TunnelEventChan != nil {
config . TunnelEventChan <- ui . TunnelEvent { EventType : ui . SetUrl , Url : registration . Url }
}
2020-08-18 10:14:14 +00:00
credentialManager . SetEventDigest ( connectionID , registration . EventDigest )
2020-03-13 22:31:03 +00:00
return processRegistrationSuccess ( config , logger , connectionID , registration , register , credentialManager )
2019-12-06 21:32:15 +00:00
}
2020-03-13 22:31:03 +00:00
func processRegistrationSuccess (
config * TunnelConfig ,
2020-04-29 20:51:32 +00:00
logger logger . Service ,
2020-03-13 22:31:03 +00:00
connectionID uint8 ,
registration * tunnelpogs . TunnelRegistration ,
2020-09-28 09:10:30 +00:00
name rpcName ,
2020-08-18 10:14:14 +00:00
credentialManager * reconnectCredentialManager ,
2020-03-13 22:31:03 +00:00
) error {
2018-05-01 23:45:06 +00:00
for _ , logLine := range registration . LogLines {
2019-10-14 23:36:34 +00:00
logger . Info ( logLine )
2018-05-01 23:45:06 +00:00
}
2019-02-19 23:01:30 +00:00
2018-10-08 19:20:28 +00:00
if registration . TunnelID != "" {
2018-11-26 17:32:27 +00:00
config . Metrics . tunnelsHA . AddTunnelID ( connectionID , registration . TunnelID )
2019-10-14 23:36:34 +00:00
logger . Infof ( "Each HA connection's tunnel IDs: %v" , config . Metrics . tunnelsHA . String ( ) )
2018-10-08 19:20:28 +00:00
}
2020-08-26 19:19:24 +00:00
// Print out the user's trial zone URL in a nice box (if they requested and got one and UI flag is not set)
if config . TunnelEventChan == nil {
2020-09-01 16:06:00 +00:00
if config . IsTrialTunnel ( ) {
if registrationURL , err := url . Parse ( registration . Url ) ; err == nil {
for _ , line := range asciiBox ( trialZoneMsg ( registrationURL . String ( ) ) , 2 ) {
2020-08-26 19:19:24 +00:00
logger . Info ( line )
}
} else {
logger . Error ( "Failed to connect tunnel, please try again." )
return fmt . Errorf ( "empty URL in response from Cloudflare edge" )
2018-10-09 00:13:08 +00:00
}
2018-10-08 19:20:28 +00:00
}
}
2020-03-13 22:31:03 +00:00
credentialManager . SetConnDigest ( connectionID , registration . ConnDigest )
2019-04-16 20:26:31 +00:00
config . Metrics . userHostnamesCounts . WithLabelValues ( registration . Url ) . Inc ( )
2019-10-14 23:36:34 +00:00
logger . Infof ( "Route propagating, it may take up to 1 minute for your new route to become functional" )
2020-01-28 16:43:37 +00:00
config . Metrics . regSuccess . WithLabelValues ( string ( name ) ) . Inc ( )
2018-05-01 23:45:06 +00:00
return nil
}
2020-09-28 09:10:30 +00:00
func processRegisterTunnelError ( err tunnelpogs . TunnelRegistrationError , metrics * TunnelMetrics , name rpcName ) error {
2019-11-21 16:56:04 +00:00
if err . Error ( ) == DuplicateConnectionError {
2020-01-28 16:43:37 +00:00
metrics . regFail . WithLabelValues ( "dup_edge_conn" , string ( name ) ) . Inc ( )
2020-06-25 18:25:39 +00:00
return errDuplicationConnection
2019-02-19 23:01:30 +00:00
}
2020-01-28 16:43:37 +00:00
metrics . regFail . WithLabelValues ( "server_error" , string ( name ) ) . Inc ( )
2019-02-19 23:01:30 +00:00
return serverRegisterTunnelError {
2020-06-25 18:25:39 +00:00
cause : err ,
2019-11-21 16:56:04 +00:00
permanent : err . IsPermanent ( ) ,
2019-02-19 23:01:30 +00:00
}
}
2020-09-28 09:10:30 +00:00
func UnregisterTunnel ( muxer * h2mux . Muxer , config * TunnelConfig ) error {
config . TransportLogger . Debug ( "initiating RPC stream to unregister" )
2019-04-02 23:12:09 +00:00
ctx := context . Background ( )
2020-09-28 09:10:30 +00:00
rpcClient , err := newTunnelRPCClient ( ctx , muxer , config , unregister )
2018-05-01 23:45:06 +00:00
if err != nil {
// RPC stream open error
return err
}
2020-09-28 09:10:30 +00:00
defer rpcClient . Close ( )
2020-05-05 22:56:39 +00:00
2018-05-01 23:45:06 +00:00
// gracePeriod is encoded in int64 using capnproto
2020-09-28 09:10:30 +00:00
return rpcClient . UnregisterTunnel ( ctx , config . GracePeriod . Nanoseconds ( ) )
2018-05-01 23:45:06 +00:00
}
func LogServerInfo (
promise tunnelrpc . ServerInfo_Promise ,
connectionID uint8 ,
metrics * TunnelMetrics ,
2020-04-29 20:51:32 +00:00
logger logger . Service ,
2020-08-17 22:43:20 +00:00
tunnelEventChan chan <- ui . TunnelEvent ,
2018-05-01 23:45:06 +00:00
) {
serverInfoMessage , err := promise . Struct ( )
if err != nil {
2020-04-29 20:51:32 +00:00
logger . Errorf ( "Failed to retrieve server information: %s" , err )
2018-05-01 23:45:06 +00:00
return
}
serverInfo , err := tunnelpogs . UnmarshalServerInfo ( serverInfoMessage )
if err != nil {
2020-04-29 20:51:32 +00:00
logger . Errorf ( "Failed to retrieve server information: %s" , err )
2018-05-01 23:45:06 +00:00
return
}
2020-08-17 22:43:20 +00:00
// If launch-ui flag is set, send connect msg
if tunnelEventChan != nil {
tunnelEventChan <- ui . TunnelEvent { Index : connectionID , EventType : ui . Connected , Location : serverInfo . LocationName }
}
2018-05-01 23:45:06 +00:00
logger . Infof ( "Connected to %s" , serverInfo . LocationName )
metrics . registerServerLocation ( uint8ToString ( connectionID ) , serverInfo . LocationName )
}
type TunnelHandler struct {
2019-08-30 03:55:54 +00:00
originUrl string
2020-10-15 17:41:50 +00:00
ingressRules ingress . Ingress
2019-08-30 03:55:54 +00:00
httpHostHeader string
muxer * h2mux . Muxer
httpClient http . RoundTripper
tlsConfig * tls . Config
tags [ ] tunnelpogs . Tag
metrics * TunnelMetrics
2018-05-01 23:45:06 +00:00
// connectionID is only used by metrics, and prometheus requires labels to be string
connectionID string
2020-04-29 20:51:32 +00:00
logger logger . Service
2018-05-01 23:45:06 +00:00
noChunkedEncoding bool
2020-02-24 17:06:19 +00:00
bufferPool * buffer . Pool
2018-05-01 23:45:06 +00:00
}
// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
func NewTunnelHandler ( ctx context . Context ,
config * TunnelConfig ,
2019-11-21 18:10:44 +00:00
addr * net . TCPAddr ,
2018-05-01 23:45:06 +00:00
connectionID uint8 ,
2020-02-24 17:06:19 +00:00
bufferPool * buffer . Pool ,
2018-05-01 23:45:06 +00:00
) ( * TunnelHandler , string , error ) {
2020-10-12 17:54:15 +00:00
// Check single-origin config
var originURL string
var err error
2020-10-15 17:41:50 +00:00
if config . IngressRules . IsEmpty ( ) {
2020-10-12 17:54:15 +00:00
originURL , err = validation . ValidateUrl ( config . OriginUrl )
if err != nil {
return nil , "" , fmt . Errorf ( "unable to parse origin URL %#v" , originURL )
}
2018-05-01 23:45:06 +00:00
}
2020-10-12 17:54:15 +00:00
2018-05-01 23:45:06 +00:00
h := & TunnelHandler {
originUrl : originURL ,
2020-10-12 17:54:15 +00:00
ingressRules : config . IngressRules ,
2019-08-30 03:55:54 +00:00
httpHostHeader : config . HTTPHostHeader ,
2018-05-01 23:45:06 +00:00
httpClient : config . HTTPTransport ,
tlsConfig : config . ClientTlsConfig ,
tags : config . Tags ,
metrics : config . Metrics ,
connectionID : uint8ToString ( connectionID ) ,
logger : config . Logger ,
noChunkedEncoding : config . NoChunkedEncoding ,
2020-02-24 17:06:19 +00:00
bufferPool : bufferPool ,
2018-05-01 23:45:06 +00:00
}
if h . httpClient == nil {
h . httpClient = http . DefaultTransport
}
2019-11-21 18:10:44 +00:00
edgeConn , err := connection . DialEdge ( ctx , dialTimeout , config . TlsConfig , addr )
2018-05-01 23:45:06 +00:00
if err != nil {
2019-11-21 18:10:44 +00:00
return nil , "" , err
2018-05-01 23:45:06 +00:00
}
// Establish a muxed connection with the edge
// Client mux handshake with agent server
2019-12-04 17:22:08 +00:00
h . muxer , err = h2mux . Handshake ( edgeConn , edgeConn , config . muxerConfig ( h ) , h . metrics . activeStreams )
2018-05-01 23:45:06 +00:00
if err != nil {
2019-12-24 05:11:00 +00:00
return nil , "" , errors . Wrap ( err , "h2mux handshake with edge error" )
2018-05-01 23:45:06 +00:00
}
2019-11-21 18:10:44 +00:00
return h , edgeConn . LocalAddr ( ) . String ( ) , nil
2018-05-01 23:45:06 +00:00
}
// AppendTagHeaders adds one header per configured tag to r, named
// TagHeaderNamePrefix followed by the tag name.
func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
	for i := range h.tags {
		tag := h.tags[i]
		r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
	}
}
func ( h * TunnelHandler ) ServeStream ( stream * h2mux . MuxedStream ) error {
h . metrics . incrementRequests ( h . connectionID )
2019-04-25 23:13:06 +00:00
defer h . metrics . decrementConcurrentRequests ( h . connectionID )
req , reqErr := h . createRequest ( stream )
if reqErr != nil {
2020-04-10 19:26:09 +00:00
h . writeErrorResponse ( stream , reqErr )
2019-04-25 23:13:06 +00:00
return reqErr
}
2020-05-29 09:21:03 +00:00
cfRay := findCfRayHeader ( req )
lbProbe := isLBProbeRequest ( req )
2019-04-25 23:13:06 +00:00
h . logRequest ( req , cfRay , lbProbe )
var resp * http . Response
var respErr error
if websocket . IsWebSocketUpgrade ( req ) {
resp , respErr = h . serveWebsocket ( stream , req )
} else {
resp , respErr = h . serveHTTP ( stream , req )
}
if respErr != nil {
2020-04-10 19:26:09 +00:00
h . writeErrorResponse ( stream , respErr )
2019-04-25 23:13:06 +00:00
return respErr
}
h . logResponseOk ( resp , cfRay , lbProbe )
return nil
}
func ( h * TunnelHandler ) createRequest ( stream * h2mux . MuxedStream ) ( * http . Request , error ) {
2018-05-01 23:45:06 +00:00
req , err := http . NewRequest ( "GET" , h . originUrl , h2mux . MuxedStreamReader { MuxedStream : stream } )
if err != nil {
2019-04-25 23:13:06 +00:00
return nil , errors . Wrap ( err , "Unexpected error from http.NewRequest" )
2018-05-01 23:45:06 +00:00
}
2020-03-06 13:49:09 +00:00
err = h2mux . H2RequestHeadersToH1Request ( stream . Headers , req )
2018-05-01 23:45:06 +00:00
if err != nil {
2019-04-25 23:13:06 +00:00
return nil , errors . Wrap ( err , "invalid request received" )
2018-05-01 23:45:06 +00:00
}
h . AppendTagHeaders ( req )
2020-10-15 17:41:50 +00:00
if ! h . ingressRules . IsEmpty ( ) {
ruleNumber := h . ingressRules . FindMatchingRule ( req . Host , req . URL . Path )
destination := h . ingressRules . Rules [ ruleNumber ] . Service
2020-10-12 17:54:15 +00:00
req . URL . Host = destination . Host
req . URL . Scheme = destination . Scheme
}
2019-04-25 23:13:06 +00:00
return req , nil
}
2018-05-01 23:45:06 +00:00
2019-04-25 23:13:06 +00:00
func ( h * TunnelHandler ) serveWebsocket ( stream * h2mux . MuxedStream , req * http . Request ) ( * http . Response , error ) {
2019-08-30 03:55:54 +00:00
if h . httpHostHeader != "" {
req . Header . Set ( "Host" , h . httpHostHeader )
req . Host = h . httpHostHeader
}
2019-04-25 23:13:06 +00:00
conn , response , err := websocket . ClientConnect ( req , h . tlsConfig )
if err != nil {
return nil , err
}
defer conn . Close ( )
2020-03-06 13:49:09 +00:00
err = stream . WriteHeaders ( h2mux . H1ResponseToH2ResponseHeaders ( response ) )
2019-04-25 23:13:06 +00:00
if err != nil {
return nil , errors . Wrap ( err , "Error writing response header" )
}
// Copy to/from stream to the undelying connection. Use the underlying
// connection because cloudflared doesn't operate on the message themselves
websocket . Stream ( conn . UnderlyingConn ( ) , stream )
2020-03-31 14:56:22 +00:00
2019-04-25 23:13:06 +00:00
return response , nil
}
func ( h * TunnelHandler ) serveHTTP ( stream * h2mux . MuxedStream , req * http . Request ) ( * http . Response , error ) {
// Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
if h . noChunkedEncoding {
req . TransferEncoding = [ ] string { "gzip" , "deflate" }
cLength , err := strconv . Atoi ( req . Header . Get ( "Content-Length" ) )
if err == nil {
req . ContentLength = int64 ( cLength )
}
}
2018-07-30 14:23:55 +00:00
2019-04-25 23:13:06 +00:00
// Request origin to keep connection alive to improve performance
req . Header . Set ( "Connection" , "keep-alive" )
2018-05-01 23:45:06 +00:00
2019-08-30 03:55:54 +00:00
if h . httpHostHeader != "" {
req . Header . Set ( "Host" , h . httpHostHeader )
req . Host = h . httpHostHeader
}
2019-04-25 23:13:06 +00:00
response , err := h . httpClient . RoundTrip ( req )
if err != nil {
return nil , errors . Wrap ( err , "Error proxying request to origin" )
}
defer response . Body . Close ( )
2018-05-01 23:45:06 +00:00
2020-04-09 20:59:15 +00:00
headers := h2mux . H1ResponseToH2ResponseHeaders ( response )
2020-04-10 19:26:09 +00:00
headers = append ( headers , h2mux . CreateResponseMetaHeader ( h2mux . ResponseMetaHeaderField , h2mux . ResponseSourceOrigin ) )
2020-04-09 20:59:15 +00:00
err = stream . WriteHeaders ( headers )
2019-04-25 23:13:06 +00:00
if err != nil {
return nil , errors . Wrap ( err , "Error writing response header" )
2018-05-01 23:45:06 +00:00
}
2019-04-25 23:13:06 +00:00
if h . isEventStream ( response ) {
h . writeEventStream ( stream , response . Body )
} else {
// Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
// compression generates dictionary on first write
2020-02-24 17:06:19 +00:00
buf := h . bufferPool . Get ( )
defer h . bufferPool . Put ( buf )
io . CopyBuffer ( stream , response . Body , buf )
2019-04-25 23:13:06 +00:00
}
return response , nil
2018-05-01 23:45:06 +00:00
}
// writeEventStream forwards responseBody to the muxed stream one
// newline-terminated line at a time. It stops when the body cannot be read
// any further (including io.EOF) or when a write to the stream fails.
func (h *TunnelHandler) writeEventStream(stream *h2mux.MuxedStream, responseBody io.ReadCloser) {
	reader := bufio.NewReader(responseBody)
	for {
		line, readErr := reader.ReadBytes('\n')
		// ReadBytes returns the data read before an error together with the
		// error, so a final line without a trailing newline is still forwarded.
		if len(line) > 0 {
			if _, writeErr := stream.Write(line); writeErr != nil {
				// The stream no longer accepts data; reading more is pointless.
				return
			}
		}
		if readErr != nil {
			return
		}
	}
}
// isEventStream reports whether the response's Content-Type media type is
// text/event-stream. Media type parameters (for example "; charset=utf-8")
// and letter case are ignored. A debug line is logged on a match.
func (h *TunnelHandler) isEventStream(response *http.Response) bool {
	mediaType := response.Header.Get("content-type")
	// Drop any parameters that follow the media type.
	if i := strings.IndexByte(mediaType, ';'); i >= 0 {
		mediaType = mediaType[:i]
	}
	if !strings.EqualFold(strings.TrimSpace(mediaType), "text/event-stream") {
		return false
	}
	h.logger.Debug("Detected Server-Side Events from Origin")
	return true
}
2020-04-10 19:26:09 +00:00
func ( h * TunnelHandler ) writeErrorResponse ( stream * h2mux . MuxedStream , err error ) {
2020-04-29 20:51:32 +00:00
h . logger . Errorf ( "HTTP request error: %s" , err )
2020-04-09 20:59:15 +00:00
stream . WriteHeaders ( [ ] h2mux . Header {
{ Name : ":status" , Value : "502" } ,
2020-04-10 19:26:09 +00:00
h2mux . CreateResponseMetaHeader ( h2mux . ResponseMetaHeaderField , h2mux . ResponseSourceCloudflared ) ,
2020-04-09 20:59:15 +00:00
} )
2018-05-01 23:45:06 +00:00
stream . Write ( [ ] byte ( "502 Bad Gateway" ) )
h . metrics . incrementResponses ( h . connectionID , "502" )
}
func ( h * TunnelHandler ) logRequest ( req * http . Request , cfRay string , lbProbe bool ) {
2020-04-29 20:51:32 +00:00
logger := h . logger
2018-05-01 23:45:06 +00:00
if cfRay != "" {
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s %s %s %s" , cfRay , req . Method , req . URL , req . Proto )
2018-05-01 23:45:06 +00:00
} else if lbProbe {
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s Load Balancer health check %s %s %s" , cfRay , req . Method , req . URL , req . Proto )
2018-05-01 23:45:06 +00:00
} else {
2020-04-29 20:51:32 +00:00
logger . Infof ( "CF-RAY: %s All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s " , cfRay , req . Method , req . URL , req . Proto )
2019-01-28 20:05:59 +00:00
}
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s Request Headers %+v" , cfRay , req . Header )
2019-01-28 20:05:59 +00:00
if contentLen := req . ContentLength ; contentLen == - 1 {
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s Request Content length unknown" , cfRay )
2019-01-28 20:05:59 +00:00
} else {
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s Request content length %d" , cfRay , contentLen )
2018-05-01 23:45:06 +00:00
}
}
2019-04-25 23:13:06 +00:00
func ( h * TunnelHandler ) logResponseOk ( r * http . Response , cfRay string , lbProbe bool ) {
h . metrics . incrementResponses ( h . connectionID , "200" )
2020-04-29 20:51:32 +00:00
logger := h . logger
2018-05-01 23:45:06 +00:00
if cfRay != "" {
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s %s" , cfRay , r . Status )
2018-05-01 23:45:06 +00:00
} else if lbProbe {
2019-01-28 20:05:59 +00:00
logger . Debugf ( "Response to Load Balancer health check %s" , r . Status )
} else {
logger . Infof ( "%s" , r . Status )
}
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s Response Headers %+v" , cfRay , r . Header )
2019-01-28 20:05:59 +00:00
if contentLen := r . ContentLength ; contentLen == - 1 {
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s Response content length unknown" , cfRay )
2018-05-01 23:45:06 +00:00
} else {
2020-04-29 20:51:32 +00:00
logger . Debugf ( "CF-RAY: %s Response content length %d" , cfRay , contentLen )
2018-05-01 23:45:06 +00:00
}
}
// UpdateMetrics reads the muxer's current metrics and records them under
// connectionID.
func (h *TunnelHandler) UpdateMetrics(connectionID string) {
	muxerMetrics := h.muxer.Metrics()
	h.metrics.updateMuxerMetrics(connectionID, muxerMetrics)
}
// uint8ToString renders input as a base-10 string.
func uint8ToString(input uint8) string {
	return strconv.Itoa(int(input))
}
2018-10-08 19:20:28 +00:00
// asciiBox frames the given lines in an ASCII box and returns the box as a
// slice of rows: a top border, one row per input line, and a bottom border.
// Every row is padded with spaces to the byte length of the longest line, and
// padding extra spaces are placed on each side of the text.
func asciiBox(lines []string, padding int) (box []string) {
	width := maxLen(lines)
	margin := strings.Repeat(" ", padding)
	edge := "+" + strings.Repeat("-", width+2*padding) + "+"

	box = append(box, edge)
	for i := range lines {
		// Fill short lines so the right-hand edge lines up.
		fill := strings.Repeat(" ", width-len(lines[i]))
		box = append(box, "|"+margin+lines[i]+fill+margin+"|")
	}
	return append(box, edge)
}
// maxLen returns the byte length of the longest string in lines, or 0 when
// lines is empty.
func maxLen(lines []string) int {
	longest := 0
	for i := 0; i < len(lines); i++ {
		if n := len(lines[i]); n > longest {
			longest = n
		}
	}
	return longest
}
// trialZoneMsg returns the two-line announcement for a started free tunnel:
// a fixed headline followed by the tunnel URL prefixed with a space.
func trialZoneMsg(url string) []string {
	msg := make([]string, 0, 2)
	msg = append(msg, "Your free tunnel has started! Visit it:")
	msg = append(msg, " "+url)
	return msg
}
2019-01-10 20:55:44 +00:00
// activeIncidentsMsg builds a one-line notice about active Cloudflare
// incidents. The preamble is plural when more than one incident is given;
// each incident is rendered as "Name (URL)" and the entries are joined
// with "; ".
func activeIncidentsMsg(incidents []Incident) string {
	var msg strings.Builder
	if len(incidents) > 1 {
		msg.WriteString("There are active Cloudflare incidents that may be related:")
	} else {
		msg.WriteString("There is an active Cloudflare incident that may be related:")
	}
	msg.WriteString(" ")

	for i, incident := range incidents {
		if i > 0 {
			msg.WriteString("; ")
		}
		fmt.Fprintf(&msg, "%s (%s)", incident.Name, incident.URL())
	}
	return msg.String()
}
2020-05-29 09:21:03 +00:00
// findCfRayHeader returns the first value of the request's Cf-Ray header, or
// an empty string when the header is absent.
func findCfRayHeader(h1 *http.Request) string {
	// "Cf-Ray" is already in canonical header form, so a direct map lookup
	// finds the same entry Header.Get would.
	if values := h1.Header["Cf-Ray"]; len(values) > 0 {
		return values[0]
	}
	return ""
}
// isLBProbeRequest reports whether the request's User-Agent starts with
// lbProbeUserAgentPrefix.
func isLBProbeRequest(req *http.Request) bool {
	userAgent := req.UserAgent()
	return strings.HasPrefix(userAgent, lbProbeUserAgentPrefix)
}
2020-09-28 09:10:30 +00:00
// newTunnelRPCClient opens an RPC stream on muxer, waiting at most
// openStreamTimeout for the stream to open, and builds a tunnel RPC client on
// top of it.
//
// A failure to open the stream is returned unchanged. A failure to create the
// client is wrapped with newRPCError using config.Metrics.rpcFail and rpcName.
// In both cases the returned client is the zero value.
func newTunnelRPCClient(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, rpcName rpcName) (tunnelpogs.TunnelServer_PogsClient, error) {
	var noClient tunnelpogs.TunnelServer_PogsClient

	// Only opening the stream is bounded by the timeout; the client itself is
	// created with the caller's ctx.
	openCtx, cancelOpen := context.WithTimeout(ctx, openStreamTimeout)
	defer cancelOpen()

	rpcStream, openErr := muxer.OpenRPCStream(openCtx)
	if openErr != nil {
		return noClient, openErr
	}

	client, clientErr := connection.NewTunnelRPCClient(ctx, rpcStream, config.TransportLogger)
	if clientErr != nil {
		// Creating the RPC client over the opened stream failed.
		return noClient, newRPCError(clientErr, config.Metrics.rpcFail, rpcName)
	}
	return client, nil
}