package streamhandler import ( "context" "fmt" "net/http" "strconv" "github.com/cloudflare/cloudflared/h2mux" "github.com/cloudflare/cloudflared/tunnelhostnamemapper" "github.com/cloudflare/cloudflared/tunnelrpc" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" "github.com/pkg/errors" "github.com/sirupsen/logrus" "zombiezen.com/go/capnproto2/rpc" ) const ( statusPseudoHeader = ":status" ) type httpErrorStatus struct { status string text []byte } var ( statusBadRequest = newHTTPErrorStatus(http.StatusBadRequest) statusNotFound = newHTTPErrorStatus(http.StatusNotFound) statusBadGateway = newHTTPErrorStatus(http.StatusBadGateway) ) func newHTTPErrorStatus(status int) *httpErrorStatus { return &httpErrorStatus{ status: strconv.Itoa(status), text: []byte(http.StatusText(status)), } } // StreamHandler handles new stream opened by the edge. The streams can be used to proxy requests or make RPC. type StreamHandler struct { // newConfigChan is a send-only channel to notify Supervisor of a new ClientConfig newConfigChan chan<- *pogs.ClientConfig // useConfigResultChan is a receive-only channel for Supervisor to communicate the result of applying a new ClientConfig useConfigResultChan <-chan *pogs.UseConfigurationResult // originMapper maps tunnel hostname to origin service tunnelHostnameMapper *tunnelhostnamemapper.TunnelHostnameMapper logger *logrus.Entry } // NewStreamHandler creates a new StreamHandler func NewStreamHandler(newConfigChan chan<- *pogs.ClientConfig, useConfigResultChan <-chan *pogs.UseConfigurationResult, logger *logrus.Logger, ) *StreamHandler { return &StreamHandler{ newConfigChan: newConfigChan, useConfigResultChan: useConfigResultChan, tunnelHostnameMapper: tunnelhostnamemapper.NewTunnelHostnameMapper(), logger: logger.WithField("subsystem", "streamHandler"), } } // UseConfiguration implements ClientService func (s *StreamHandler) UseConfiguration(ctx context.Context, config *pogs.ClientConfig) (*pogs.UseConfigurationResult, error) { select { case <-ctx.Done(): err := fmt.Errorf("Timeout while sending new config to Supervisor") s.logger.Error(err) return nil, err case s.newConfigChan <- config: } select { case <-ctx.Done(): err := fmt.Errorf("Timeout applying new configuration") s.logger.Error(err) return nil, err case result := <-s.useConfigResultChan: return result, nil } } // UpdateConfig replaces current originmapper mapping with mappings from newConfig func (s *StreamHandler) UpdateConfig(newConfig []*pogs.ReverseProxyConfig) (failedConfigs []*pogs.FailedConfig) { // Delete old configs that aren't in the `newConfig` toRemove := s.tunnelHostnameMapper.ToRemove(newConfig) for _, hostnameToRemove := range toRemove { s.tunnelHostnameMapper.Delete(hostnameToRemove) } // Add new configs that weren't in the old mapper toAdd := s.tunnelHostnameMapper.ToAdd(newConfig) for _, tunnelConfig := range toAdd { tunnelHostname := tunnelConfig.TunnelHostname originSerice, err := tunnelConfig.OriginConfig.Service() if err != nil { s.logger.WithField("tunnelHostname", tunnelHostname).WithError(err).Error("Invalid origin service config") failedConfigs = append(failedConfigs, &pogs.FailedConfig{ Config: tunnelConfig, Reason: tunnelConfig.FailReason(err), }) continue } s.tunnelHostnameMapper.Add(tunnelConfig.TunnelHostname, originSerice) s.logger.WithField("tunnelHostname", tunnelHostname).Infof("New origin service config: %v", originSerice.Summary()) } return } // ServeStream implements MuxedStreamHandler interface func (s *StreamHandler) ServeStream(stream *h2mux.MuxedStream) error { if stream.IsRPCStream() { return s.serveRPC(stream) } if err := s.serveRequest(stream); err != nil { s.logger.Error(err) return err } return nil } func (s *StreamHandler) serveRPC(stream *h2mux.MuxedStream) error { stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "200"}}) main := pogs.ClientService_ServerToClient(s) rpcLogger := s.logger.WithField("subsystem", "clientserver-rpc") rpcConn := rpc.NewConn( tunnelrpc.NewTransportLogger(rpcLogger, rpc.StreamTransport(stream)), rpc.MainInterface(main.Client), tunnelrpc.ConnLog(s.logger.WithField("subsystem", "clientserver-rpc-transport")), ) return rpcConn.Wait() } func (s *StreamHandler) serveRequest(stream *h2mux.MuxedStream) error { tunnelHostname := stream.TunnelHostname() if !tunnelHostname.IsSet() { s.writeErrorStatus(stream, statusBadRequest) return fmt.Errorf("stream doesn't have tunnelHostname") } originService, ok := s.tunnelHostnameMapper.Get(tunnelHostname) if !ok { s.writeErrorStatus(stream, statusNotFound) return fmt.Errorf("cannot map tunnel hostname %s to origin", tunnelHostname) } req, err := createRequest(stream, originService.URL()) if err != nil { s.writeErrorStatus(stream, statusBadRequest) return errors.Wrap(err, "cannot create request") } logger := s.requestLogger(req, tunnelHostname) logger.Debugf("Request Headers %+v", req.Header) resp, err := originService.Proxy(stream, req) if err != nil { s.writeErrorStatus(stream, statusBadGateway) return errors.Wrap(err, "cannot proxy request") } logger.WithField("status", resp.Status).Debugf("Response Headers %+v", resp.Header) return nil } func (s *StreamHandler) requestLogger(req *http.Request, tunnelHostname h2mux.TunnelHostname) *logrus.Entry { cfRay := FindCfRayHeader(req) lbProbe := IsLBProbeRequest(req) logger := s.logger.WithField("tunnelHostname", tunnelHostname) if cfRay != "" { logger = logger.WithField("CF-RAY", cfRay) logger.Debugf("%s %s %s", req.Method, req.URL, req.Proto) } else if lbProbe { logger.Debugf("Load Balancer health check %s %s %s", req.Method, req.URL, req.Proto) } else { logger.Warnf("Requests %v does not have CF-RAY header. Please open a support ticket with Cloudflare.", req) } return logger } func (s *StreamHandler) writeErrorStatus(stream *h2mux.MuxedStream, status *httpErrorStatus) { stream.WriteHeaders([]h2mux.Header{ { Name: statusPseudoHeader, Value: status.status, }, }) stream.Write(status.text) }