diff --git a/quic/v3/datagram.go b/quic/v3/datagram.go index a17804c2..36123d8d 100644 --- a/quic/v3/datagram.go +++ b/quic/v3/datagram.go @@ -284,6 +284,8 @@ const ( ResponseDestinationUnreachable SessionRegistrationResp = 0x01 // Session registration was unable to bind to a local UDP socket. ResponseUnableToBindSocket SessionRegistrationResp = 0x02 + // Session registration is already bound to another connection. + ResponseSessionAlreadyConnected SessionRegistrationResp = 0x03 // Session registration failed with an unexpected error but provided a message. ResponseErrorWithMsg SessionRegistrationResp = 0xff ) diff --git a/quic/v3/datagram_errors.go b/quic/v3/datagram_errors.go index 244915db..9d92b7ea 100644 --- a/quic/v3/datagram_errors.go +++ b/quic/v3/datagram_errors.go @@ -7,7 +7,7 @@ import ( var ( ErrInvalidDatagramType error = errors.New("invalid datagram type expected") - ErrDatagramHeaderTooSmall error = fmt.Errorf("datagram should have at least %d bytes", datagramTypeLen) + ErrDatagramHeaderTooSmall error = fmt.Errorf("datagram should have at least %d byte", datagramTypeLen) ErrDatagramPayloadTooLarge error = errors.New("payload length is too large to be bundled in datagram") ErrDatagramPayloadHeaderTooSmall error = errors.New("payload length is too small to fit the datagram header") ErrDatagramPayloadInvalidSize error = errors.New("datagram provided is an invalid size") diff --git a/quic/v3/muxer.go b/quic/v3/muxer.go index fda16bbe..e35f03a2 100644 --- a/quic/v3/muxer.go +++ b/quic/v3/muxer.go @@ -1,8 +1,227 @@ package v3 +import ( + "context" + "errors" + + "github.com/rs/zerolog" +) + +const ( + // Allocating a 16 channel buffer here allows for the writer to be slightly faster than the reader. + // This has worked previously well for datagramv2, so we will start with this as well + demuxChanCapacity = 16 +) + +// DatagramConn is the bridge that multiplexes writes and reads of datagrams for UDP sessions and ICMP packets to +// a connection. +type DatagramConn interface { + DatagramWriter + // Serve provides a server interface to process and handle incoming QUIC datagrams and demux their datagram v3 payloads. + Serve(context.Context) error +} + // DatagramWriter provides the Muxer interface to create proper Datagrams when sending over a connection. type DatagramWriter interface { SendUDPSessionDatagram(datagram []byte) error SendUDPSessionResponse(id RequestID, resp SessionRegistrationResp) error //SendICMPPacket(packet packet.IP) error } + +// QuicConnection provides an interface that matches [quic.Connection] for only the datagram operations. +// +// We currently rely on the mutex for the [quic.Connection.SendDatagram] and [quic.Connection.ReceiveDatagram] and +// do not have any locking for them. If the implementation in quic-go were to ever change, we would need to make +// sure that we lock properly on these operations. +type QuicConnection interface { + Context() context.Context + SendDatagram(payload []byte) error + ReceiveDatagram(context.Context) ([]byte, error) +} + +type datagramConn struct { + conn QuicConnection + sessionManager SessionManager + logger *zerolog.Logger + + datagrams chan []byte + readErrors chan error +} + +func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, logger *zerolog.Logger) DatagramConn { + log := logger.With().Uint8("datagramVersion", 3).Logger() + return &datagramConn{ + conn: conn, + sessionManager: sessionManager, + logger: &log, + datagrams: make(chan []byte, demuxChanCapacity), + readErrors: make(chan error, 2), + } +} + +func (c *datagramConn) SendUDPSessionDatagram(datagram []byte) error { + return c.conn.SendDatagram(datagram) +} + +func (c *datagramConn) SendUDPSessionResponse(id RequestID, resp SessionRegistrationResp) error { + datagram := UDPSessionRegistrationResponseDatagram{ + RequestID: id, + ResponseType: resp, + } + data, err := datagram.MarshalBinary() + if err != nil { + return err + } + return c.conn.SendDatagram(data) +} + +var errReadTimeout error = errors.New("receive datagram timeout") + +// pollDatagrams will read datagrams from the underlying connection until the provided context is done. +func (c *datagramConn) pollDatagrams(ctx context.Context) { + for ctx.Err() == nil { + datagram, err := c.conn.ReceiveDatagram(ctx) + // If the read returns an error, we want to return the failure to the channel. + if err != nil { + c.readErrors <- err + return + } + c.datagrams <- datagram + } + if ctx.Err() != nil { + c.readErrors <- ctx.Err() + } +} + +// Serve will begin the process of receiving datagrams from the [quic.Connection] and demuxing them to their destination. +// The [DatagramConn] when serving, will be responsible for the sessions it accepts. +func (c *datagramConn) Serve(ctx context.Context) error { + connCtx := c.conn.Context() + // We want to make sure that we cancel the reader context if the Serve method returns. This could also mean that the + // underlying connection is also closing, but that is handled outside of the context of the datagram muxer. + readCtx, cancel := context.WithCancel(connCtx) + defer cancel() + go c.pollDatagrams(readCtx) + for { + // We make sure to monitor the context of cloudflared and the underlying connection to return if any errors occur. + var datagram []byte + select { + // Monitor the context of cloudflared + case <-ctx.Done(): + return ctx.Err() + // Monitor the context of the underlying connection + case <-connCtx.Done(): + return connCtx.Err() + // Monitor for any hard errors from reading the connection + case err := <-c.readErrors: + return err + // Otherwise, wait and dequeue datagrams as they come in + case d := <-c.datagrams: + datagram = d + } + + // Each incoming datagram will be processed in a new go routine to handle the demuxing and action associated. + go func() { + typ, err := parseDatagramType(datagram) + if err != nil { + c.logger.Err(err).Msgf("unable to parse datagram type: %d", typ) + return + } + switch typ { + case UDPSessionRegistrationType: + reg := &UDPSessionRegistrationDatagram{} + err := reg.UnmarshalBinary(datagram) + if err != nil { + c.logger.Err(err).Msgf("unable to unmarshal session registration datagram") + return + } + // We bind the new session to the quic connection context instead of cloudflared context to allow for the + // quic connection to close and close only the sessions bound to it. Closing of cloudflared will also + // initiate the close of the quic connection, so we don't have to worry about the application context + // in the scope of a session. + c.handleSessionRegistrationDatagram(connCtx, reg) + case UDPSessionPayloadType: + payload := &UDPSessionPayloadDatagram{} + err := payload.UnmarshalBinary(datagram) + if err != nil { + c.logger.Err(err).Msgf("unable to unmarshal session payload datagram") + return + } + c.handleSessionPayloadDatagram(payload) + case UDPSessionRegistrationResponseType: + // cloudflared should never expect to receive UDP session responses as it will not initiate new + // sessions towards the edge. + c.logger.Error().Msgf("unexpected datagram type received: %d", UDPSessionRegistrationResponseType) + return + default: + c.logger.Error().Msgf("unknown datagram type received: %d", typ) + } + }() + } +} + +// This method handles new registrations of a session and the serve loop for the session. +func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram) { + session, err := c.sessionManager.RegisterSession(datagram, c) + if err != nil { + c.logger.Err(err).Msgf("session registration failure") + c.handleSessionRegistrationFailure(datagram.RequestID, err) + return + } + // Make sure to eventually remove the session from the session manager when the session is closed + defer c.sessionManager.UnregisterSession(session.ID()) + + // Respond that we are able to process the new session + err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk) + if err != nil { + c.logger.Err(err).Msgf("session registration failure: unable to send session registration response") + return + } + + // We bind the context of the session to the [quic.Connection] that initiated the session. + // [Session.Serve] is blocking and will continue this go routine till the end of the session lifetime. + err = session.Serve(ctx) + if err == nil { + // We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical + // expected error response. + c.logger.Warn().Msg("session was closed without explicit close or timeout") + return + } + // SessionIdleErr and SessionCloseErr are valid and successful error responses to end a session. + if errors.Is(err, SessionIdleErr{}) || errors.Is(err, SessionCloseErr) { + c.logger.Debug().Msg(err.Error()) + return + } + + // All other errors should be reported as errors + c.logger.Err(err).Msgf("session was closed with an error") +} + +func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, regErr error) { + var errResp SessionRegistrationResp + switch regErr { + case ErrSessionBoundToOtherConn: + errResp = ResponseSessionAlreadyConnected + default: + errResp = ResponseUnableToBindSocket + } + err := c.SendUDPSessionResponse(requestID, errResp) + if err != nil { + c.logger.Err(err).Msgf("unable to send session registration error response (%d)", errResp) + } +} + +// Handles incoming datagrams that need to be sent to a registered session. +func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram) { + s, err := c.sessionManager.GetSession(datagram.RequestID) + if err != nil { + c.logger.Err(err).Msgf("unable to find session") + return + } + // We ignore the bytes written to the socket because any partial write must return an error. + _, err = s.Write(datagram.Payload) + if err != nil { + c.logger.Err(err).Msgf("unable to write payload for unavailable session") + return + } +} diff --git a/quic/v3/muxer_test.go b/quic/v3/muxer_test.go index 552281a5..b2cb7e06 100644 --- a/quic/v3/muxer_test.go +++ b/quic/v3/muxer_test.go @@ -1,6 +1,21 @@ package v3_test -import v3 "github.com/cloudflare/cloudflared/quic/v3" +import ( + "bytes" + "context" + "errors" + "net" + "net/netip" + "slices" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + + "github.com/cloudflare/cloudflared/ingress" + v3 "github.com/cloudflare/cloudflared/quic/v3" +) type noopEyeball struct{} @@ -48,3 +63,435 @@ func (m *mockEyeball) SendUDPSessionResponse(id v3.RequestID, resp v3.SessionReg } return nil } + +func TestDatagramConn_New(t *testing.T) { + log := zerolog.Nop() + conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&log, ingress.DialUDPAddrPort), &log) + if conn == nil { + t.Fatal("expected valid connection") + } +} + +func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), &log) + + payload := []byte{0xef, 0xef} + conn.SendUDPSessionDatagram(payload) + p := <-quic.recv + if !slices.Equal(p, payload) { + t.Fatal("datagram sent does not match datagram received on quic side") + } +} + +func TestDatagramConn_SendUDPSessionResponse(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), &log) + + conn.SendUDPSessionResponse(testRequestID, v3.ResponseDestinationUnreachable) + resp := <-quic.recv + var response v3.UDPSessionRegistrationResponseDatagram + err := response.UnmarshalBinary(resp) + if err != nil { + t.Fatal(err) + } + expected := v3.UDPSessionRegistrationResponseDatagram{ + RequestID: testRequestID, + ResponseType: v3.ResponseDestinationUnreachable, + } + if response != expected { + t.Fatal("datagram response sent does not match expected datagram response received") + } +} + +func TestDatagramConnServe_ApplicationClosed(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), &log) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := conn.Serve(ctx) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatal(err) + } +} + +func TestDatagramConnServe_ConnectionClosed(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + quic.ctx = ctx + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), &log) + + err := conn.Serve(context.Background()) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatal(err) + } +} + +func TestDatagramConnServe_ReceiveDatagramError(t *testing.T) { + log := zerolog.Nop() + quic := &mockQuicConnReadError{err: net.ErrClosed} + conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&log, ingress.DialUDPAddrPort), &log) + + err := conn.Serve(context.Background()) + if !errors.Is(err, net.ErrClosed) { + t.Fatal(err) + } +} + +func TestDatagramConnServe_ErrorDatagramTypes(t *testing.T) { + for _, test := range []struct { + name string + input []byte + expected string + }{ + { + "empty", + []byte{}, + "{\"level\":\"error\",\"datagramVersion\":3,\"error\":\"datagram should have at least 1 byte\",\"message\":\"unable to parse datagram type: 0\"}\n", + }, + { + "unexpected", + []byte{byte(v3.UDPSessionRegistrationResponseType)}, + "{\"level\":\"error\",\"datagramVersion\":3,\"message\":\"unexpected datagram type received: 3\"}\n", + }, + { + "unknown", + []byte{99}, + "{\"level\":\"error\",\"datagramVersion\":3,\"message\":\"unknown datagram type received: 99\"}\n", + }, + } { + t.Run(test.name, func(t *testing.T) { + logOutput := new(LockedBuffer) + log := zerolog.New(logOutput) + quic := newMockQuicConn() + quic.send <- test.input + conn := v3.NewDatagramConn(quic, &mockSessionManager{}, &log) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := conn.Serve(ctx) + // we cancel the Serve method to check to see if the log output was written since the unsupported datagram + // is dropped with only a log message as a side-effect. + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatal(err) + } + + out := logOutput.String() + if out != test.expected { + t.Fatalf("incorrect log output expected: %s", out) + } + }) + } +} + +type LockedBuffer struct { + bytes.Buffer + l sync.Mutex +} + +func (b *LockedBuffer) Write(p []byte) (n int, err error) { + b.l.Lock() + defer b.l.Unlock() + return b.Buffer.Write(p) +} + +func (b *LockedBuffer) String() string { + b.l.Lock() + defer b.l.Unlock() + return b.Buffer.String() +} + +func TestDatagramConnServe_RegisterSession_SessionManagerError(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + expectedErr := errors.New("unable to register session") + sessionManager := mockSessionManager{expectedRegErr: expectedErr} + conn := v3.NewDatagramConn(quic, &sessionManager, &log) + + // Setup the muxer + ctx, cancel := context.WithCancelCause(context.Background()) + defer cancel(errors.New("other error")) + done := make(chan error, 1) + go func() { + done <- conn.Serve(ctx) + }() + + // Send new session registration + datagram := newRegisterSessionDatagram(testRequestID) + quic.send <- datagram + + // Wait for session registration response with failure + datagram = <-quic.recv + var resp v3.UDPSessionRegistrationResponseDatagram + err := resp.UnmarshalBinary(datagram) + if err != nil { + t.Fatal(err) + } + + if resp.RequestID != testRequestID && resp.ResponseType != v3.ResponseUnableToBindSocket { + t.Fatalf("expected registration response failure") + } + + // Cancel the muxer Serve context and make sure it closes with the expected error + cancel(expectedContextCanceled) + err = <-done + if !errors.Is(err, context.Canceled) { + t.Fatal(err) + } + if !errors.Is(context.Cause(ctx), expectedContextCanceled) { + t.Fatal(err) + } +} + +func TestDatagramConnServe(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + session := newMockSession() + sessionManager := mockSessionManager{session: &session} + conn := v3.NewDatagramConn(quic, &sessionManager, &log) + + // Setup the muxer + ctx, cancel := context.WithCancelCause(context.Background()) + defer cancel(errors.New("other error")) + done := make(chan error, 1) + go func() { + done <- conn.Serve(ctx) + }() + + // Send new session registration + datagram := newRegisterSessionDatagram(testRequestID) + quic.send <- datagram + + // Wait for session registration response with success + datagram = <-quic.recv + var resp v3.UDPSessionRegistrationResponseDatagram + err := resp.UnmarshalBinary(datagram) + if err != nil { + t.Fatal(err) + } + + if resp.RequestID != testRequestID && resp.ResponseType != v3.ResponseOk { + t.Fatalf("expected registration response ok") + } + + // We expect the session to be served + timer := time.NewTimer(15 * time.Second) + defer timer.Stop() + select { + case <-session.served: + break + case <-timer.C: + t.Fatalf("expected session serve to be called") + } + + // Cancel the muxer Serve context and make sure it closes with the expected error + cancel(expectedContextCanceled) + err = <-done + if !errors.Is(err, context.Canceled) { + t.Fatal(err) + } + if !errors.Is(context.Cause(ctx), expectedContextCanceled) { + t.Fatal(err) + } +} + +func TestDatagramConnServe_Payload_GetSessionError(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + sessionManager := mockSessionManager{session: nil, expectedGetErr: v3.ErrSessionNotFound} + conn := v3.NewDatagramConn(quic, &sessionManager, &log) + + // Setup the muxer + ctx, cancel := context.WithCancelCause(context.Background()) + defer cancel(errors.New("other error")) + done := make(chan error, 1) + go func() { + done <- conn.Serve(ctx) + }() + + // Send new session registration + datagram := newSessionPayloadDatagram(testRequestID, []byte{0xef, 0xef}) + quic.send <- datagram + + // Cancel the muxer Serve context and make sure it closes with the expected error + cancel(expectedContextCanceled) + err := <-done + if !errors.Is(err, context.Canceled) { + t.Fatal(err) + } + if !errors.Is(context.Cause(ctx), expectedContextCanceled) { + t.Fatal(err) + } +} + +func TestDatagramConnServe_Payload(t *testing.T) { + log := zerolog.Nop() + quic := newMockQuicConn() + session := newMockSession() + sessionManager := mockSessionManager{session: &session} + conn := v3.NewDatagramConn(quic, &sessionManager, &log) + + // Setup the muxer + ctx, cancel := context.WithCancelCause(context.Background()) + defer cancel(errors.New("other error")) + done := make(chan error, 1) + go func() { + done <- conn.Serve(ctx) + }() + + // Send new session registration + expectedPayload := []byte{0xef, 0xef} + datagram := newSessionPayloadDatagram(testRequestID, expectedPayload) + quic.send <- datagram + + // Session should receive the payload + payload := <-session.recv + if !slices.Equal(expectedPayload, payload) { + t.Fatalf("expected session receieve the payload sent via the muxer") + } + + // Cancel the muxer Serve context and make sure it closes with the expected error + cancel(expectedContextCanceled) + err := <-done + if !errors.Is(err, context.Canceled) { + t.Fatal(err) + } + if !errors.Is(context.Cause(ctx), expectedContextCanceled) { + t.Fatal(err) + } +} + +func newRegisterSessionDatagram(id v3.RequestID) []byte { + datagram := v3.UDPSessionRegistrationDatagram{ + RequestID: id, + Dest: netip.MustParseAddrPort("127.0.0.1:8080"), + IdleDurationHint: 5 * time.Second, + } + payload, err := datagram.MarshalBinary() + if err != nil { + panic(err) + } + return payload +} + +func newRegisterResponseSessionDatagram(id v3.RequestID, resp v3.SessionRegistrationResp) []byte { + datagram := v3.UDPSessionRegistrationResponseDatagram{ + RequestID: id, + ResponseType: resp, + } + payload, err := datagram.MarshalBinary() + if err != nil { + panic(err) + } + return payload +} + +func newSessionPayloadDatagram(id v3.RequestID, payload []byte) []byte { + datagram := make([]byte, len(payload)+17) + err := v3.MarshalPayloadHeaderTo(id, datagram[:]) + if err != nil { + panic(err) + } + copy(datagram[17:], payload) + return datagram +} + +type mockQuicConn struct { + ctx context.Context + send chan []byte + recv chan []byte +} + +func newMockQuicConn() *mockQuicConn { + return &mockQuicConn{ + ctx: context.Background(), + send: make(chan []byte, 1), + recv: make(chan []byte, 1), + } +} + +func (m *mockQuicConn) Context() context.Context { + return m.ctx +} + +func (m *mockQuicConn) SendDatagram(payload []byte) error { + b := make([]byte, len(payload)) + copy(b, payload) + m.recv <- b + return nil +} + +func (m *mockQuicConn) ReceiveDatagram(_ context.Context) ([]byte, error) { + return <-m.send, nil +} + +type mockQuicConnReadError struct { + err error +} + +func (m *mockQuicConnReadError) Context() context.Context { + return context.Background() +} + +func (m *mockQuicConnReadError) SendDatagram(payload []byte) error { + return nil +} + +func (m *mockQuicConnReadError) ReceiveDatagram(_ context.Context) ([]byte, error) { + return nil, m.err +} + +type mockSessionManager struct { + session v3.Session + + expectedRegErr error + expectedGetErr error +} + +func (m *mockSessionManager) RegisterSession(request *v3.UDPSessionRegistrationDatagram, conn v3.DatagramWriter) (v3.Session, error) { + return m.session, m.expectedRegErr +} + +func (m *mockSessionManager) GetSession(requestID v3.RequestID) (v3.Session, error) { + return m.session, m.expectedGetErr +} + +func (m *mockSessionManager) UnregisterSession(requestID v3.RequestID) {} + +type mockSession struct { + served chan struct{} + recv chan []byte +} + +func newMockSession() mockSession { + return mockSession{ + served: make(chan struct{}), + recv: make(chan []byte, 1), + } +} + +func (m *mockSession) ID() v3.RequestID { + return testRequestID +} + +func (m *mockSession) Serve(ctx context.Context) error { + close(m.served) + return v3.SessionCloseErr +} + +func (m *mockSession) Write(payload []byte) (n int, err error) { + b := make([]byte, len(payload)) + copy(b, payload) + m.recv <- b + return len(b), nil +} + +func (m *mockSession) Close() error { + return nil +}