2021-08-03 09:04:02 +00:00
|
|
|
package connection
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"crypto/tls"
|
2021-08-03 07:09:56 +00:00
|
|
|
"fmt"
|
|
|
|
"io"
|
2021-08-03 09:04:02 +00:00
|
|
|
"net"
|
2021-08-03 07:09:56 +00:00
|
|
|
"net/http"
|
2022-09-02 16:29:50 +00:00
|
|
|
"net/netip"
|
2021-08-03 07:09:56 +00:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
2022-08-22 22:48:45 +00:00
|
|
|
"sync/atomic"
|
2021-12-02 11:02:27 +00:00
|
|
|
"time"
|
2021-08-03 09:04:02 +00:00
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
"github.com/google/uuid"
|
2021-08-03 09:04:02 +00:00
|
|
|
"github.com/lucas-clemente/quic-go"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/rs/zerolog"
|
2022-09-09 04:42:11 +00:00
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
2021-11-14 11:18:05 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
2021-08-03 09:04:02 +00:00
|
|
|
|
2021-11-23 12:45:59 +00:00
|
|
|
"github.com/cloudflare/cloudflared/datagramsession"
|
2021-11-30 10:27:33 +00:00
|
|
|
"github.com/cloudflare/cloudflared/ingress"
|
2022-08-17 15:46:49 +00:00
|
|
|
"github.com/cloudflare/cloudflared/packet"
|
2021-08-03 09:04:02 +00:00
|
|
|
quicpogs "github.com/cloudflare/cloudflared/quic"
|
2022-04-06 23:20:29 +00:00
|
|
|
"github.com/cloudflare/cloudflared/tracing"
|
2021-08-17 14:30:02 +00:00
|
|
|
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
2021-08-03 09:04:02 +00:00
|
|
|
)
|
|
|
|
|
2021-08-03 07:09:56 +00:00
|
|
|
const (
	// HTTPHeaderKey is the metadata key prefix for HTTP headers exchanged over QUIC when the
	// proxied connection is HTTP. Individual headers use keys of the form "HttpHeader:<name>".
	HTTPHeaderKey = "HttpHeader"
	// HTTPMethodKey is the metadata key carrying the HTTP method when the proxied connection is HTTP.
	HTTPMethodKey = "HttpMethod"
	// HTTPHostKey is the metadata key carrying the HTTP host when the proxied connection is HTTP.
	HTTPHostKey = "HttpHost"

	// QUICMetadataFlowID is the metadata key from which the flow ID of a TCP request is read.
	QUICMetadataFlowID = "FlowID"
	// demuxChanCapacity is the buffer size of the datagram session demux channel.
	// Empirically, this capacity has been working well.
	demuxChanCapacity = 16
)
|
|
|
|
|
2021-08-03 09:04:02 +00:00
|
|
|
// QUICConnection represents the type that facilitates Proxying via QUIC streams.
|
|
|
|
type QUICConnection struct {
|
2022-08-01 12:48:33 +00:00
|
|
|
session quic.Connection
|
|
|
|
logger *zerolog.Logger
|
|
|
|
orchestrator Orchestrator
|
|
|
|
// sessionManager tracks active sessions. It receives datagrams from quic connection via datagramMuxer
|
|
|
|
sessionManager datagramsession.Manager
|
|
|
|
// datagramMuxer mux/demux datagrams from quic connection
|
2022-09-29 14:42:30 +00:00
|
|
|
datagramMuxer *quicpogs.DatagramMuxerV2
|
2022-09-13 13:00:54 +00:00
|
|
|
packetRouter *packet.Router
|
2022-01-04 19:00:44 +00:00
|
|
|
controlStreamHandler ControlStreamHandler
|
2022-01-05 16:01:56 +00:00
|
|
|
connOptions *tunnelpogs.ConnectionOptions
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewQUICConnection returns a new instance of QUICConnection.
|
|
|
|
func NewQUICConnection(
|
|
|
|
quicConfig *quic.Config,
|
|
|
|
edgeAddr net.Addr,
|
|
|
|
tlsConfig *tls.Config,
|
2022-02-11 10:49:06 +00:00
|
|
|
orchestrator Orchestrator,
|
2021-08-17 14:30:02 +00:00
|
|
|
connOptions *tunnelpogs.ConnectionOptions,
|
|
|
|
controlStreamHandler ControlStreamHandler,
|
2022-01-04 19:00:44 +00:00
|
|
|
logger *zerolog.Logger,
|
2022-09-20 10:39:51 +00:00
|
|
|
packetRouterConfig *packet.GlobalRouterConfig,
|
2021-08-03 09:04:02 +00:00
|
|
|
) (*QUICConnection, error) {
|
|
|
|
session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig)
|
|
|
|
if err != nil {
|
2022-05-20 21:51:36 +00:00
|
|
|
return nil, &EdgeQuicDialError{Cause: err}
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
|
2022-08-17 17:23:04 +00:00
|
|
|
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
|
2022-09-20 10:39:51 +00:00
|
|
|
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
|
2022-08-17 17:23:04 +00:00
|
|
|
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
|
2022-09-29 14:42:30 +00:00
|
|
|
packetRouter := packet.NewRouter(packetRouterConfig, datagramMuxer, &returnPipe{muxer: datagramMuxer}, logger)
|
2022-09-20 10:39:51 +00:00
|
|
|
|
2021-08-03 09:04:02 +00:00
|
|
|
return &QUICConnection{
|
2022-01-04 19:00:44 +00:00
|
|
|
session: session,
|
2022-02-11 10:49:06 +00:00
|
|
|
orchestrator: orchestrator,
|
2022-01-04 19:00:44 +00:00
|
|
|
logger: logger,
|
|
|
|
sessionManager: sessionManager,
|
2022-08-01 12:48:33 +00:00
|
|
|
datagramMuxer: datagramMuxer,
|
2022-09-29 14:42:30 +00:00
|
|
|
packetRouter: packetRouter,
|
2022-01-04 19:00:44 +00:00
|
|
|
controlStreamHandler: controlStreamHandler,
|
2022-01-05 16:01:56 +00:00
|
|
|
connOptions: connOptions,
|
2021-08-03 09:04:02 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Serve starts a QUIC session that begins accepting streams.
|
|
|
|
func (q *QUICConnection) Serve(ctx context.Context) error {
|
2022-01-05 16:01:56 +00:00
|
|
|
// origintunneld assumes the first stream is used for the control plane
|
|
|
|
controlStream, err := q.session.OpenStream()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to open a registration control stream: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-01-04 19:00:44 +00:00
|
|
|
// If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
|
|
|
|
// as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
|
|
|
|
// connection).
|
|
|
|
// If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
|
|
|
|
// other goroutine as fast as possible.
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
2021-11-14 11:18:05 +00:00
|
|
|
errGroup, ctx := errgroup.WithContext(ctx)
|
2022-01-05 16:01:56 +00:00
|
|
|
|
|
|
|
// In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
|
|
|
|
// stream is already fully registered before the other goroutines can proceed.
|
|
|
|
errGroup.Go(func() error {
|
|
|
|
defer cancel()
|
|
|
|
return q.serveControlStream(ctx, controlStream)
|
|
|
|
})
|
2021-11-14 11:18:05 +00:00
|
|
|
errGroup.Go(func() error {
|
2022-01-04 19:00:44 +00:00
|
|
|
defer cancel()
|
2021-11-23 12:45:59 +00:00
|
|
|
return q.acceptStream(ctx)
|
2021-11-14 11:18:05 +00:00
|
|
|
})
|
|
|
|
errGroup.Go(func() error {
|
2022-01-04 19:00:44 +00:00
|
|
|
defer cancel()
|
2021-11-23 12:45:59 +00:00
|
|
|
return q.sessionManager.Serve(ctx)
|
2021-11-14 11:18:05 +00:00
|
|
|
})
|
2022-08-01 12:48:33 +00:00
|
|
|
errGroup.Go(func() error {
|
|
|
|
defer cancel()
|
|
|
|
return q.datagramMuxer.ServeReceive(ctx)
|
|
|
|
})
|
2022-09-29 14:42:30 +00:00
|
|
|
errGroup.Go(func() error {
|
|
|
|
defer cancel()
|
|
|
|
return q.packetRouter.Serve(ctx)
|
|
|
|
})
|
2022-08-01 12:48:33 +00:00
|
|
|
|
2021-11-14 11:18:05 +00:00
|
|
|
return errGroup.Wait()
|
|
|
|
}
|
|
|
|
|
2022-01-05 16:01:56 +00:00
|
|
|
func (q *QUICConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
|
|
|
|
// This blocks until the control plane is done.
|
2022-04-27 10:51:06 +00:00
|
|
|
err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions, q.orchestrator)
|
2022-01-05 16:01:56 +00:00
|
|
|
if err != nil {
|
|
|
|
// Not wrapping error here to be consistent with the http2 message.
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-06-13 17:45:40 +00:00
|
|
|
// Close closes the session with no errors specified.
|
|
|
|
func (q *QUICConnection) Close() {
|
|
|
|
q.session.CloseWithError(0, "")
|
|
|
|
}
|
|
|
|
|
2021-11-14 11:18:05 +00:00
|
|
|
func (q *QUICConnection) acceptStream(ctx context.Context) error {
|
2022-01-04 19:00:44 +00:00
|
|
|
defer q.Close()
|
2021-08-03 09:04:02 +00:00
|
|
|
for {
|
TUN-5621: Correctly manage QUIC stream closing
Until this PR, we were naively closing the quic.Stream whenever
the callstack for handling the request (HTTP or TCP) finished.
However, our proxy handler may still be reading or writing from
the quic.Stream at that point, because we return the callstack if
either side finishes, but not necessarily both.
This is a problem for quic-go library because quic.Stream#Close
cannot be called concurrently with quic.Stream#Write
Furthermore, we also noticed that quic.Stream#Close does nothing
to do receiving stream (since, underneath, quic.Stream has 2 streams,
1 for each direction), thus leaking memory, as explained in:
https://github.com/lucas-clemente/quic-go/issues/3322
This PR addresses both problems by wrapping the quic.Stream that
is passed down to the proxying logic and handle all these concerns.
2022-01-27 22:37:45 +00:00
|
|
|
quicStream, err := q.session.AcceptStream(ctx)
|
2021-08-03 09:04:02 +00:00
|
|
|
if err != nil {
|
2021-09-21 06:11:36 +00:00
|
|
|
// context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
|
2022-01-04 19:00:44 +00:00
|
|
|
if errors.Is(err, context.Canceled) || q.controlStreamHandler.IsStopped() {
|
2021-09-21 06:11:36 +00:00
|
|
|
return nil
|
|
|
|
}
|
2021-11-03 12:06:04 +00:00
|
|
|
return fmt.Errorf("failed to accept QUIC stream: %w", err)
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
2022-06-13 17:45:40 +00:00
|
|
|
go q.runStream(quicStream)
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-13 17:45:40 +00:00
|
|
|
func (q *QUICConnection) runStream(quicStream quic.Stream) {
|
2022-07-07 23:01:37 +00:00
|
|
|
ctx := quicStream.Context()
|
2022-06-13 17:45:40 +00:00
|
|
|
stream := quicpogs.NewSafeStreamCloser(quicStream)
|
|
|
|
defer stream.Close()
|
|
|
|
|
2022-08-09 17:10:51 +00:00
|
|
|
// we are going to fuse readers/writers from stream <- cloudflared -> origin, and we want to guarantee that
|
2022-08-22 22:48:45 +00:00
|
|
|
// code executed in the code path of handleStream don't trigger an earlier close to the downstream write stream.
|
|
|
|
// So, we wrap the stream with a no-op write closer and only this method can actually close write side of the stream.
|
|
|
|
// A call to close will simulate a close to the read-side, which will fail subsequent reads.
|
|
|
|
noCloseStream := &nopCloserReadWriter{ReadWriteCloser: stream}
|
2022-08-09 17:10:51 +00:00
|
|
|
if err := q.handleStream(ctx, noCloseStream); err != nil {
|
2022-06-13 17:45:40 +00:00
|
|
|
q.logger.Err(err).Msg("Failed to handle QUIC stream")
|
|
|
|
}
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
func (q *QUICConnection) handleStream(ctx context.Context, stream io.ReadWriteCloser) error {
|
2021-11-12 09:37:28 +00:00
|
|
|
signature, err := quicpogs.DetermineProtocol(stream)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
switch signature {
|
|
|
|
case quicpogs.DataStreamProtocolSignature:
|
|
|
|
reqServerStream, err := quicpogs.NewRequestServerStream(stream, signature)
|
|
|
|
if err != nil {
|
2022-06-13 17:46:52 +00:00
|
|
|
return err
|
2021-11-12 09:37:28 +00:00
|
|
|
}
|
2022-07-07 23:01:37 +00:00
|
|
|
return q.handleDataStream(ctx, reqServerStream)
|
2021-11-12 09:37:28 +00:00
|
|
|
case quicpogs.RPCStreamProtocolSignature:
|
|
|
|
rpcStream, err := quicpogs.NewRPCServerStream(stream, signature)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return q.handleRPCStream(rpcStream)
|
|
|
|
default:
|
2021-11-14 11:18:05 +00:00
|
|
|
return fmt.Errorf("unknown protocol %v", signature)
|
2021-11-12 09:37:28 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
func (q *QUICConnection) handleDataStream(ctx context.Context, stream *quicpogs.RequestServerStream) error {
|
2022-06-13 17:46:52 +00:00
|
|
|
request, err := stream.ReadConnectRequestData()
|
2021-08-03 09:04:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
if err := q.dispatchRequest(ctx, stream, err, request); err != nil {
|
2022-06-13 17:46:52 +00:00
|
|
|
_ = stream.WriteConnectResponseData(err)
|
|
|
|
q.logger.Err(err).Str("type", request.Type.String()).Str("dest", request.Dest).Msg("Request failed")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.RequestServerStream, err error, request *quicpogs.ConnectRequest) error {
|
2022-02-11 10:49:06 +00:00
|
|
|
originProxy, err := q.orchestrator.GetOriginProxy()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-06-09 12:55:26 +00:00
|
|
|
|
2022-06-13 17:46:52 +00:00
|
|
|
switch request.Type {
|
2021-08-03 09:04:02 +00:00
|
|
|
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
|
2022-07-26 21:00:53 +00:00
|
|
|
tracedReq, err := buildHTTPRequest(ctx, request, stream, q.logger)
|
2021-08-03 07:09:56 +00:00
|
|
|
if err != nil {
|
2021-08-03 09:04:02 +00:00
|
|
|
return err
|
|
|
|
}
|
2021-08-03 07:09:56 +00:00
|
|
|
w := newHTTPResponseAdapter(stream)
|
2022-06-13 17:46:52 +00:00
|
|
|
return originProxy.ProxyHTTP(w, tracedReq, request.Type == quicpogs.ConnectionTypeWebsocket)
|
|
|
|
|
2021-08-03 09:04:02 +00:00
|
|
|
case quicpogs.ConnectionTypeTCP:
|
2021-11-12 09:37:28 +00:00
|
|
|
rwa := &streamReadWriteAcker{stream}
|
2022-06-13 17:46:52 +00:00
|
|
|
metadata := request.MetadataMap()
|
2022-07-07 23:01:37 +00:00
|
|
|
return originProxy.ProxyTCP(ctx, rwa, &TCPRequest{
|
2022-07-26 21:00:53 +00:00
|
|
|
Dest: request.Dest,
|
|
|
|
FlowID: metadata[QUICMetadataFlowID],
|
|
|
|
CfTraceID: metadata[tracing.TracerContextName],
|
2022-06-13 17:46:52 +00:00
|
|
|
})
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2021-08-03 07:09:56 +00:00
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) error {
|
2022-02-02 12:27:49 +00:00
|
|
|
return rpcStream.Serve(q, q, q.logger)
|
2021-11-12 09:37:28 +00:00
|
|
|
}
|
|
|
|
|
2021-12-14 22:52:47 +00:00
|
|
|
// RegisterUdpSession is the RPC method invoked by edge to register and run a session
|
2022-09-09 04:42:11 +00:00
|
|
|
func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*tunnelpogs.RegisterUdpSessionResponse, error) {
|
|
|
|
traceCtx := tracing.NewTracedContext(ctx, traceContext, q.logger)
|
|
|
|
ctx, registerSpan := traceCtx.Tracer().Start(traceCtx, "register-session", trace.WithAttributes(
|
|
|
|
attribute.String("session-id", sessionID.String()),
|
|
|
|
attribute.String("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)),
|
|
|
|
))
|
2021-11-23 12:45:59 +00:00
|
|
|
// Each session is a series of datagram from an eyeball to a dstIP:dstPort.
|
|
|
|
// (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
|
2021-11-30 10:27:33 +00:00
|
|
|
originProxy, err := ingress.DialUDP(dstIP, dstPort)
|
2021-11-23 12:45:59 +00:00
|
|
|
if err != nil {
|
|
|
|
q.logger.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
|
2022-09-09 04:42:11 +00:00
|
|
|
tracing.EndWithErrorStatus(registerSpan, err)
|
|
|
|
return nil, err
|
2021-11-23 12:45:59 +00:00
|
|
|
}
|
2022-09-09 04:42:11 +00:00
|
|
|
registerSpan.SetAttributes(
|
|
|
|
attribute.Bool("socket-bind-success", true),
|
|
|
|
attribute.String("src", originProxy.LocalAddr().String()),
|
|
|
|
)
|
|
|
|
|
2021-11-23 12:45:59 +00:00
|
|
|
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
|
2021-11-14 11:18:05 +00:00
|
|
|
if err != nil {
|
2021-12-14 22:52:47 +00:00
|
|
|
q.logger.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
|
2022-09-09 04:42:11 +00:00
|
|
|
tracing.EndWithErrorStatus(registerSpan, err)
|
|
|
|
return nil, err
|
2021-11-14 11:18:05 +00:00
|
|
|
}
|
2021-12-14 22:52:47 +00:00
|
|
|
|
|
|
|
go q.serveUDPSession(session, closeAfterIdleHint)
|
|
|
|
|
2022-05-30 12:38:15 +00:00
|
|
|
q.logger.Debug().Str("sessionID", sessionID.String()).Str("src", originProxy.LocalAddr().String()).Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).Msgf("Registered session")
|
2022-09-09 04:42:11 +00:00
|
|
|
tracing.End(registerSpan)
|
|
|
|
|
|
|
|
resp := tunnelpogs.RegisterUdpSessionResponse{
|
|
|
|
Spans: traceCtx.GetProtoSpans(),
|
|
|
|
}
|
|
|
|
|
|
|
|
return &resp, nil
|
2021-11-14 11:18:05 +00:00
|
|
|
}
|
|
|
|
|
2021-12-14 22:52:47 +00:00
|
|
|
func (q *QUICConnection) serveUDPSession(session *datagramsession.Session, closeAfterIdleHint time.Duration) {
|
|
|
|
ctx := q.session.Context()
|
|
|
|
closedByRemote, err := session.Serve(ctx, closeAfterIdleHint)
|
|
|
|
// If session is terminated by remote, then we know it has been unregistered from session manager and edge
|
|
|
|
if !closedByRemote {
|
|
|
|
if err != nil {
|
|
|
|
q.closeUDPSession(ctx, session.ID, err.Error())
|
|
|
|
} else {
|
|
|
|
q.closeUDPSession(ctx, session.ID, "terminated without error")
|
|
|
|
}
|
|
|
|
}
|
2021-12-23 11:42:00 +00:00
|
|
|
q.logger.Debug().Err(err).Str("sessionID", session.ID.String()).Msg("Session terminated")
|
2021-12-14 22:52:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// closeUDPSession first unregisters the session from session manager, then it tries to unregister from edge
|
|
|
|
func (q *QUICConnection) closeUDPSession(ctx context.Context, sessionID uuid.UUID, message string) {
|
|
|
|
q.sessionManager.UnregisterSession(ctx, sessionID, message, false)
|
|
|
|
stream, err := q.session.OpenStream()
|
|
|
|
if err != nil {
|
|
|
|
// Log this at debug because this is not an error if session was closed due to lost connection
|
|
|
|
// with edge
|
|
|
|
q.logger.Debug().Err(err).Str("sessionID", sessionID.String()).
|
|
|
|
Msgf("Failed to open quic stream to unregister udp session with edge")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
rpcClientStream, err := quicpogs.NewRPCClientStream(ctx, stream, q.logger)
|
|
|
|
if err != nil {
|
|
|
|
// Log this at debug because this is not an error if session was closed due to lost connection
|
|
|
|
// with edge
|
|
|
|
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
|
|
|
Msgf("Failed to open rpc stream to unregister udp session with edge")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if err := rpcClientStream.UnregisterUdpSession(ctx, sessionID, message); err != nil {
|
|
|
|
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
|
|
|
Msgf("Failed to unregister udp session with edge")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// UnregisterUdpSession is the RPC method invoked by edge to unregister and terminate a sesssion
|
|
|
|
func (q *QUICConnection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
|
|
|
|
return q.sessionManager.UnregisterSession(ctx, sessionID, message, true)
|
2021-11-30 19:58:11 +00:00
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
|
2022-02-02 12:27:49 +00:00
|
|
|
// UpdateConfiguration is the RPC method invoked by edge when there is a new configuration
|
2022-02-11 15:46:04 +00:00
|
|
|
func (q *QUICConnection) UpdateConfiguration(ctx context.Context, version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
|
|
|
|
return q.orchestrator.UpdateConfig(version, config)
|
2022-02-02 12:27:49 +00:00
|
|
|
}
|
|
|
|
|
2021-08-17 14:30:02 +00:00
|
|
|
// streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
|
|
|
|
// the client.
|
|
|
|
type streamReadWriteAcker struct {
|
2021-11-12 09:37:28 +00:00
|
|
|
*quicpogs.RequestServerStream
|
2021-08-17 14:30:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// AckConnection acks response back to the proxy.
|
2022-07-26 21:00:53 +00:00
|
|
|
func (s *streamReadWriteAcker) AckConnection(tracePropagation string) error {
|
|
|
|
metadata := quicpogs.Metadata{
|
|
|
|
Key: tracing.CanonicalCloudflaredTracingHeader,
|
|
|
|
Val: tracePropagation,
|
|
|
|
}
|
|
|
|
return s.WriteConnectResponseData(nil, metadata)
|
2021-08-17 14:30:02 +00:00
|
|
|
}
|
|
|
|
|
2021-08-03 07:09:56 +00:00
|
|
|
// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
|
|
|
|
type httpResponseAdapter struct {
|
2021-11-12 09:37:28 +00:00
|
|
|
*quicpogs.RequestServerStream
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func newHTTPResponseAdapter(s *quicpogs.RequestServerStream) httpResponseAdapter {
|
|
|
|
return httpResponseAdapter{s}
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
2022-08-16 11:21:58 +00:00
|
|
|
func (hrw httpResponseAdapter) AddTrailer(trailerName, trailerValue string) {
|
|
|
|
// we do not support trailers over QUIC
|
|
|
|
}
|
|
|
|
|
2021-08-03 07:09:56 +00:00
|
|
|
func (hrw httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
|
|
|
|
metadata := make([]quicpogs.Metadata, 0)
|
|
|
|
metadata = append(metadata, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
|
|
|
|
for k, vv := range header {
|
|
|
|
for _, v := range vv {
|
|
|
|
httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k)
|
|
|
|
metadata = append(metadata, quicpogs.Metadata{Key: httpHeaderKey, Val: v})
|
|
|
|
}
|
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
return hrw.WriteConnectResponseData(nil, metadata...)
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (hrw httpResponseAdapter) WriteErrorResponse(err error) {
|
2021-11-12 09:37:28 +00:00
|
|
|
hrw.WriteConnectResponseData(err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
2022-07-26 21:00:53 +00:00
|
|
|
func buildHTTPRequest(
|
|
|
|
ctx context.Context,
|
|
|
|
connectRequest *quicpogs.ConnectRequest,
|
|
|
|
body io.ReadCloser,
|
|
|
|
log *zerolog.Logger,
|
|
|
|
) (*tracing.TracedHTTPRequest, error) {
|
2021-08-03 07:09:56 +00:00
|
|
|
metadata := connectRequest.MetadataMap()
|
|
|
|
dest := connectRequest.Dest
|
|
|
|
method := metadata[HTTPMethodKey]
|
|
|
|
host := metadata[HTTPHostKey]
|
2021-10-07 14:47:27 +00:00
|
|
|
isWebsocket := connectRequest.Type == quicpogs.ConnectionTypeWebsocket
|
2021-08-03 07:09:56 +00:00
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
req, err := http.NewRequestWithContext(ctx, method, dest, body)
|
2021-08-03 07:09:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
req.Host = host
|
|
|
|
for _, metadata := range connectRequest.Metadata {
|
|
|
|
if strings.Contains(metadata.Key, HTTPHeaderKey) {
|
|
|
|
// metadata.Key is off the format httpHeaderKey:<HTTPHeader>
|
|
|
|
httpHeaderKey := strings.Split(metadata.Key, ":")
|
|
|
|
if len(httpHeaderKey) != 2 {
|
2021-11-14 11:18:05 +00:00
|
|
|
return nil, fmt.Errorf("header Key: %s malformed", metadata.Key)
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
req.Header.Add(httpHeaderKey[1], metadata.Val)
|
|
|
|
}
|
|
|
|
}
|
2021-09-27 13:12:11 +00:00
|
|
|
// Go's http.Client automatically sends chunked request body if this value is not set on the
|
|
|
|
// *http.Request struct regardless of header:
|
|
|
|
// https://go.googlesource.com/go/+/go1.8rc2/src/net/http/transfer.go#154.
|
|
|
|
if err := setContentLength(req); err != nil {
|
|
|
|
return nil, fmt.Errorf("Error setting content-length: %w", err)
|
|
|
|
}
|
2021-10-07 14:47:27 +00:00
|
|
|
|
|
|
|
// Go's client defaults to chunked encoding after a 200ms delay if the following cases are true:
|
|
|
|
// * the request body blocks
|
|
|
|
// * the content length is not set (or set to -1)
|
|
|
|
// * the method doesn't usually have a body (GET, HEAD, DELETE, ...)
|
|
|
|
// * there is no transfer-encoding=chunked already set.
|
|
|
|
// So, if transfer cannot be chunked and content length is 0, we dont set a request body.
|
|
|
|
if !isWebsocket && !isTransferEncodingChunked(req) && req.ContentLength == 0 {
|
2022-03-05 18:05:07 +00:00
|
|
|
req.Body = http.NoBody
|
2021-10-07 14:47:27 +00:00
|
|
|
}
|
2021-08-03 07:09:56 +00:00
|
|
|
stripWebsocketUpgradeHeader(req)
|
2022-04-06 23:20:29 +00:00
|
|
|
|
|
|
|
// Check for tracing on request
|
2022-07-26 21:00:53 +00:00
|
|
|
tracedReq := tracing.NewTracedHTTPRequest(req, log)
|
2022-04-06 23:20:29 +00:00
|
|
|
return tracedReq, err
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
2021-09-27 13:12:11 +00:00
|
|
|
|
|
|
|
// setContentLength copies the Content-Length header of req, when present, into
// req.ContentLength. A request without that header is left untouched. The header is parsed
// as a base-10 int64; on a parse failure the error is returned and req.ContentLength holds
// whatever value the parser produced.
func setContentLength(req *http.Request) error {
	raw := req.Header.Get("Content-Length")
	if raw == "" {
		return nil
	}

	length, parseErr := strconv.ParseInt(raw, 10, 64)
	req.ContentLength = length
	return parseErr
}
|
2021-10-07 14:47:27 +00:00
|
|
|
|
|
|
|
// isTransferEncodingChunked reports whether the Transfer-Encoding header of req mentions
// "chunked", ignoring case.
func isTransferEncodingChunked(req *http.Request) bool {
	// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding suggests that
	// this can be a comma separated value as well, hence the substring match.
	encodings := strings.ToLower(req.Header.Get("Transfer-Encoding"))
	return strings.Contains(encodings, "chunked")
}
|
2022-08-09 17:10:51 +00:00
|
|
|
|
2022-08-22 22:48:45 +00:00
|
|
|
// nopCloserReadWriter wraps a stream so that Close never reaches the wrapped stream: it only
// makes later reads fail, leaving the write side untouched.
type nopCloserReadWriter struct {
	io.ReadWriteCloser

	// sawEOF records that the wrapped stream already returned io.EOF. It is touched by Read
	// only and needs no memory barrier, under the implicit assumption that Read is never
	// called concurrently from different go-routines.
	sawEOF bool
	// closed is non-zero once Close has been called. Read loads it and Close stores it,
	// possibly from different go-routines, so it must only be accessed with atomic primitives.
	closed uint32
}

// Read reads from the wrapped stream. Once the wrapped stream has returned io.EOF, every
// later call returns io.EOF without touching it again. Otherwise, after Close has been
// called, reads fail with a "closed by handler" error.
func (np *nopCloserReadWriter) Read(p []byte) (n int, err error) {
	switch {
	case np.sawEOF:
		// A seen EOF takes precedence over the closed flag.
		return 0, io.EOF
	case atomic.LoadUint32(&np.closed) > 0:
		return 0, fmt.Errorf("closed by handler")
	}

	n, err = np.ReadWriteCloser.Read(p)
	if err == io.EOF {
		np.sawEOF = true
	}
	return n, err
}

// Close marks the wrapper as closed so that subsequent reads fail. It does not close the
// wrapped stream and always returns nil.
func (np *nopCloserReadWriter) Close() error {
	atomic.StoreUint32(&np.closed, 1)
	return nil
}
|
2022-09-13 13:00:54 +00:00
|
|
|
|
|
|
|
// returnPipe wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
|
|
|
|
type returnPipe struct {
|
|
|
|
muxer *quicpogs.DatagramMuxerV2
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rp *returnPipe) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
|
|
|
|
return rp.muxer.SendPacket(pk)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rp *returnPipe) Close() error {
|
|
|
|
return nil
|
|
|
|
}
|