2021-07-08 09:29:49 +00:00
|
|
|
package quic
|
|
|
|
|
|
|
|
import (
|
2021-11-12 09:37:28 +00:00
|
|
|
"context"
|
2021-07-08 09:29:49 +00:00
|
|
|
"fmt"
|
|
|
|
"io"
|
2021-11-12 09:37:28 +00:00
|
|
|
"net"
|
2021-12-02 11:02:27 +00:00
|
|
|
"time"
|
2021-07-08 09:29:49 +00:00
|
|
|
|
|
|
|
capnp "zombiezen.com/go/capnproto2"
|
2021-11-12 09:37:28 +00:00
|
|
|
"zombiezen.com/go/capnproto2/rpc"
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
|
|
|
|
"github.com/cloudflare/cloudflared/tunnelrpc"
|
|
|
|
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
|
2021-07-08 09:29:49 +00:00
|
|
|
)
|
|
|
|
|
TUN-5621: Correctly manage QUIC stream closing
Until this PR, we were naively closing the quic.Stream whenever
the callstack for handling the request (HTTP or TCP) finished.
However, our proxy handler may still be reading or writing from
the quic.Stream at that point, because we return the callstack if
either side finishes, but not necessarily both.
This is a problem for quic-go library because quic.Stream#Close
cannot be called concurrently with quic.Stream#Write
Furthermore, we also noticed that quic.Stream#Close does nothing
to do receiving stream (since, underneath, quic.Stream has 2 streams,
1 for each direction), thus leaking memory, as explained in:
https://github.com/lucas-clemente/quic-go/issues/3322
This PR addresses both problems by wrapping the quic.Stream that
is passed down to the proxying logic and handle all these concerns.
2022-01-27 22:37:45 +00:00
|
|
|
// ProtocolSignature defines the first 6 bytes of the stream, which is used to distinguish the type of stream. It
|
|
|
|
// ensures whoever performs a handshake does not write data before writing the metadata.
|
2021-11-12 09:37:28 +00:00
|
|
|
type ProtocolSignature [6]byte
|
|
|
|
|
|
|
|
var (
|
|
|
|
// DataStreamProtocolSignature is a custom protocol signature for data stream
|
|
|
|
DataStreamProtocolSignature = ProtocolSignature{0x0A, 0x36, 0xCD, 0x12, 0xA1, 0x3E}
|
|
|
|
|
|
|
|
// RPCStreamProtocolSignature is a custom protocol signature for RPC stream
|
|
|
|
RPCStreamProtocolSignature = ProtocolSignature{0x52, 0xBB, 0x82, 0x5C, 0xDB, 0x65}
|
|
|
|
)
|
2021-07-08 09:29:49 +00:00
|
|
|
|
2021-08-21 19:44:22 +00:00
|
|
|
type protocolVersion string
|
|
|
|
|
|
|
|
const (
|
|
|
|
protocolV1 protocolVersion = "01"
|
TUN-5621: Correctly manage QUIC stream closing
Until this PR, we were naively closing the quic.Stream whenever
the callstack for handling the request (HTTP or TCP) finished.
However, our proxy handler may still be reading or writing from
the quic.Stream at that point, because we return the callstack if
either side finishes, but not necessarily both.
This is a problem for quic-go library because quic.Stream#Close
cannot be called concurrently with quic.Stream#Write
Furthermore, we also noticed that quic.Stream#Close does nothing
to do receiving stream (since, underneath, quic.Stream has 2 streams,
1 for each direction), thus leaking memory, as explained in:
https://github.com/lucas-clemente/quic-go/issues/3322
This PR addresses both problems by wrapping the quic.Stream that
is passed down to the proxying logic and handle all these concerns.
2022-01-27 22:37:45 +00:00
|
|
|
|
|
|
|
protocolVersionLength = 2
|
|
|
|
|
|
|
|
HandshakeIdleTimeout = 5 * time.Second
|
|
|
|
MaxIdleTimeout = 15 * time.Second
|
2021-08-21 19:44:22 +00:00
|
|
|
)
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
// RequestServerStream is a stream to serve requests
|
|
|
|
type RequestServerStream struct {
|
|
|
|
io.ReadWriteCloser
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewRequestServerStream(stream io.ReadWriteCloser, signature ProtocolSignature) (*RequestServerStream, error) {
|
|
|
|
if signature != DataStreamProtocolSignature {
|
|
|
|
return nil, fmt.Errorf("RequestClientStream can only be created from data stream")
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
return &RequestServerStream{stream}, nil
|
|
|
|
}
|
2021-07-08 09:29:49 +00:00
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
// ReadConnectRequestData reads the handshake data from a QUIC stream.
|
|
|
|
func (rss *RequestServerStream) ReadConnectRequestData() (*ConnectRequest, error) {
|
2021-08-21 19:44:22 +00:00
|
|
|
// This is a NO-OP for now. We could cause a branching if we wanted to use multiple versions.
|
2021-11-12 09:37:28 +00:00
|
|
|
if _, err := readVersion(rss); err != nil {
|
2021-08-21 19:44:22 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
msg, err := capnp.NewDecoder(rss).Decode()
|
2021-07-08 09:29:49 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
r := &ConnectRequest{}
|
|
|
|
if err := r.fromPogs(msg); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return r, nil
|
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
// WriteConnectResponseData writes response to a QUIC stream.
|
|
|
|
func (rss *RequestServerStream) WriteConnectResponseData(respErr error, metadata ...Metadata) error {
|
|
|
|
var connectResponse *ConnectResponse
|
|
|
|
if respErr != nil {
|
|
|
|
connectResponse = &ConnectResponse{
|
|
|
|
Error: respErr.Error(),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
connectResponse = &ConnectResponse{
|
|
|
|
Metadata: metadata,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
msg, err := connectResponse.toPogs()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := writeDataStreamPreamble(rss); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return capnp.NewEncoder(rss).Encode(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
type RequestClientStream struct {
|
|
|
|
io.ReadWriteCloser
|
|
|
|
}
|
|
|
|
|
2021-07-08 09:29:49 +00:00
|
|
|
// WriteConnectRequestData writes requestMeta to a stream.
|
2021-11-12 09:37:28 +00:00
|
|
|
func (rcs *RequestClientStream) WriteConnectRequestData(dest string, connectionType ConnectionType, metadata ...Metadata) error {
|
2021-07-08 09:29:49 +00:00
|
|
|
connectRequest := &ConnectRequest{
|
|
|
|
Dest: dest,
|
|
|
|
Type: connectionType,
|
|
|
|
Metadata: metadata,
|
|
|
|
}
|
|
|
|
|
|
|
|
msg, err := connectRequest.toPogs()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
if err := writeDataStreamPreamble(rcs); err != nil {
|
2021-07-08 09:29:49 +00:00
|
|
|
return err
|
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
return capnp.NewEncoder(rcs).Encode(msg)
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ReadConnectResponseData reads the response to a RequestMeta in a stream.
|
2021-11-12 09:37:28 +00:00
|
|
|
func (rcs *RequestClientStream) ReadConnectResponseData() (*ConnectResponse, error) {
|
|
|
|
signature, err := DetermineProtocol(rcs)
|
|
|
|
if err != nil {
|
2021-07-08 09:29:49 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
if signature != DataStreamProtocolSignature {
|
|
|
|
return nil, fmt.Errorf("Wrong protocol signature %v", signature)
|
|
|
|
}
|
2021-07-08 09:29:49 +00:00
|
|
|
|
2021-08-21 19:44:22 +00:00
|
|
|
// This is a NO-OP for now. We could cause a branching if we wanted to use multiple versions.
|
2021-11-12 09:37:28 +00:00
|
|
|
if _, err := readVersion(rcs); err != nil {
|
2021-08-21 19:44:22 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
msg, err := capnp.NewDecoder(rcs).Decode()
|
2021-07-08 09:29:49 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
r := &ConnectResponse{}
|
|
|
|
if err := r.fromPogs(msg); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return r, nil
|
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
// RPCServerStream is a stream to serve RPCs. It is closed when the RPC client is done
|
|
|
|
type RPCServerStream struct {
|
|
|
|
io.ReadWriteCloser
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewRPCServerStream(stream io.ReadWriteCloser, protocol ProtocolSignature) (*RPCServerStream, error) {
|
|
|
|
if protocol != RPCStreamProtocolSignature {
|
|
|
|
return nil, fmt.Errorf("RPCStream can only be created from rpc stream")
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
return &RPCServerStream{stream}, nil
|
|
|
|
}
|
2021-07-08 09:29:49 +00:00
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func (s *RPCServerStream) Serve(sessionManager tunnelpogs.SessionManager, logger *zerolog.Logger) error {
|
2021-12-20 10:58:06 +00:00
|
|
|
// RPC logs are very robust, create a new logger that only logs error to reduce noise
|
|
|
|
rpcLogger := logger.Level(zerolog.ErrorLevel)
|
|
|
|
rpcTransport := tunnelrpc.NewTransportLogger(&rpcLogger, rpc.StreamTransport(s))
|
2021-11-12 09:37:28 +00:00
|
|
|
defer rpcTransport.Close()
|
|
|
|
|
|
|
|
main := tunnelpogs.SessionManager_ServerToClient(sessionManager)
|
|
|
|
rpcConn := rpc.NewConn(
|
|
|
|
rpcTransport,
|
|
|
|
rpc.MainInterface(main.Client),
|
2021-12-20 10:58:06 +00:00
|
|
|
tunnelrpc.ConnLog(&rpcLogger),
|
2021-11-12 09:37:28 +00:00
|
|
|
)
|
|
|
|
defer rpcConn.Close()
|
|
|
|
|
|
|
|
return rpcConn.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
func DetermineProtocol(stream io.Reader) (ProtocolSignature, error) {
|
|
|
|
signature, err := readSignature(stream)
|
2021-07-08 09:29:49 +00:00
|
|
|
if err != nil {
|
2021-11-12 09:37:28 +00:00
|
|
|
return ProtocolSignature{}, err
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
switch signature {
|
|
|
|
case DataStreamProtocolSignature:
|
|
|
|
return DataStreamProtocolSignature, nil
|
|
|
|
case RPCStreamProtocolSignature:
|
|
|
|
return RPCStreamProtocolSignature, nil
|
|
|
|
default:
|
2021-12-20 10:58:06 +00:00
|
|
|
return ProtocolSignature{}, fmt.Errorf("unknown signature %v", signature)
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func writeDataStreamPreamble(stream io.Writer) error {
|
|
|
|
if err := writeSignature(stream, DataStreamProtocolSignature); err != nil {
|
2021-08-21 19:44:22 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return writeVersion(stream)
|
|
|
|
}
|
|
|
|
|
|
|
|
func writeVersion(stream io.Writer) error {
|
|
|
|
_, err := stream.Write([]byte(protocolV1)[:protocolVersionLength])
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func readVersion(stream io.Reader) (string, error) {
|
|
|
|
version := make([]byte, protocolVersionLength)
|
|
|
|
_, err := stream.Read(version)
|
|
|
|
return string(version), err
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func readSignature(stream io.Reader) (ProtocolSignature, error) {
|
|
|
|
var signature ProtocolSignature
|
|
|
|
if _, err := io.ReadFull(stream, signature[:]); err != nil {
|
|
|
|
return ProtocolSignature{}, err
|
|
|
|
}
|
|
|
|
return signature, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func writeSignature(stream io.Writer, signature ProtocolSignature) error {
|
|
|
|
_, err := stream.Write(signature[:])
|
2021-07-08 09:29:49 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
// RPCClientStream is a stream to call methods of SessionManager
|
|
|
|
type RPCClientStream struct {
|
|
|
|
client tunnelpogs.SessionManager_PogsClient
|
|
|
|
transport rpc.Transport
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewRPCClientStream(ctx context.Context, stream io.ReadWriteCloser, logger *zerolog.Logger) (*RPCClientStream, error) {
|
|
|
|
n, err := stream.Write(RPCStreamProtocolSignature[:])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
if n != len(RPCStreamProtocolSignature) {
|
|
|
|
return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(RPCStreamProtocolSignature), n)
|
|
|
|
}
|
|
|
|
transport := tunnelrpc.NewTransportLogger(logger, rpc.StreamTransport(stream))
|
|
|
|
conn := rpc.NewConn(
|
|
|
|
transport,
|
|
|
|
tunnelrpc.ConnLog(logger),
|
|
|
|
)
|
|
|
|
return &RPCClientStream{
|
|
|
|
client: tunnelpogs.SessionManager_PogsClient{Client: conn.Bootstrap(ctx), Conn: conn},
|
|
|
|
transport: transport,
|
|
|
|
}, nil
|
|
|
|
}
|
2021-07-08 09:29:49 +00:00
|
|
|
|
2021-12-02 11:02:27 +00:00
|
|
|
func (rcs *RPCClientStream) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration) error {
|
|
|
|
resp, err := rcs.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint)
|
2021-11-12 09:37:28 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|
2021-11-12 09:37:28 +00:00
|
|
|
return resp.Err
|
|
|
|
}
|
2021-07-08 09:29:49 +00:00
|
|
|
|
2021-12-14 22:52:47 +00:00
|
|
|
func (rcs *RPCClientStream) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
|
|
|
|
return rcs.client.UnregisterUdpSession(ctx, sessionID, message)
|
2021-11-30 19:58:11 +00:00
|
|
|
}
|
|
|
|
|
2021-11-12 09:37:28 +00:00
|
|
|
func (rcs *RPCClientStream) Close() {
|
|
|
|
_ = rcs.client.Close()
|
|
|
|
_ = rcs.transport.Close()
|
2021-07-08 09:29:49 +00:00
|
|
|
}
|