diff --git a/datagramsession/manager.go b/datagramsession/manager.go index aa1b6644..f934c15f 100644 --- a/datagramsession/manager.go +++ b/datagramsession/manager.go @@ -95,7 +95,7 @@ func (m *manager) RegisterSession(ctx context.Context, sessionID uuid.UUID, orig } func (m *manager) registerSession(ctx context.Context, registration *registerSessionEvent) { - session := newSession(registration.sessionID, m.transport, registration.originProxy) + session := newSession(registration.sessionID, m.transport, registration.originProxy, m.log) m.sessions[registration.sessionID] = session registration.resultChan <- session } diff --git a/datagramsession/session.go b/datagramsession/session.go index 59f6c3e2..7e3e0711 100644 --- a/datagramsession/session.go +++ b/datagramsession/session.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "github.com/rs/zerolog" ) const ( @@ -17,7 +18,7 @@ func SessionIdleErr(timeout time.Duration) error { return fmt.Errorf("session idle for %v", timeout) } -// Each Session is a bidirectional pipe of datagrams between transport and dstConn +// Session is a bidirectional pipe of datagrams between transport and dstConn // Currently the only implementation of transport is quic DatagramMuxer // Destination can be a connection with origin or with eyeball // When the destination is origin: @@ -35,9 +36,10 @@ type Session struct { // activeAtChan is used to communicate the last read/write time activeAtChan chan time.Time closeChan chan error + log *zerolog.Logger } -func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser) *Session { +func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser, log *zerolog.Logger) *Session { return &Session{ ID: id, transport: transport, @@ -47,6 +49,7 @@ func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser) * activeAtChan: make(chan time.Time, 2), // capacity is 2 because close() and dstToTransport routine in Serve() can write to this channel closeChan: make(chan error, 2), + log: log, } } @@ -54,7 +57,8 @@ func (s *Session) Serve(ctx context.Context, closeAfterIdle time.Duration) (clos go func() { // QUIC implementation copies data to another buffer before returning https://github.com/lucas-clemente/quic-go/blob/v0.24.0/session.go#L1967-L1975 // This makes it safe to share readBuffer between iterations - readBuffer := make([]byte, s.transport.ReceiveMTU()) + const maxPacketSize = 1500 + readBuffer := make([]byte, maxPacketSize) for { if err := s.dstToTransport(readBuffer); err != nil { s.closeChan <- err @@ -103,8 +107,15 @@ func (s *Session) dstToTransport(buffer []byte) error { n, err := s.dstConn.Read(buffer) s.markActive() if n > 0 { - if err := s.transport.SendTo(s.ID, buffer[:n]); err != nil { - return err + if n <= int(s.transport.MTU()) { + err = s.transport.SendTo(s.ID, buffer[:n]) + } else { + // drop packet for now, eventually reply with ICMP for PMTUD + s.log.Debug(). + Str("session", s.ID.String()). + Int("len", n). + Uint("mtu", s.transport.MTU()). + Msg("dropped packet exceeding MTU") } } return err diff --git a/datagramsession/session_test.go b/datagramsession/session_test.go index 48686136..591fa3c6 100644 --- a/datagramsession/session_test.go +++ b/datagramsession/session_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/uuid" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -44,7 +45,8 @@ func testSessionReturns(t *testing.T, closeBy closeMethod, closeAfterIdle time.D reqChan: newDatagramChannel(1), respChan: newDatagramChannel(1), } - session := newSession(sessionID, transport, cfdConn) + log := zerolog.Nop() + session := newSession(sessionID, transport, cfdConn, &log) ctx, cancel := context.WithCancel(context.Background()) sessionDone := make(chan struct{}) @@ -119,7 +121,8 @@ func testActiveSessionNotClosed(t *testing.T, readFromDst bool, writeToDst bool) reqChan: newDatagramChannel(100), respChan: newDatagramChannel(100), } - session := newSession(sessionID, transport, cfdConn) + log := zerolog.Nop() + session := newSession(sessionID, transport, cfdConn, &log) startTime := time.Now() activeUntil := startTime.Add(activeTime) @@ -181,7 +184,7 @@ func testActiveSessionNotClosed(t *testing.T, readFromDst bool, writeToDst bool) func TestMarkActiveNotBlocking(t *testing.T) { const concurrentCalls = 50 - session := newSession(uuid.New(), nil, nil) + session := newSession(uuid.New(), nil, nil, nil) var wg sync.WaitGroup wg.Add(concurrentCalls) for i := 0; i < concurrentCalls; i++ { diff --git a/datagramsession/transport.go b/datagramsession/transport.go index aad1b475..262f2347 100644 --- a/datagramsession/transport.go +++ b/datagramsession/transport.go @@ -9,5 +9,5 @@ type transport interface { // ReceiveFrom reads the next datagram from the transport ReceiveFrom() (uuid.UUID, []byte, error) // Max transmission unit to receive from the transport - ReceiveMTU() uint + MTU() uint } diff --git a/datagramsession/transport_test.go b/datagramsession/transport_test.go index 2b18197d..453d29f5 100644 --- a/datagramsession/transport_test.go +++ b/datagramsession/transport_test.go @@ -22,7 +22,7 @@ func (mt *mockQUICTransport) ReceiveFrom() (uuid.UUID, []byte, error) { return mt.reqChan.Receive(context.Background()) } -func (mt *mockQUICTransport) ReceiveMTU() uint { +func (mt *mockQUICTransport) MTU() uint { return 1217 } diff --git a/quic/datagram.go b/quic/datagram.go index ad273913..e1b0f425 100644 --- a/quic/datagram.go +++ b/quic/datagram.go @@ -36,7 +36,7 @@ func NewDatagramMuxer(quicSession quic.Session) (*DatagramMuxer, error) { func (dm *DatagramMuxer) SendTo(sessionID uuid.UUID, payload []byte) error { if len(payload) > MaxDatagramFrameSize-sessionIDLen { // TODO: TUN-5302 return ICMP packet too big message - return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(payload), dm.SendMTU()) + return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(payload), dm.MTU()) } msgWithID, err := SuffixSessionID(sessionID, payload) if err != nil { @@ -59,16 +59,11 @@ func (dm *DatagramMuxer) ReceiveFrom() (uuid.UUID, []byte, error) { return ExtractSessionID(msg) } -// Maximum application payload to send through QUIC datagram frame -func (dm *DatagramMuxer) SendMTU() uint { +// Maximum application payload to send to / receive from QUIC datagram frame +func (dm *DatagramMuxer) MTU() uint { return uint(MaxDatagramFrameSize - sessionIDLen) } -// Maximum expected bytes to read from QUIC datagram frame -func (dm *DatagramMuxer) ReceiveMTU() uint { - return MaxDatagramFrameSize -} - // Each QUIC datagram should be suffixed with session ID. // ExtractSessionID extracts the session ID and a slice with only the payload func ExtractSessionID(b []byte) (uuid.UUID, []byte, error) {