2021-08-03 09:04:02 +00:00
|
|
|
package connection
|
|
|
|
|
|
|
|
import (
|
2023-03-29 16:21:19 +00:00
|
|
|
"bufio"
|
2021-08-03 09:04:02 +00:00
|
|
|
"context"
|
|
|
|
"crypto/tls"
|
2021-08-03 07:09:56 +00:00
|
|
|
"fmt"
|
|
|
|
"io"
|
2021-08-03 09:04:02 +00:00
|
|
|
"net"
|
2021-08-03 07:09:56 +00:00
|
|
|
"net/http"
|
2022-09-02 16:29:50 +00:00
|
|
|
"net/netip"
|
2021-08-03 07:09:56 +00:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
2022-10-12 16:01:25 +00:00
|
|
|
"sync"
|
2022-08-22 22:48:45 +00:00
|
|
|
"sync/atomic"
|
2021-12-02 11:02:27 +00:00
|
|
|
"time"
|
2021-08-03 09:04:02 +00:00
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
"github.com/google/uuid"
|
2021-08-03 09:04:02 +00:00
|
|
|
"github.com/pkg/errors"
|
2023-05-06 00:42:41 +00:00
|
|
|
"github.com/quic-go/quic-go"
|
2021-08-03 09:04:02 +00:00
|
|
|
"github.com/rs/zerolog"
|
2022-09-09 04:42:11 +00:00
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
2021-11-14 11:18:05 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
2021-08-03 09:04:02 +00:00
|
|
|
|
2021-11-23 12:45:59 +00:00
|
|
|
"github.com/cloudflare/cloudflared/datagramsession"
|
2021-11-30 10:27:33 +00:00
|
|
|
"github.com/cloudflare/cloudflared/ingress"
|
2023-04-17 22:03:34 +00:00
|
|
|
"github.com/cloudflare/cloudflared/management"
|
2022-08-17 15:46:49 +00:00
|
|
|
"github.com/cloudflare/cloudflared/packet"
|
2021-08-03 09:04:02 +00:00
|
|
|
quicpogs "github.com/cloudflare/cloudflared/quic"
|
2022-04-06 23:20:29 +00:00
|
|
|
"github.com/cloudflare/cloudflared/tracing"
|
2021-08-17 14:30:02 +00:00
|
|
|
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
2021-08-03 09:04:02 +00:00
|
|
|
)
|
|
|
|
|
2021-08-03 07:09:56 +00:00
|
|
|
const (
	// HTTPHeaderKey is the metadata key prefix for HTTP headers carried over QUIC; individual
	// headers are keyed as "HttpHeader:<name>".
	HTTPHeaderKey = "HttpHeader"
	// HTTPMethodKey is the metadata key holding the HTTP method when the proxied connection type is HTTP.
	HTTPMethodKey = "HttpMethod"
	// HTTPHostKey is the metadata key holding the HTTP host when the proxied connection type is HTTP.
	HTTPHostKey = "HttpHost"

	// QUICMetadataFlowID is the metadata key from which the flow ID of a TCP request is read.
	QUICMetadataFlowID = "FlowID"
	// demuxChanCapacity is the buffer size of the session demux channel.
	// Empirically this capacity has been working well.
	demuxChanCapacity = 16
)
|
|
|
|
|
2022-10-12 16:01:25 +00:00
|
|
|
var (
	// portForConnIndex remembers the local UDP port last bound for each connection index,
	// so that a later connection with the same index can try to reuse it.
	portForConnIndex = map[uint8]int{}
	// portMapMutex guards portForConnIndex.
	portMapMutex sync.Mutex
)
|
|
|
|
|
2021-08-03 09:04:02 +00:00
|
|
|
// QUICConnection represents the type that facilitates Proxying via QUIC streams.
|
|
|
|
type QUICConnection struct {
|
2022-08-01 12:48:33 +00:00
|
|
|
session quic.Connection
|
|
|
|
logger *zerolog.Logger
|
|
|
|
orchestrator Orchestrator
|
|
|
|
// sessionManager tracks active sessions. It receives datagrams from quic connection via datagramMuxer
|
|
|
|
sessionManager datagramsession.Manager
|
|
|
|
// datagramMuxer mux/demux datagrams from quic connection
|
2022-09-29 14:42:30 +00:00
|
|
|
datagramMuxer *quicpogs.DatagramMuxerV2
|
2022-10-13 10:01:25 +00:00
|
|
|
packetRouter *ingress.PacketRouter
|
2022-01-04 19:00:44 +00:00
|
|
|
controlStreamHandler ControlStreamHandler
|
2022-01-05 16:01:56 +00:00
|
|
|
connOptions *tunnelpogs.ConnectionOptions
|
2023-02-22 14:52:44 +00:00
|
|
|
connIndex uint8
|
2023-06-19 16:03:11 +00:00
|
|
|
|
|
|
|
udpUnregisterTimeout time.Duration
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewQUICConnection returns a new instance of QUICConnection.
|
|
|
|
func NewQUICConnection(
|
2023-05-06 00:42:41 +00:00
|
|
|
ctx context.Context,
|
2021-08-03 09:04:02 +00:00
|
|
|
quicConfig *quic.Config,
|
|
|
|
edgeAddr net.Addr,
|
2023-02-28 16:11:42 +00:00
|
|
|
localAddr net.IP,
|
2022-10-12 16:01:25 +00:00
|
|
|
connIndex uint8,
|
2021-08-03 09:04:02 +00:00
|
|
|
tlsConfig *tls.Config,
|
2022-02-11 10:49:06 +00:00
|
|
|
orchestrator Orchestrator,
|
2021-08-17 14:30:02 +00:00
|
|
|
connOptions *tunnelpogs.ConnectionOptions,
|
|
|
|
controlStreamHandler ControlStreamHandler,
|
2022-01-04 19:00:44 +00:00
|
|
|
logger *zerolog.Logger,
|
2022-10-13 10:01:25 +00:00
|
|
|
packetRouterConfig *ingress.GlobalRouterConfig,
|
2023-06-19 16:03:11 +00:00
|
|
|
udpUnregisterTimeout time.Duration,
|
2021-08-03 09:04:02 +00:00
|
|
|
) (*QUICConnection, error) {
|
2023-02-28 16:11:42 +00:00
|
|
|
udpConn, err := createUDPConnForConnIndex(connIndex, localAddr, logger)
|
2022-10-12 16:01:25 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-05-06 00:42:41 +00:00
|
|
|
session, err := quic.Dial(ctx, udpConn, edgeAddr, tlsConfig, quicConfig)
|
2021-08-03 09:04:02 +00:00
|
|
|
if err != nil {
|
2022-11-29 15:13:34 +00:00
|
|
|
// close the udp server socket in case of error connecting to the edge
|
|
|
|
udpConn.Close()
|
2022-05-20 21:51:36 +00:00
|
|
|
return nil, &EdgeQuicDialError{Cause: err}
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
|
2022-10-12 16:01:25 +00:00
|
|
|
// wrap the session, so that the UDPConn is closed after session is closed.
|
|
|
|
session = &wrapCloseableConnQuicConnection{
|
|
|
|
session,
|
|
|
|
udpConn,
|
|
|
|
}
|
|
|
|
|
2022-08-17 17:23:04 +00:00
|
|
|
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
|
2022-09-20 10:39:51 +00:00
|
|
|
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
|
2022-08-17 17:23:04 +00:00
|
|
|
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
|
2023-09-08 17:05:13 +00:00
|
|
|
packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger)
|
2022-09-20 10:39:51 +00:00
|
|
|
|
2021-08-03 09:04:02 +00:00
|
|
|
return &QUICConnection{
|
2022-01-04 19:00:44 +00:00
|
|
|
session: session,
|
2022-02-11 10:49:06 +00:00
|
|
|
orchestrator: orchestrator,
|
2022-01-04 19:00:44 +00:00
|
|
|
logger: logger,
|
|
|
|
sessionManager: sessionManager,
|
2022-08-01 12:48:33 +00:00
|
|
|
datagramMuxer: datagramMuxer,
|
2022-09-29 14:42:30 +00:00
|
|
|
packetRouter: packetRouter,
|
2022-01-04 19:00:44 +00:00
|
|
|
controlStreamHandler: controlStreamHandler,
|
2022-01-05 16:01:56 +00:00
|
|
|
connOptions: connOptions,
|
2023-02-22 14:52:44 +00:00
|
|
|
connIndex: connIndex,
|
2023-06-19 16:03:11 +00:00
|
|
|
udpUnregisterTimeout: udpUnregisterTimeout,
|
2021-08-03 09:04:02 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Serve starts a QUIC session that begins accepting streams.
|
|
|
|
func (q *QUICConnection) Serve(ctx context.Context) error {
|
2022-01-05 16:01:56 +00:00
|
|
|
// origintunneld assumes the first stream is used for the control plane
|
|
|
|
controlStream, err := q.session.OpenStream()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to open a registration control stream: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-01-04 19:00:44 +00:00
|
|
|
// If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
|
|
|
|
// as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
|
|
|
|
// connection).
|
|
|
|
// If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
|
|
|
|
// other goroutine as fast as possible.
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
2021-11-14 11:18:05 +00:00
|
|
|
errGroup, ctx := errgroup.WithContext(ctx)
|
2022-01-05 16:01:56 +00:00
|
|
|
|
|
|
|
// In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
|
|
|
|
// stream is already fully registered before the other goroutines can proceed.
|
|
|
|
errGroup.Go(func() error {
|
|
|
|
defer cancel()
|
|
|
|
return q.serveControlStream(ctx, controlStream)
|
|
|
|
})
|
2021-11-14 11:18:05 +00:00
|
|
|
errGroup.Go(func() error {
|
2022-01-04 19:00:44 +00:00
|
|
|
defer cancel()
|
2021-11-23 12:45:59 +00:00
|
|
|
return q.acceptStream(ctx)
|
2021-11-14 11:18:05 +00:00
|
|
|
})
|
|
|
|
errGroup.Go(func() error {
|
2022-01-04 19:00:44 +00:00
|
|
|
defer cancel()
|
2021-11-23 12:45:59 +00:00
|
|
|
return q.sessionManager.Serve(ctx)
|
2021-11-14 11:18:05 +00:00
|
|
|
})
|
2022-08-01 12:48:33 +00:00
|
|
|
errGroup.Go(func() error {
|
|
|
|
defer cancel()
|
|
|
|
return q.datagramMuxer.ServeReceive(ctx)
|
|
|
|
})
|
2022-09-29 14:42:30 +00:00
|
|
|
errGroup.Go(func() error {
|
|
|
|
defer cancel()
|
|
|
|
return q.packetRouter.Serve(ctx)
|
|
|
|
})
|
2022-08-01 12:48:33 +00:00
|
|
|
|
2021-11-14 11:18:05 +00:00
|
|
|
return errGroup.Wait()
|
|
|
|
}
|
|
|
|
|
2022-01-05 16:01:56 +00:00
|
|
|
func (q *QUICConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
|
|
|
|
// This blocks until the control plane is done.
|
2022-04-27 10:51:06 +00:00
|
|
|
err := q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions, q.orchestrator)
|
2022-01-05 16:01:56 +00:00
|
|
|
if err != nil {
|
|
|
|
// Not wrapping error here to be consistent with the http2 message.
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-06-13 17:45:40 +00:00
|
|
|
// Close closes the session with no errors specified.
|
|
|
|
func (q *QUICConnection) Close() {
|
|
|
|
q.session.CloseWithError(0, "")
|
|
|
|
}
|
|
|
|
|
2021-11-14 11:18:05 +00:00
|
|
|
func (q *QUICConnection) acceptStream(ctx context.Context) error {
|
2022-01-04 19:00:44 +00:00
|
|
|
defer q.Close()
|
2021-08-03 09:04:02 +00:00
|
|
|
for {
|
TUN-5621: Correctly manage QUIC stream closing
Until this PR, we were naively closing the quic.Stream whenever
the callstack for handling the request (HTTP or TCP) finished.
However, our proxy handler may still be reading or writing from
the quic.Stream at that point, because we return the callstack if
either side finishes, but not necessarily both.
This is a problem for quic-go library because quic.Stream#Close
cannot be called concurrently with quic.Stream#Write
Furthermore, we also noticed that quic.Stream#Close does nothing
to do receiving stream (since, underneath, quic.Stream has 2 streams,
1 for each direction), thus leaking memory, as explained in:
https://github.com/lucas-clemente/quic-go/issues/3322
This PR addresses both problems by wrapping the quic.Stream that
is passed down to the proxying logic and handle all these concerns.
2022-01-27 22:37:45 +00:00
|
|
|
quicStream, err := q.session.AcceptStream(ctx)
|
2021-08-03 09:04:02 +00:00
|
|
|
if err != nil {
|
2021-09-21 06:11:36 +00:00
|
|
|
// context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
|
2022-01-04 19:00:44 +00:00
|
|
|
if errors.Is(err, context.Canceled) || q.controlStreamHandler.IsStopped() {
|
2021-09-21 06:11:36 +00:00
|
|
|
return nil
|
|
|
|
}
|
2021-11-03 12:06:04 +00:00
|
|
|
return fmt.Errorf("failed to accept QUIC stream: %w", err)
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
2022-06-13 17:45:40 +00:00
|
|
|
go q.runStream(quicStream)
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-13 17:45:40 +00:00
|
|
|
func (q *QUICConnection) runStream(quicStream quic.Stream) {
|
2022-07-07 23:01:37 +00:00
|
|
|
ctx := quicStream.Context()
|
2022-06-13 17:45:40 +00:00
|
|
|
stream := quicpogs.NewSafeStreamCloser(quicStream)
|
|
|
|
defer stream.Close()
|
|
|
|
|
2022-08-09 17:10:51 +00:00
|
|
|
// we are going to fuse readers/writers from stream <- cloudflared -> origin, and we want to guarantee that
|
2022-08-22 22:48:45 +00:00
|
|
|
// code executed in the code path of handleStream don't trigger an earlier close to the downstream write stream.
|
|
|
|
// So, we wrap the stream with a no-op write closer and only this method can actually close write side of the stream.
|
|
|
|
// A call to close will simulate a close to the read-side, which will fail subsequent reads.
|
|
|
|
noCloseStream := &nopCloserReadWriter{ReadWriteCloser: stream}
|
2022-08-09 17:10:51 +00:00
|
|
|
if err := q.handleStream(ctx, noCloseStream); err != nil {
|
2023-01-16 12:42:59 +00:00
|
|
|
q.logger.Debug().Err(err).Msg("Failed to handle QUIC stream")
|
|
|
|
|
|
|
|
// if we received an error at this level, then close write side of stream with an error, which will result in
|
|
|
|
// RST_STREAM frame.
|
|
|
|
quicStream.CancelWrite(0)
|
2022-06-13 17:45:40 +00:00
|
|
|
}
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
func (q *QUICConnection) handleStream(ctx context.Context, stream io.ReadWriteCloser) error {
|
2021-11-12 09:37:28 +00:00
|
|
|
signature, err := quicpogs.DetermineProtocol(stream)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
switch signature {
|
|
|
|
case quicpogs.DataStreamProtocolSignature:
|
|
|
|
reqServerStream, err := quicpogs.NewRequestServerStream(stream, signature)
|
|
|
|
if err != nil {
|
2022-06-13 17:46:52 +00:00
|
|
|
return err
|
2021-11-12 09:37:28 +00:00
|
|
|
}
|
2022-07-07 23:01:37 +00:00
|
|
|
return q.handleDataStream(ctx, reqServerStream)
|
2021-11-12 09:37:28 +00:00
|
|
|
case quicpogs.RPCStreamProtocolSignature:
|
|
|
|
rpcStream, err := quicpogs.NewRPCServerStream(stream, signature)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return q.handleRPCStream(rpcStream)
|
|
|
|
default:
|
2021-11-14 11:18:05 +00:00
|
|
|
return fmt.Errorf("unknown protocol %v", signature)
|
2021-11-12 09:37:28 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
func (q *QUICConnection) handleDataStream(ctx context.Context, stream *quicpogs.RequestServerStream) error {
|
2022-06-13 17:46:52 +00:00
|
|
|
request, err := stream.ReadConnectRequestData()
|
2021-08-03 09:04:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-01-16 12:42:59 +00:00
|
|
|
if err, connectResponseSent := q.dispatchRequest(ctx, stream, err, request); err != nil {
|
2022-06-13 17:46:52 +00:00
|
|
|
q.logger.Err(err).Str("type", request.Type.String()).Str("dest", request.Dest).Msg("Request failed")
|
2023-01-16 12:42:59 +00:00
|
|
|
|
|
|
|
// if the connectResponse was already sent and we had an error, we need to propagate it up, so that the stream is
|
|
|
|
// closed with an RST_STREAM frame
|
|
|
|
if connectResponseSent {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if writeRespErr := stream.WriteConnectResponseData(err); writeRespErr != nil {
|
|
|
|
return writeRespErr
|
|
|
|
}
|
2022-06-13 17:46:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-01-16 12:42:59 +00:00
|
|
|
// dispatchRequest will dispatch the request depending on the type and returns an error if it occurs.
|
|
|
|
// More importantly, it also tells if the during processing of the request the ConnectResponse metadata was sent downstream.
|
|
|
|
// This is important since it informs
|
|
|
|
func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.RequestServerStream, err error, request *quicpogs.ConnectRequest) (error, bool) {
|
2022-02-11 10:49:06 +00:00
|
|
|
originProxy, err := q.orchestrator.GetOriginProxy()
|
|
|
|
if err != nil {
|
2023-01-16 12:42:59 +00:00
|
|
|
return err, false
|
2022-02-11 10:49:06 +00:00
|
|
|
}
|
2022-06-09 12:55:26 +00:00
|
|
|
|
2022-06-13 17:46:52 +00:00
|
|
|
switch request.Type {
|
2021-08-03 09:04:02 +00:00
|
|
|
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
|
2023-02-22 14:52:44 +00:00
|
|
|
tracedReq, err := buildHTTPRequest(ctx, request, stream, q.connIndex, q.logger)
|
2021-08-03 07:09:56 +00:00
|
|
|
if err != nil {
|
2023-01-16 12:42:59 +00:00
|
|
|
return err, false
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
2021-08-03 07:09:56 +00:00
|
|
|
w := newHTTPResponseAdapter(stream)
|
2023-01-16 12:42:59 +00:00
|
|
|
return originProxy.ProxyHTTP(&w, tracedReq, request.Type == quicpogs.ConnectionTypeWebsocket), w.connectResponseSent
|
2022-06-13 17:46:52 +00:00
|
|
|
|
2021-08-03 09:04:02 +00:00
|
|
|
case quicpogs.ConnectionTypeTCP:
|
2023-01-16 12:42:59 +00:00
|
|
|
rwa := &streamReadWriteAcker{RequestServerStream: stream}
|
2022-06-13 17:46:52 +00:00
|
|
|
metadata := request.MetadataMap()
|
2022-07-07 23:01:37 +00:00
|
|
|
return originProxy.ProxyTCP(ctx, rwa, &TCPRequest{
|
2022-07-26 21:00:53 +00:00
|
|
|
Dest: request.Dest,
|
|
|
|
FlowID: metadata[QUICMetadataFlowID],
|
|
|
|
CfTraceID: metadata[tracing.TracerContextName],
|
2023-02-22 14:52:44 +00:00
|
|
|
ConnIndex: q.connIndex,
|
2023-01-16 12:42:59 +00:00
|
|
|
}), rwa.connectResponseSent
|
|
|
|
default:
|
|
|
|
return errors.Errorf("unsupported error type: %s", request.Type), false
|
2021-08-03 09:04:02 +00:00
|
|
|
}
|
|
|
|
}
|
2021-08-03 07:09:56 +00:00
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) error {
|
2023-01-16 12:42:59 +00:00
|
|
|
if err := rpcStream.Serve(q, q, q.logger); err != nil {
|
|
|
|
q.logger.Err(err).Msg("failed handling RPC stream")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2021-11-12 09:37:28 +00:00
|
|
|
}
|
|
|
|
|
2021-12-14 22:52:47 +00:00
|
|
|
// RegisterUdpSession is the RPC method invoked by edge to register and run a session
|
2022-09-09 04:42:11 +00:00
|
|
|
func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*tunnelpogs.RegisterUdpSessionResponse, error) {
|
|
|
|
traceCtx := tracing.NewTracedContext(ctx, traceContext, q.logger)
|
|
|
|
ctx, registerSpan := traceCtx.Tracer().Start(traceCtx, "register-session", trace.WithAttributes(
|
|
|
|
attribute.String("session-id", sessionID.String()),
|
|
|
|
attribute.String("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)),
|
|
|
|
))
|
2023-04-17 22:03:34 +00:00
|
|
|
log := q.logger.With().Int(management.EventTypeKey, int(management.UDP)).Logger()
|
2021-11-23 12:45:59 +00:00
|
|
|
// Each session is a series of datagram from an eyeball to a dstIP:dstPort.
|
|
|
|
// (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
|
2021-11-30 10:27:33 +00:00
|
|
|
originProxy, err := ingress.DialUDP(dstIP, dstPort)
|
2021-11-23 12:45:59 +00:00
|
|
|
if err != nil {
|
2023-04-17 22:03:34 +00:00
|
|
|
log.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
|
2022-09-09 04:42:11 +00:00
|
|
|
tracing.EndWithErrorStatus(registerSpan, err)
|
|
|
|
return nil, err
|
2021-11-23 12:45:59 +00:00
|
|
|
}
|
2022-09-09 04:42:11 +00:00
|
|
|
registerSpan.SetAttributes(
|
|
|
|
attribute.Bool("socket-bind-success", true),
|
|
|
|
attribute.String("src", originProxy.LocalAddr().String()),
|
|
|
|
)
|
|
|
|
|
2021-11-23 12:45:59 +00:00
|
|
|
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
|
2021-11-14 11:18:05 +00:00
|
|
|
if err != nil {
|
2023-04-17 22:03:34 +00:00
|
|
|
log.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
|
2022-09-09 04:42:11 +00:00
|
|
|
tracing.EndWithErrorStatus(registerSpan, err)
|
|
|
|
return nil, err
|
2021-11-14 11:18:05 +00:00
|
|
|
}
|
2021-12-14 22:52:47 +00:00
|
|
|
|
|
|
|
go q.serveUDPSession(session, closeAfterIdleHint)
|
|
|
|
|
2023-04-17 22:03:34 +00:00
|
|
|
log.Debug().
|
|
|
|
Str("sessionID", sessionID.String()).
|
|
|
|
Str("src", originProxy.LocalAddr().String()).
|
|
|
|
Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).
|
|
|
|
Msgf("Registered session")
|
2022-09-09 04:42:11 +00:00
|
|
|
tracing.End(registerSpan)
|
|
|
|
|
|
|
|
resp := tunnelpogs.RegisterUdpSessionResponse{
|
|
|
|
Spans: traceCtx.GetProtoSpans(),
|
|
|
|
}
|
|
|
|
|
|
|
|
return &resp, nil
|
2021-11-14 11:18:05 +00:00
|
|
|
}
|
|
|
|
|
2021-12-14 22:52:47 +00:00
|
|
|
func (q *QUICConnection) serveUDPSession(session *datagramsession.Session, closeAfterIdleHint time.Duration) {
|
|
|
|
ctx := q.session.Context()
|
|
|
|
closedByRemote, err := session.Serve(ctx, closeAfterIdleHint)
|
|
|
|
// If session is terminated by remote, then we know it has been unregistered from session manager and edge
|
|
|
|
if !closedByRemote {
|
|
|
|
if err != nil {
|
|
|
|
q.closeUDPSession(ctx, session.ID, err.Error())
|
|
|
|
} else {
|
|
|
|
q.closeUDPSession(ctx, session.ID, "terminated without error")
|
|
|
|
}
|
|
|
|
}
|
2023-04-17 22:03:34 +00:00
|
|
|
q.logger.Debug().Err(err).
|
|
|
|
Int(management.EventTypeKey, int(management.UDP)).
|
|
|
|
Str("sessionID", session.ID.String()).
|
|
|
|
Msg("Session terminated")
|
2021-12-14 22:52:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// closeUDPSession first unregisters the session from session manager, then it tries to unregister from edge
|
|
|
|
func (q *QUICConnection) closeUDPSession(ctx context.Context, sessionID uuid.UUID, message string) {
|
|
|
|
q.sessionManager.UnregisterSession(ctx, sessionID, message, false)
|
2023-06-15 09:44:47 +00:00
|
|
|
quicStream, err := q.session.OpenStream()
|
2021-12-14 22:52:47 +00:00
|
|
|
if err != nil {
|
|
|
|
// Log this at debug because this is not an error if session was closed due to lost connection
|
|
|
|
// with edge
|
2023-04-17 22:03:34 +00:00
|
|
|
q.logger.Debug().Err(err).
|
|
|
|
Int(management.EventTypeKey, int(management.UDP)).
|
|
|
|
Str("sessionID", sessionID.String()).
|
2021-12-14 22:52:47 +00:00
|
|
|
Msgf("Failed to open quic stream to unregister udp session with edge")
|
|
|
|
return
|
|
|
|
}
|
2023-06-15 09:44:47 +00:00
|
|
|
|
|
|
|
stream := quicpogs.NewSafeStreamCloser(quicStream)
|
|
|
|
defer stream.Close()
|
2023-06-19 16:03:11 +00:00
|
|
|
rpcClientStream, err := quicpogs.NewRPCClientStream(ctx, stream, q.udpUnregisterTimeout, q.logger)
|
2021-12-14 22:52:47 +00:00
|
|
|
if err != nil {
|
|
|
|
// Log this at debug because this is not an error if session was closed due to lost connection
|
|
|
|
// with edge
|
|
|
|
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
|
|
|
Msgf("Failed to open rpc stream to unregister udp session with edge")
|
|
|
|
return
|
|
|
|
}
|
2023-06-15 09:44:47 +00:00
|
|
|
defer rpcClientStream.Close()
|
|
|
|
|
2021-12-14 22:52:47 +00:00
|
|
|
if err := rpcClientStream.UnregisterUdpSession(ctx, sessionID, message); err != nil {
|
|
|
|
q.logger.Err(err).Str("sessionID", sessionID.String()).
|
|
|
|
Msgf("Failed to unregister udp session with edge")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// UnregisterUdpSession is the RPC method invoked by edge to unregister and terminate a sesssion
|
|
|
|
func (q *QUICConnection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
|
|
|
|
return q.sessionManager.UnregisterSession(ctx, sessionID, message, true)
|
2021-11-30 19:58:11 +00:00
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
|
2022-02-02 12:27:49 +00:00
|
|
|
// UpdateConfiguration is the RPC method invoked by edge when there is a new configuration
|
2022-02-11 15:46:04 +00:00
|
|
|
func (q *QUICConnection) UpdateConfiguration(ctx context.Context, version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
|
|
|
|
return q.orchestrator.UpdateConfig(version, config)
|
2022-02-02 12:27:49 +00:00
|
|
|
}
|
|
|
|
|
2021-08-17 14:30:02 +00:00
|
|
|
// streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
|
|
|
|
// the client.
|
|
|
|
type streamReadWriteAcker struct {
|
2021-11-12 09:37:28 +00:00
|
|
|
*quicpogs.RequestServerStream
|
2023-01-16 12:42:59 +00:00
|
|
|
connectResponseSent bool
|
2021-08-17 14:30:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// AckConnection acks response back to the proxy.
|
2022-07-26 21:00:53 +00:00
|
|
|
func (s *streamReadWriteAcker) AckConnection(tracePropagation string) error {
|
2023-02-04 02:01:27 +00:00
|
|
|
metadata := []quicpogs.Metadata{}
|
|
|
|
// Only add tracing if provided by origintunneld
|
|
|
|
if tracePropagation != "" {
|
|
|
|
metadata = append(metadata, quicpogs.Metadata{
|
|
|
|
Key: tracing.CanonicalCloudflaredTracingHeader,
|
|
|
|
Val: tracePropagation,
|
|
|
|
})
|
2022-07-26 21:00:53 +00:00
|
|
|
}
|
2023-01-16 12:42:59 +00:00
|
|
|
s.connectResponseSent = true
|
2023-02-04 02:01:27 +00:00
|
|
|
return s.WriteConnectResponseData(nil, metadata...)
|
2021-08-17 14:30:02 +00:00
|
|
|
}
|
|
|
|
|
2021-08-03 07:09:56 +00:00
|
|
|
// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
|
|
|
|
type httpResponseAdapter struct {
|
2021-11-12 09:37:28 +00:00
|
|
|
*quicpogs.RequestServerStream
|
2023-03-07 18:41:15 +00:00
|
|
|
headers http.Header
|
2023-01-16 12:42:59 +00:00
|
|
|
connectResponseSent bool
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func newHTTPResponseAdapter(s *quicpogs.RequestServerStream) httpResponseAdapter {
|
2023-03-21 18:42:25 +00:00
|
|
|
return httpResponseAdapter{RequestServerStream: s, headers: make(http.Header)}
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
2023-01-16 12:42:59 +00:00
|
|
|
func (hrw *httpResponseAdapter) AddTrailer(trailerName, trailerValue string) {
|
2022-08-16 11:21:58 +00:00
|
|
|
// we do not support trailers over QUIC
|
|
|
|
}
|
|
|
|
|
2023-01-16 12:42:59 +00:00
|
|
|
func (hrw *httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
|
2021-08-03 07:09:56 +00:00
|
|
|
metadata := make([]quicpogs.Metadata, 0)
|
|
|
|
metadata = append(metadata, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
|
|
|
|
for k, vv := range header {
|
|
|
|
for _, v := range vv {
|
|
|
|
httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k)
|
|
|
|
metadata = append(metadata, quicpogs.Metadata{Key: httpHeaderKey, Val: v})
|
|
|
|
}
|
|
|
|
}
|
2023-01-16 12:42:59 +00:00
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
return hrw.WriteConnectResponseData(nil, metadata...)
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
2023-06-30 18:27:37 +00:00
|
|
|
func (hrw *httpResponseAdapter) Write(p []byte) (int, error) {
|
|
|
|
// Make sure to send WriteHeader response if not called yet
|
|
|
|
if !hrw.connectResponseSent {
|
|
|
|
hrw.WriteRespHeaders(http.StatusOK, hrw.headers)
|
|
|
|
}
|
|
|
|
return hrw.RequestServerStream.Write(p)
|
|
|
|
}
|
|
|
|
|
2023-03-07 18:41:15 +00:00
|
|
|
func (hrw *httpResponseAdapter) Header() http.Header {
|
|
|
|
return hrw.headers
|
|
|
|
}
|
|
|
|
|
2023-07-06 13:42:44 +00:00
|
|
|
// This is a no-op Flush because this adapter is over a quic.Stream and we don't need Flush here.
|
|
|
|
func (hrw *httpResponseAdapter) Flush() {}
|
|
|
|
|
2023-03-07 18:41:15 +00:00
|
|
|
func (hrw *httpResponseAdapter) WriteHeader(status int) {
|
|
|
|
hrw.WriteRespHeaders(status, hrw.headers)
|
|
|
|
}
|
|
|
|
|
2023-03-29 16:21:19 +00:00
|
|
|
func (hrw *httpResponseAdapter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
|
|
|
conn := &localProxyConnection{hrw.ReadWriteCloser}
|
|
|
|
readWriter := bufio.NewReadWriter(
|
|
|
|
bufio.NewReader(hrw.ReadWriteCloser),
|
|
|
|
bufio.NewWriter(hrw.ReadWriteCloser),
|
|
|
|
)
|
|
|
|
return conn, readWriter, nil
|
|
|
|
}
|
|
|
|
|
2023-01-16 12:42:59 +00:00
|
|
|
func (hrw *httpResponseAdapter) WriteErrorResponse(err error) {
|
2021-11-12 09:37:28 +00:00
|
|
|
hrw.WriteConnectResponseData(err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
|
2023-01-16 12:42:59 +00:00
|
|
|
func (hrw *httpResponseAdapter) WriteConnectResponseData(respErr error, metadata ...quicpogs.Metadata) error {
|
|
|
|
hrw.connectResponseSent = true
|
|
|
|
return hrw.RequestServerStream.WriteConnectResponseData(respErr, metadata...)
|
|
|
|
}
|
|
|
|
|
2022-07-26 21:00:53 +00:00
|
|
|
func buildHTTPRequest(
|
|
|
|
ctx context.Context,
|
|
|
|
connectRequest *quicpogs.ConnectRequest,
|
|
|
|
body io.ReadCloser,
|
2023-02-22 14:52:44 +00:00
|
|
|
connIndex uint8,
|
2022-07-26 21:00:53 +00:00
|
|
|
log *zerolog.Logger,
|
|
|
|
) (*tracing.TracedHTTPRequest, error) {
|
2021-08-03 07:09:56 +00:00
|
|
|
metadata := connectRequest.MetadataMap()
|
|
|
|
dest := connectRequest.Dest
|
|
|
|
method := metadata[HTTPMethodKey]
|
|
|
|
host := metadata[HTTPHostKey]
|
2021-10-07 14:47:27 +00:00
|
|
|
isWebsocket := connectRequest.Type == quicpogs.ConnectionTypeWebsocket
|
2021-08-03 07:09:56 +00:00
|
|
|
|
2022-07-07 23:01:37 +00:00
|
|
|
req, err := http.NewRequestWithContext(ctx, method, dest, body)
|
2021-08-03 07:09:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
req.Host = host
|
|
|
|
for _, metadata := range connectRequest.Metadata {
|
|
|
|
if strings.Contains(metadata.Key, HTTPHeaderKey) {
|
|
|
|
// metadata.Key is off the format httpHeaderKey:<HTTPHeader>
|
|
|
|
httpHeaderKey := strings.Split(metadata.Key, ":")
|
|
|
|
if len(httpHeaderKey) != 2 {
|
2021-11-14 11:18:05 +00:00
|
|
|
return nil, fmt.Errorf("header Key: %s malformed", metadata.Key)
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
|
|
|
req.Header.Add(httpHeaderKey[1], metadata.Val)
|
|
|
|
}
|
|
|
|
}
|
2021-09-27 13:12:11 +00:00
|
|
|
// Go's http.Client automatically sends chunked request body if this value is not set on the
|
|
|
|
// *http.Request struct regardless of header:
|
|
|
|
// https://go.googlesource.com/go/+/go1.8rc2/src/net/http/transfer.go#154.
|
|
|
|
if err := setContentLength(req); err != nil {
|
|
|
|
return nil, fmt.Errorf("Error setting content-length: %w", err)
|
|
|
|
}
|
2021-10-07 14:47:27 +00:00
|
|
|
|
|
|
|
// Go's client defaults to chunked encoding after a 200ms delay if the following cases are true:
|
|
|
|
// * the request body blocks
|
|
|
|
// * the content length is not set (or set to -1)
|
|
|
|
// * the method doesn't usually have a body (GET, HEAD, DELETE, ...)
|
|
|
|
// * there is no transfer-encoding=chunked already set.
|
|
|
|
// So, if transfer cannot be chunked and content length is 0, we dont set a request body.
|
|
|
|
if !isWebsocket && !isTransferEncodingChunked(req) && req.ContentLength == 0 {
|
2022-03-05 18:05:07 +00:00
|
|
|
req.Body = http.NoBody
|
2021-10-07 14:47:27 +00:00
|
|
|
}
|
2021-08-03 07:09:56 +00:00
|
|
|
stripWebsocketUpgradeHeader(req)
|
2022-04-06 23:20:29 +00:00
|
|
|
|
|
|
|
// Check for tracing on request
|
2023-02-22 14:52:44 +00:00
|
|
|
tracedReq := tracing.NewTracedHTTPRequest(req, connIndex, log)
|
2022-04-06 23:20:29 +00:00
|
|
|
return tracedReq, err
|
2021-08-03 07:09:56 +00:00
|
|
|
}
|
2021-09-27 13:12:11 +00:00
|
|
|
|
|
|
|
// setContentLength copies the value of the Content-Length header, when present, into
// req.ContentLength.
//
// It returns the parse error when the header is not a valid base-10 64-bit integer. In that
// case req.ContentLength is left untouched instead of being overwritten with the
// placeholder value the parser returns on failure (0, or the int64 limit on overflow).
// A missing header is not an error and changes nothing.
func setContentLength(req *http.Request) error {
	contentLengthStr := req.Header.Get("Content-Length")
	if contentLengthStr == "" {
		return nil
	}

	contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
	if err != nil {
		return err
	}
	req.ContentLength = contentLength
	return nil
}
|
2021-10-07 14:47:27 +00:00
|
|
|
|
|
|
|
// isTransferEncodingChunked reports whether the request's Transfer-Encoding header
// mentions "chunked", ignoring case.
func isTransferEncodingChunked(req *http.Request) bool {
	// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding suggests that
	// the value can be a comma separated list, hence a substring match instead of equality.
	const chunked = "chunked"
	encodings := strings.ToLower(req.Header.Get("Transfer-Encoding"))
	return strings.Contains(encodings, chunked)
}
|
2022-08-09 17:10:51 +00:00
|
|
|
|
2022-08-22 22:48:45 +00:00
|
|
|
// nopCloserReadWriter wraps a stream so that a call to Close only affects the read side,
// never the write side.
type nopCloserReadWriter struct {
	io.ReadWriteCloser

	// sawEOF is only used by Read. It needs no memory barrier because of the implicit
	// assumption that Read is never called concurrently from different goroutines.
	sawEOF bool
	// closed must be read and written with atomic primitives: Read loads it and Close
	// stores it, possibly from different goroutines.
	closed uint32
}
|
|
|
|
|
|
|
|
func (np *nopCloserReadWriter) Read(p []byte) (n int, err error) {
|
|
|
|
if np.sawEOF {
|
|
|
|
return 0, io.EOF
|
|
|
|
}
|
|
|
|
|
|
|
|
if atomic.LoadUint32(&np.closed) > 0 {
|
|
|
|
return 0, fmt.Errorf("closed by handler")
|
|
|
|
}
|
|
|
|
|
|
|
|
n, err = np.ReadWriteCloser.Read(p)
|
|
|
|
if err == io.EOF {
|
|
|
|
np.sawEOF = true
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
2022-08-09 17:10:51 +00:00
|
|
|
}
|
|
|
|
|
2022-08-22 22:48:45 +00:00
|
|
|
func (np *nopCloserReadWriter) Close() error {
|
|
|
|
atomic.StoreUint32(&np.closed, 1)
|
|
|
|
|
2022-08-09 17:10:51 +00:00
|
|
|
return nil
|
|
|
|
}
|
2022-09-13 13:00:54 +00:00
|
|
|
|
2022-10-13 20:30:43 +00:00
|
|
|
// muxerWrapper wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
|
|
|
|
type muxerWrapper struct {
|
2022-09-13 13:00:54 +00:00
|
|
|
muxer *quicpogs.DatagramMuxerV2
|
|
|
|
}
|
|
|
|
|
2022-10-13 20:30:43 +00:00
|
|
|
func (rp *muxerWrapper) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
|
|
|
|
return rp.muxer.SendPacket(quicpogs.RawPacket(pk))
|
2022-09-13 13:00:54 +00:00
|
|
|
}
|
|
|
|
|
2022-10-13 20:30:43 +00:00
|
|
|
func (rp *muxerWrapper) ReceivePacket(ctx context.Context) (packet.RawPacket, error) {
|
|
|
|
pk, err := rp.muxer.ReceivePacket(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return packet.RawPacket{}, err
|
|
|
|
}
|
|
|
|
rawPacket, ok := pk.(quicpogs.RawPacket)
|
|
|
|
if ok {
|
|
|
|
return packet.RawPacket(rawPacket), nil
|
|
|
|
}
|
|
|
|
return packet.RawPacket{}, fmt.Errorf("unexpected packet type %+v", pk)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rp *muxerWrapper) Close() error {
|
2022-09-13 13:00:54 +00:00
|
|
|
return nil
|
|
|
|
}
|
2022-10-12 16:01:25 +00:00
|
|
|
|
2023-02-28 16:11:42 +00:00
|
|
|
func createUDPConnForConnIndex(connIndex uint8, localIP net.IP, logger *zerolog.Logger) (*net.UDPConn, error) {
|
2022-10-12 16:01:25 +00:00
|
|
|
portMapMutex.Lock()
|
|
|
|
defer portMapMutex.Unlock()
|
|
|
|
|
2023-02-28 16:11:42 +00:00
|
|
|
if localIP == nil {
|
|
|
|
localIP = net.IPv4zero
|
|
|
|
}
|
|
|
|
|
2022-10-12 16:01:25 +00:00
|
|
|
// if port was not set yet, it will be zero, so bind will randomly allocate one.
|
|
|
|
if port, ok := portForConnIndex[connIndex]; ok {
|
2023-02-28 16:11:42 +00:00
|
|
|
udpConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: localIP, Port: port})
|
2022-10-12 16:01:25 +00:00
|
|
|
// if there wasn't an error, or if port was 0 (independently of error or not, just return)
|
|
|
|
if err == nil {
|
|
|
|
return udpConn, nil
|
|
|
|
} else {
|
|
|
|
logger.Debug().Err(err).Msgf("Unable to reuse port %d for connIndex %d. Falling back to random allocation.", port, connIndex)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// if we reached here, then there was an error or port as not been allocated it.
|
2023-02-28 16:11:42 +00:00
|
|
|
udpConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: localIP, Port: 0})
|
2022-10-12 16:01:25 +00:00
|
|
|
if err == nil {
|
|
|
|
udpAddr, ok := (udpConn.LocalAddr()).(*net.UDPAddr)
|
|
|
|
if !ok {
|
|
|
|
return nil, fmt.Errorf("unable to cast to udpConn")
|
|
|
|
}
|
|
|
|
portForConnIndex[connIndex] = udpAddr.Port
|
|
|
|
} else {
|
|
|
|
delete(portForConnIndex, connIndex)
|
|
|
|
}
|
|
|
|
|
|
|
|
return udpConn, err
|
|
|
|
}
|
|
|
|
|
|
|
|
type wrapCloseableConnQuicConnection struct {
|
|
|
|
quic.Connection
|
|
|
|
udpConn *net.UDPConn
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *wrapCloseableConnQuicConnection) CloseWithError(errorCode quic.ApplicationErrorCode, reason string) error {
|
|
|
|
err := w.Connection.CloseWithError(errorCode, reason)
|
|
|
|
w.udpConn.Close()
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|