2024-10-31 21:05:15 +00:00
|
|
|
package v3_test
|
|
|
|
|
2024-11-04 19:20:35 +00:00
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"net"
|
|
|
|
"net/netip"
|
|
|
|
"slices"
|
|
|
|
"sync"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
|
|
|
|
"github.com/cloudflare/cloudflared/ingress"
|
|
|
|
v3 "github.com/cloudflare/cloudflared/quic/v3"
|
|
|
|
)
|
2024-10-31 21:05:15 +00:00
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
type noopEyeball struct {
|
|
|
|
connID uint8
|
2024-10-31 21:05:15 +00:00
|
|
|
}
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
func (noopEyeball) Serve(ctx context.Context) error { return nil }
|
|
|
|
func (n noopEyeball) ID() uint8 { return n.connID }
|
|
|
|
func (noopEyeball) SendUDPSessionDatagram(datagram []byte) error { return nil }
|
2024-10-31 21:05:15 +00:00
|
|
|
func (noopEyeball) SendUDPSessionResponse(id v3.RequestID, resp v3.SessionRegistrationResp) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type mockEyeball struct {
|
2024-11-06 20:06:07 +00:00
|
|
|
connID uint8
|
2024-10-31 21:05:15 +00:00
|
|
|
// datagram sent via SendUDPSessionDatagram
|
|
|
|
recvData chan []byte
|
|
|
|
// responses sent via SendUDPSessionResponse
|
|
|
|
recvResp chan struct {
|
|
|
|
id v3.RequestID
|
|
|
|
resp v3.SessionRegistrationResp
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newMockEyeball() mockEyeball {
|
|
|
|
return mockEyeball{
|
2024-11-06 20:06:07 +00:00
|
|
|
connID: 0,
|
2024-10-31 21:05:15 +00:00
|
|
|
recvData: make(chan []byte, 1),
|
|
|
|
recvResp: make(chan struct {
|
|
|
|
id v3.RequestID
|
|
|
|
resp v3.SessionRegistrationResp
|
|
|
|
}, 1),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
func (mockEyeball) Serve(ctx context.Context) error { return nil }
|
|
|
|
func (m *mockEyeball) ID() uint8 { return m.connID }
|
|
|
|
|
2024-10-31 21:05:15 +00:00
|
|
|
func (m *mockEyeball) SendUDPSessionDatagram(datagram []byte) error {
|
|
|
|
b := make([]byte, len(datagram))
|
|
|
|
copy(b, datagram)
|
|
|
|
m.recvData <- b
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockEyeball) SendUDPSessionResponse(id v3.RequestID, resp v3.SessionRegistrationResp) error {
|
|
|
|
m.recvResp <- struct {
|
|
|
|
id v3.RequestID
|
|
|
|
resp v3.SessionRegistrationResp
|
|
|
|
}{
|
|
|
|
id, resp,
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
func TestDatagramConn_New(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(newMockQuicConn(), v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
if conn == nil {
|
|
|
|
t.Fatal("expected valid connection")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConn_SendUDPSessionDatagram(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
payload := []byte{0xef, 0xef}
|
|
|
|
conn.SendUDPSessionDatagram(payload)
|
|
|
|
p := <-quic.recv
|
|
|
|
if !slices.Equal(p, payload) {
|
|
|
|
t.Fatal("datagram sent does not match datagram received on quic side")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConn_SendUDPSessionResponse(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
conn.SendUDPSessionResponse(testRequestID, v3.ResponseDestinationUnreachable)
|
|
|
|
resp := <-quic.recv
|
|
|
|
var response v3.UDPSessionRegistrationResponseDatagram
|
|
|
|
err := response.UnmarshalBinary(resp)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
expected := v3.UDPSessionRegistrationResponseDatagram{
|
|
|
|
RequestID: testRequestID,
|
|
|
|
ResponseType: v3.ResponseDestinationUnreachable,
|
|
|
|
}
|
|
|
|
if response != expected {
|
|
|
|
t.Fatal("datagram response sent does not match expected datagram response received")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_ApplicationClosed(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
|
|
defer cancel()
|
|
|
|
err := conn.Serve(ctx)
|
|
|
|
if !errors.Is(err, context.DeadlineExceeded) {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_ConnectionClosed(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
|
|
defer cancel()
|
|
|
|
quic.ctx = ctx
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
err := conn.Serve(context.Background())
|
|
|
|
if !errors.Is(err, context.DeadlineExceeded) {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_ReceiveDatagramError(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := &mockQuicConnReadError{err: net.ErrClosed}
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort), 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
err := conn.Serve(context.Background())
|
|
|
|
if !errors.Is(err, net.ErrClosed) {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_ErrorDatagramTypes(t *testing.T) {
|
|
|
|
for _, test := range []struct {
|
|
|
|
name string
|
|
|
|
input []byte
|
|
|
|
expected string
|
|
|
|
}{
|
|
|
|
{
|
|
|
|
"empty",
|
|
|
|
[]byte{},
|
|
|
|
"{\"level\":\"error\",\"datagramVersion\":3,\"error\":\"datagram should have at least 1 byte\",\"message\":\"unable to parse datagram type: 0\"}\n",
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"unexpected",
|
|
|
|
[]byte{byte(v3.UDPSessionRegistrationResponseType)},
|
|
|
|
"{\"level\":\"error\",\"datagramVersion\":3,\"message\":\"unexpected datagram type received: 3\"}\n",
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"unknown",
|
|
|
|
[]byte{99},
|
|
|
|
"{\"level\":\"error\",\"datagramVersion\":3,\"message\":\"unknown datagram type received: 99\"}\n",
|
|
|
|
},
|
|
|
|
} {
|
|
|
|
t.Run(test.name, func(t *testing.T) {
|
|
|
|
logOutput := new(LockedBuffer)
|
|
|
|
log := zerolog.New(logOutput)
|
|
|
|
quic := newMockQuicConn()
|
|
|
|
quic.send <- test.input
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, &mockSessionManager{}, 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
|
|
defer cancel()
|
|
|
|
err := conn.Serve(ctx)
|
|
|
|
// we cancel the Serve method to check to see if the log output was written since the unsupported datagram
|
|
|
|
// is dropped with only a log message as a side-effect.
|
|
|
|
if !errors.Is(err, context.DeadlineExceeded) {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
out := logOutput.String()
|
|
|
|
if out != test.expected {
|
|
|
|
t.Fatalf("incorrect log output expected: %s", out)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type LockedBuffer struct {
|
|
|
|
bytes.Buffer
|
|
|
|
l sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *LockedBuffer) Write(p []byte) (n int, err error) {
|
|
|
|
b.l.Lock()
|
|
|
|
defer b.l.Unlock()
|
|
|
|
return b.Buffer.Write(p)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *LockedBuffer) String() string {
|
|
|
|
b.l.Lock()
|
|
|
|
defer b.l.Unlock()
|
|
|
|
return b.Buffer.String()
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_RegisterSession_SessionManagerError(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
|
|
|
expectedErr := errors.New("unable to register session")
|
|
|
|
sessionManager := mockSessionManager{expectedRegErr: expectedErr}
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
// Setup the muxer
|
|
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
|
|
defer cancel(errors.New("other error"))
|
|
|
|
done := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done <- conn.Serve(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Send new session registration
|
|
|
|
datagram := newRegisterSessionDatagram(testRequestID)
|
|
|
|
quic.send <- datagram
|
|
|
|
|
|
|
|
// Wait for session registration response with failure
|
|
|
|
datagram = <-quic.recv
|
|
|
|
var resp v3.UDPSessionRegistrationResponseDatagram
|
|
|
|
err := resp.UnmarshalBinary(datagram)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
if resp.RequestID != testRequestID || resp.ResponseType != v3.ResponseUnableToBindSocket {
|
2024-11-04 19:20:35 +00:00
|
|
|
t.Fatalf("expected registration response failure")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cancel the muxer Serve context and make sure it closes with the expected error
|
2024-11-06 20:06:07 +00:00
|
|
|
assertContextClosed(t, ctx, done, cancel)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
|
|
|
session := newMockSession()
|
|
|
|
sessionManager := mockSessionManager{session: &session}
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
|
2024-11-06 20:06:07 +00:00
|
|
|
|
|
|
|
// Setup the muxer
|
|
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
|
|
defer cancel(errors.New("other error"))
|
|
|
|
done := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done <- conn.Serve(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Send new session registration
|
|
|
|
datagram := newRegisterSessionDatagram(testRequestID)
|
|
|
|
quic.send <- datagram
|
|
|
|
|
|
|
|
// Wait for session registration response with success
|
|
|
|
datagram = <-quic.recv
|
|
|
|
var resp v3.UDPSessionRegistrationResponseDatagram
|
|
|
|
err := resp.UnmarshalBinary(datagram)
|
|
|
|
if err != nil {
|
2024-11-04 19:20:35 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
2024-11-06 20:06:07 +00:00
|
|
|
|
|
|
|
if resp.RequestID != testRequestID || resp.ResponseType != v3.ResponseOk {
|
|
|
|
t.Fatalf("expected registration response ok")
|
|
|
|
}
|
|
|
|
|
|
|
|
// We expect the session to be served
|
|
|
|
timer := time.NewTimer(15 * time.Second)
|
|
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
|
|
case <-session.served:
|
|
|
|
break
|
|
|
|
case <-timer.C:
|
|
|
|
t.Fatalf("expected session serve to be called")
|
2024-11-04 19:20:35 +00:00
|
|
|
}
|
2024-11-06 20:06:07 +00:00
|
|
|
|
|
|
|
// Cancel the muxer Serve context and make sure it closes with the expected error
|
|
|
|
assertContextClosed(t, ctx, done, cancel)
|
2024-11-04 19:20:35 +00:00
|
|
|
}
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
func TestDatagramConnServe_RegisterTwice(t *testing.T) {
|
2024-11-04 19:20:35 +00:00
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
|
|
|
session := newMockSession()
|
|
|
|
sessionManager := mockSessionManager{session: &session}
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
// Setup the muxer
|
|
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
|
|
defer cancel(errors.New("other error"))
|
|
|
|
done := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done <- conn.Serve(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Send new session registration
|
|
|
|
datagram := newRegisterSessionDatagram(testRequestID)
|
|
|
|
quic.send <- datagram
|
|
|
|
|
|
|
|
// Wait for session registration response with success
|
|
|
|
datagram = <-quic.recv
|
|
|
|
var resp v3.UDPSessionRegistrationResponseDatagram
|
|
|
|
err := resp.UnmarshalBinary(datagram)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
if resp.RequestID != testRequestID || resp.ResponseType != v3.ResponseOk {
|
|
|
|
t.Fatalf("expected registration response ok")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Set the session manager to return already registered
|
|
|
|
sessionManager.expectedRegErr = v3.ErrSessionAlreadyRegistered
|
|
|
|
// Send the registration again as if we didn't receive it at the edge
|
|
|
|
datagram = newRegisterSessionDatagram(testRequestID)
|
|
|
|
quic.send <- datagram
|
|
|
|
|
|
|
|
// Wait for session registration response with success
|
|
|
|
datagram = <-quic.recv
|
|
|
|
err = resp.UnmarshalBinary(datagram)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.RequestID != testRequestID || resp.ResponseType != v3.ResponseOk {
|
2024-11-04 19:20:35 +00:00
|
|
|
t.Fatalf("expected registration response ok")
|
|
|
|
}
|
|
|
|
|
|
|
|
// We expect the session to be served
|
|
|
|
timer := time.NewTimer(15 * time.Second)
|
|
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
|
|
case <-session.served:
|
|
|
|
break
|
|
|
|
case <-timer.C:
|
|
|
|
t.Fatalf("expected session serve to be called")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cancel the muxer Serve context and make sure it closes with the expected error
|
2024-11-06 20:06:07 +00:00
|
|
|
assertContextClosed(t, ctx, done, cancel)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_MigrateConnection(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
|
|
|
session := newMockSession()
|
|
|
|
sessionManager := mockSessionManager{session: &session}
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
|
2024-11-06 20:06:07 +00:00
|
|
|
quic2 := newMockQuicConn()
|
2024-11-07 19:02:55 +00:00
|
|
|
conn2 := v3.NewDatagramConn(quic2, &sessionManager, 1, &noopMetrics{}, &log)
|
2024-11-06 20:06:07 +00:00
|
|
|
|
|
|
|
// Setup the muxer
|
|
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
|
|
defer cancel(errors.New("other error"))
|
|
|
|
done := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done <- conn.Serve(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
ctx2, cancel2 := context.WithCancelCause(context.Background())
|
|
|
|
defer cancel2(errors.New("other error"))
|
|
|
|
done2 := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done2 <- conn2.Serve(ctx2)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Send new session registration
|
|
|
|
datagram := newRegisterSessionDatagram(testRequestID)
|
|
|
|
quic.send <- datagram
|
|
|
|
|
|
|
|
// Wait for session registration response with success
|
|
|
|
datagram = <-quic.recv
|
|
|
|
var resp v3.UDPSessionRegistrationResponseDatagram
|
|
|
|
err := resp.UnmarshalBinary(datagram)
|
|
|
|
if err != nil {
|
2024-11-04 19:20:35 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
2024-11-06 20:06:07 +00:00
|
|
|
|
|
|
|
if resp.RequestID != testRequestID || resp.ResponseType != v3.ResponseOk {
|
|
|
|
t.Fatalf("expected registration response ok")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Set the session manager to return already registered to another connection
|
|
|
|
sessionManager.expectedRegErr = v3.ErrSessionBoundToOtherConn
|
|
|
|
// Send the registration again as if we didn't receive it at the edge for a new connection
|
|
|
|
datagram = newRegisterSessionDatagram(testRequestID)
|
|
|
|
quic2.send <- datagram
|
|
|
|
|
|
|
|
// Wait for session registration response with success
|
|
|
|
datagram = <-quic2.recv
|
|
|
|
err = resp.UnmarshalBinary(datagram)
|
|
|
|
if err != nil {
|
2024-11-04 19:20:35 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
2024-11-06 20:06:07 +00:00
|
|
|
|
|
|
|
if resp.RequestID != testRequestID || resp.ResponseType != v3.ResponseOk {
|
|
|
|
t.Fatalf("expected registration response ok")
|
|
|
|
}
|
|
|
|
|
|
|
|
// We expect the session to be served
|
|
|
|
timer := time.NewTimer(15 * time.Second)
|
|
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
|
|
case <-session.served:
|
|
|
|
break
|
|
|
|
case <-timer.C:
|
|
|
|
t.Fatalf("expected session serve to be called")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Expect session to be migrated
|
|
|
|
select {
|
|
|
|
case id := <-session.migrated:
|
|
|
|
if id != conn2.ID() {
|
|
|
|
t.Fatalf("expected session to be migrated to connection 2")
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
|
|
|
t.Fatalf("expected session migration to be called")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cancel the muxer Serve context and make sure it closes with the expected error
|
|
|
|
assertContextClosed(t, ctx, done, cancel)
|
|
|
|
// Cancel the second muxer Serve context and make sure it closes with the expected error
|
|
|
|
assertContextClosed(t, ctx2, done2, cancel2)
|
2024-11-04 19:20:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_Payload_GetSessionError(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
2024-11-06 20:06:07 +00:00
|
|
|
// mockSessionManager will return the ErrSessionNotFound for any session attempting to be queried by the muxer
|
2024-11-04 19:20:35 +00:00
|
|
|
sessionManager := mockSessionManager{session: nil, expectedGetErr: v3.ErrSessionNotFound}
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
// Setup the muxer
|
|
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
|
|
defer cancel(errors.New("other error"))
|
|
|
|
done := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done <- conn.Serve(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Send new session registration
|
|
|
|
datagram := newSessionPayloadDatagram(testRequestID, []byte{0xef, 0xef})
|
|
|
|
quic.send <- datagram
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
// Since the muxer should eventually discard a failed registration request, there is no side-effect
|
|
|
|
// that the registration was failed beyond the muxer accepting the registration request. As such, the
|
|
|
|
// test can only ensure that the quic.send channel was consumed and that the muxer closes normally
|
|
|
|
// afterwards with the expected context cancelled trigger.
|
|
|
|
|
2024-11-04 19:20:35 +00:00
|
|
|
// Cancel the muxer Serve context and make sure it closes with the expected error
|
2024-11-06 20:06:07 +00:00
|
|
|
assertContextClosed(t, ctx, done, cancel)
|
2024-11-04 19:20:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func TestDatagramConnServe_Payload(t *testing.T) {
|
|
|
|
log := zerolog.Nop()
|
|
|
|
quic := newMockQuicConn()
|
|
|
|
session := newMockSession()
|
|
|
|
sessionManager := mockSessionManager{session: &session}
|
2024-11-07 19:02:55 +00:00
|
|
|
conn := v3.NewDatagramConn(quic, &sessionManager, 0, &noopMetrics{}, &log)
|
2024-11-04 19:20:35 +00:00
|
|
|
|
|
|
|
// Setup the muxer
|
|
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
|
|
defer cancel(errors.New("other error"))
|
|
|
|
done := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done <- conn.Serve(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Send new session registration
|
|
|
|
expectedPayload := []byte{0xef, 0xef}
|
|
|
|
datagram := newSessionPayloadDatagram(testRequestID, expectedPayload)
|
|
|
|
quic.send <- datagram
|
|
|
|
|
|
|
|
// Session should receive the payload
|
|
|
|
payload := <-session.recv
|
|
|
|
if !slices.Equal(expectedPayload, payload) {
|
|
|
|
t.Fatalf("expected session receieve the payload sent via the muxer")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cancel the muxer Serve context and make sure it closes with the expected error
|
2024-11-06 20:06:07 +00:00
|
|
|
assertContextClosed(t, ctx, done, cancel)
|
2024-11-04 19:20:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func newRegisterSessionDatagram(id v3.RequestID) []byte {
|
|
|
|
datagram := v3.UDPSessionRegistrationDatagram{
|
|
|
|
RequestID: id,
|
|
|
|
Dest: netip.MustParseAddrPort("127.0.0.1:8080"),
|
|
|
|
IdleDurationHint: 5 * time.Second,
|
|
|
|
}
|
|
|
|
payload, err := datagram.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
return payload
|
|
|
|
}
|
|
|
|
|
|
|
|
func newRegisterResponseSessionDatagram(id v3.RequestID, resp v3.SessionRegistrationResp) []byte {
|
|
|
|
datagram := v3.UDPSessionRegistrationResponseDatagram{
|
|
|
|
RequestID: id,
|
|
|
|
ResponseType: resp,
|
|
|
|
}
|
|
|
|
payload, err := datagram.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
return payload
|
|
|
|
}
|
|
|
|
|
|
|
|
func newSessionPayloadDatagram(id v3.RequestID, payload []byte) []byte {
|
|
|
|
datagram := make([]byte, len(payload)+17)
|
|
|
|
err := v3.MarshalPayloadHeaderTo(id, datagram[:])
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
copy(datagram[17:], payload)
|
|
|
|
return datagram
|
|
|
|
}
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
// Cancel the provided context and make sure it closes with the expected cancellation error
|
|
|
|
func assertContextClosed(t *testing.T, ctx context.Context, done <-chan error, cancel context.CancelCauseFunc) {
|
|
|
|
cancel(expectedContextCanceled)
|
|
|
|
err := <-done
|
|
|
|
if !errors.Is(err, context.Canceled) {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
if !errors.Is(context.Cause(ctx), expectedContextCanceled) {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-11-04 19:20:35 +00:00
|
|
|
type mockQuicConn struct {
|
|
|
|
ctx context.Context
|
|
|
|
send chan []byte
|
|
|
|
recv chan []byte
|
|
|
|
}
|
|
|
|
|
|
|
|
func newMockQuicConn() *mockQuicConn {
|
|
|
|
return &mockQuicConn{
|
|
|
|
ctx: context.Background(),
|
|
|
|
send: make(chan []byte, 1),
|
|
|
|
recv: make(chan []byte, 1),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockQuicConn) Context() context.Context {
|
|
|
|
return m.ctx
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockQuicConn) SendDatagram(payload []byte) error {
|
|
|
|
b := make([]byte, len(payload))
|
|
|
|
copy(b, payload)
|
|
|
|
m.recv <- b
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockQuicConn) ReceiveDatagram(_ context.Context) ([]byte, error) {
|
|
|
|
return <-m.send, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type mockQuicConnReadError struct {
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockQuicConnReadError) Context() context.Context {
|
|
|
|
return context.Background()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockQuicConnReadError) SendDatagram(payload []byte) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockQuicConnReadError) ReceiveDatagram(_ context.Context) ([]byte, error) {
|
|
|
|
return nil, m.err
|
|
|
|
}
|
|
|
|
|
|
|
|
type mockSessionManager struct {
|
|
|
|
session v3.Session
|
|
|
|
|
|
|
|
expectedRegErr error
|
|
|
|
expectedGetErr error
|
|
|
|
}
|
|
|
|
|
2024-11-06 20:06:07 +00:00
|
|
|
func (m *mockSessionManager) RegisterSession(request *v3.UDPSessionRegistrationDatagram, conn v3.DatagramConn) (v3.Session, error) {
|
2024-11-04 19:20:35 +00:00
|
|
|
return m.session, m.expectedRegErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockSessionManager) GetSession(requestID v3.RequestID) (v3.Session, error) {
|
|
|
|
return m.session, m.expectedGetErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockSessionManager) UnregisterSession(requestID v3.RequestID) {}
|
|
|
|
|
|
|
|
type mockSession struct {
|
2024-11-06 20:06:07 +00:00
|
|
|
served chan struct{}
|
|
|
|
migrated chan uint8
|
|
|
|
recv chan []byte
|
2024-11-04 19:20:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func newMockSession() mockSession {
|
|
|
|
return mockSession{
|
2024-11-06 20:06:07 +00:00
|
|
|
served: make(chan struct{}),
|
|
|
|
migrated: make(chan uint8, 2),
|
|
|
|
recv: make(chan []byte, 1),
|
2024-11-04 19:20:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-11-21 20:56:47 +00:00
|
|
|
func (m *mockSession) ID() v3.RequestID { return testRequestID }
|
|
|
|
func (m *mockSession) RemoteAddr() net.Addr { return testOriginAddr }
|
|
|
|
func (m *mockSession) LocalAddr() net.Addr { return testLocalAddr }
|
|
|
|
func (m *mockSession) ConnectionID() uint8 { return 0 }
|
|
|
|
func (m *mockSession) Migrate(conn v3.DatagramConn, ctx context.Context, log *zerolog.Logger) {
|
|
|
|
m.migrated <- conn.ID()
|
|
|
|
}
|
|
|
|
func (m *mockSession) ResetIdleTimer() {}
|
2024-11-06 20:06:07 +00:00
|
|
|
|
2024-11-04 19:20:35 +00:00
|
|
|
func (m *mockSession) Serve(ctx context.Context) error {
|
|
|
|
close(m.served)
|
|
|
|
return v3.SessionCloseErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockSession) Write(payload []byte) (n int, err error) {
|
|
|
|
b := make([]byte, len(payload))
|
|
|
|
copy(b, payload)
|
|
|
|
m.recv <- b
|
|
|
|
return len(b), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockSession) Close() error {
|
|
|
|
return nil
|
|
|
|
}
|