package quic import ( "fmt" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" ) type datagramQueue struct { sendQueue chan *wire.DatagramFrame rcvQueue chan []byte closeErr error closed chan struct{} hasData func() dequeued chan error logger utils.Logger } func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { return &datagramQueue{ hasData: hasData, sendQueue: make(chan *wire.DatagramFrame, 1), rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), dequeued: make(chan error), closed: make(chan struct{}), logger: logger, } } // AddAndWait queues a new DATAGRAM frame for sending. // It blocks until the frame has been dequeued. func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { select { case h.sendQueue <- f: h.hasData() case <-h.closed: return h.closeErr } select { case err := <-h.dequeued: return err case <-h.closed: return h.closeErr } } // Get dequeues a DATAGRAM frame for sending. func (h *datagramQueue) Get(maxDatagramSize protocol.ByteCount, version protocol.VersionNumber) *wire.DatagramFrame { select { case f := <-h.sendQueue: datagramSize := f.Length(version) if datagramSize > maxDatagramSize { h.dequeued <- fmt.Errorf("datagram size %d exceed current limit of %d", datagramSize, maxDatagramSize) return nil } h.dequeued <- nil return f default: return nil } } // HandleDatagramFrame handles a received DATAGRAM frame. func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) { data := make([]byte, len(f.Data)) copy(data, f.Data) select { case h.rcvQueue <- data: default: h.logger.Debugf("Discarding DATAGRAM frame (%d bytes payload)", len(f.Data)) } } // Receive gets a received DATAGRAM frame. func (h *datagramQueue) Receive() ([]byte, error) { select { case data := <-h.rcvQueue: return data, nil case <-h.closed: return nil, h.closeErr } } func (h *datagramQueue) CloseWithError(e error) { h.closeErr = e close(h.closed) }