package h2mux

import (
	"bytes"
	"io"
	"sync"
)

type ReadWriteLengther interface {
	io.ReadWriter
	Reset()
	Len() int
}

type ReadWriteClosedCloser interface {
	io.ReadWriteCloser
	Closed() bool
}
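
// Note (illustrative addition, not in the original source): *bytes.Buffer
// provides Read, Write, Reset, and Len, so it satisfies ReadWriteLengther.
// This compile-time assertion documents that relationship.
var _ ReadWriteLengther = (*bytes.Buffer)(nil)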

// MuxedStream is logically an HTTP/2 stream, with an additional buffer for outgoing data.
type MuxedStream struct {
	streamID uint32

	// The "Receive" end of the stream
	readBufferLock sync.RWMutex
	readBuffer     ReadWriteClosedCloser
	// The number of bytes in our receive window
	// (how much data we can receive into this stream).
	receiveWindow uint32
	// Current receive window size limit; it is grown exponentially as it is exhausted.
	receiveWindowCurrentMax uint32
	// Hard limit set in the HTTP/2 spec: 2^31-1.
	receiveWindowMax uint32
	// The desired size increment for receiveWindow.
	// If this is nonzero, a WINDOW_UPDATE frame needs to be sent.
	windowUpdate uint32
	// The headers that were most recently received.
	// Particularly:
	// * for an eyeball-initiated stream (as passed to TunnelHandler::ServeStream),
	//   these are the request headers
	// * for a cloudflared-initiated stream (as created by Register/UnregisterTunnel),
	//   these are the response headers.
	// They are useful in both of these contexts; hence `Headers` is public.
	Headers []Header
	// For use in the context of a cloudflared-initiated stream.
	responseHeadersReceived chan struct{}

	// The "Send" end of the stream
	writeLock   sync.Mutex
	writeBuffer ReadWriteLengther
	// The maximum capacity that the send buffer should grow to.
	writeBufferMaxLen int
	// A channel to be notified when the send buffer is not full.
	writeBufferHasSpace chan struct{}
	// The number of bytes in the peer's receive window
	// (how much data we can send from this stream).
	sendWindow uint32
	// Reference to the muxer's readyList; signal this for stream data to be sent.
	readyList *ReadyList
	// The headers that should be sent, and a flag so we only send them once.
	headersSent  bool
	writeHeaders []Header

	// EOF-related fields
	// true if the write end of this stream has been closed
	writeEOF bool
	// true if we have sent EOF to the peer
	sentEOF bool
	// true if the peer sent us an EOF
	receivedEOF bool

	// Compression-related fields
	receivedUseDict bool
	method          string
	contentType     string
	path            string
	dictionaries    h2Dictionaries
}
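
// Usage sketch (illustrative, not in the original source): a MuxedStream
// satisfies io.ReadWriteCloser, so a handler can treat it as a bidirectional
// socket. The stream is assumed to have been handed to us by the muxer
// (e.g. via a ServeStream callback); echoStream is a hypothetical helper.
func echoStream(stream *MuxedStream) error {
	// stream.Headers holds the most recently received headers
	// (the request headers for an eyeball-initiated stream).
	_ = stream.Headers

	// Echo everything we read back to the peer.
	if _, err := io.Copy(stream, stream); err != nil {
		return err
	}
	// Close() shuts the write side before the read side (see TUN-115 below).
	return stream.Close()
}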

func (s *MuxedStream) Read(p []byte) (n int, err error) {
	var readBuffer ReadWriteClosedCloser
	if s.dictionaries.read != nil {
		s.readBufferLock.RLock()
		readBuffer = s.readBuffer
		s.readBufferLock.RUnlock()
	} else {
		readBuffer = s.readBuffer
	}
	n, err = readBuffer.Read(p)
	s.replenishReceiveWindow(uint32(n))
	return
}

// Blocks until len(p) bytes have been written to the buffer.
func (s *MuxedStream) Write(p []byte) (int, error) {
	// If assignDictToStream returns success, then it will have acquired the
	// writeLock. Otherwise we must acquire it ourselves.
	ok := assignDictToStream(s, p)
	if !ok {
		s.writeLock.Lock()
	}
	defer s.writeLock.Unlock()

	if s.writeEOF {
		return 0, io.EOF
	}
	totalWritten := 0
	for totalWritten < len(p) {
		// If the buffer is full, block until there is more room.
		// Use a loop to recheck the buffer size after the lock is reacquired.
		for s.writeBufferMaxLen <= s.writeBuffer.Len() {
			s.writeLock.Unlock()
			<-s.writeBufferHasSpace
			s.writeLock.Lock()
		}
		amountToWrite := len(p) - totalWritten
		spaceAvailable := s.writeBufferMaxLen - s.writeBuffer.Len()
		if spaceAvailable < amountToWrite {
			amountToWrite = spaceAvailable
		}
		amountWritten, err := s.writeBuffer.Write(p[totalWritten : totalWritten+amountToWrite])
		totalWritten += amountWritten
		if err != nil {
			return totalWritten, err
		}
		s.writeNotify()
	}
	return totalWritten, nil
}
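
// Illustrative note (not in the original source): the blocking in Write pairs
// with a non-blocking signal in getChunk below. For example, with a
// writeBufferMaxLen of 4 KiB, a 10 KiB Write buffers the first 4 KiB, parks
// on writeBufferHasSpace until getChunk drains the buffer below the cap, and
// repeats until all 10 KiB have been accepted.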

func (s *MuxedStream) Close() error {
	// TUN-115: Close the write buffer before the read buffer.
	// In the case of shutdown, read will not get new data, but the write buffer can still receive
	// new data. Closing read before write would let the application race between a failed read and
	// a successful write, even though this close should appear to be atomic.
	// That race can't happen the other way around because reads may succeed after a failed write;
	// if we read past EOF the application will block until we close the buffer.
	err := s.CloseWrite()
	if err != nil {
		if s.CloseRead() == nil {
			// don't bother the caller with errors if at least one close succeeded
			return nil
		}
		return err
	}
	return s.CloseRead()
}

func (s *MuxedStream) CloseRead() error {
	return s.readBuffer.Close()
}

func (s *MuxedStream) CloseWrite() error {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	if s.writeEOF {
		return io.EOF
	}
	s.writeEOF = true
	if c, ok := s.writeBuffer.(io.Closer); ok {
		c.Close()
	}
	s.writeNotify()
	return nil
}

func (s *MuxedStream) WriteHeaders(headers []Header) error {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	if s.writeHeaders != nil {
		return ErrStreamHeadersSent
	}

	if s.dictionaries.write != nil {
		dictWriter := s.dictionaries.write.getDictWriter(s, headers)
		if dictWriter != nil {
			s.writeBuffer = dictWriter
		}
	}

	s.writeHeaders = headers
	s.headersSent = false
	s.writeNotify()
	return nil
}

func (s *MuxedStream) getReceiveWindow() uint32 {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	return s.receiveWindow
}

func (s *MuxedStream) getSendWindow() uint32 {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	return s.sendWindow
}

// writeNotify must be called while holding writeLock.
func (s *MuxedStream) writeNotify() {
	s.readyList.Signal(s.streamID)
}

// Called by the muxreader when it gets a WindowUpdateFrame. This is an update of the peer's
// receive window (how much data we can send).
func (s *MuxedStream) replenishSendWindow(bytes uint32) {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	s.sendWindow += bytes
	s.writeNotify()
}

// Called by the muxreader when it receives a data frame.
func (s *MuxedStream) consumeReceiveWindow(bytes uint32) bool {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	// The received data is larger than the receive window/buffer.
	if s.receiveWindow < bytes {
		return false
	}
	s.receiveWindow -= bytes
	if s.receiveWindow < s.receiveWindowCurrentMax/2 && s.receiveWindowCurrentMax < s.receiveWindowMax {
		// The client is exhausting its send window (how much data it can send)
		// and there is room to grow the receive window, so double it,
		// capped at the hard limit.
		newMax := s.receiveWindowCurrentMax << 1
		if newMax > s.receiveWindowMax {
			newMax = s.receiveWindowMax
		}
		s.windowUpdate += newMax - s.receiveWindowCurrentMax
		s.receiveWindowCurrentMax = newMax
		// notify MuxWriter to write a WINDOW_UPDATE frame
		s.writeNotify()
	}
	return true
}
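
// Worked example (illustrative, not in the original source): suppose
// receiveWindowCurrentMax is 64 KiB and receiveWindow drops to 20 KiB after
// consuming a data frame. Since 20 KiB is below half the current max
// (32 KiB) and the hard limit has not been reached, the current max doubles
// to 128 KiB, windowUpdate grows by the 64 KiB delta, and writeNotify wakes
// the MuxWriter to send the corresponding WINDOW_UPDATE frame.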

// Arranges for the MuxWriter to send a WINDOW_UPDATE.
// Called by MuxedStream::Read when data has left the read buffer.
func (s *MuxedStream) replenishReceiveWindow(bytes uint32) {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	s.windowUpdate += bytes
	s.writeNotify()
}

// receiveEOF should be called when the peer indicates no more data will be sent.
// Returns true if the stream is now fully closed (i.e. the write side was already
// closed and its buffer has drained).
func (s *MuxedStream) receiveEOF() (closed bool) {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	s.receivedEOF = true
	s.CloseRead()
	return s.writeEOF && s.writeBuffer.Len() == 0
}

func (s *MuxedStream) gotReceiveEOF() bool {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()
	return s.receivedEOF
}

// MuxedStreamReader implements io.ReadCloser for the read end of the stream.
// This is useful for passing to functions that close the object once they are done reading,
// when you still want to be able to write data afterwards (e.g. http.Client).
type MuxedStreamReader struct {
	*MuxedStream
}

func (s MuxedStreamReader) Read(p []byte) (n int, err error) {
	return s.MuxedStream.Read(p)
}

func (s MuxedStreamReader) Close() error {
	return s.MuxedStream.CloseRead()
}
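
// Sketch (illustrative, not in the original source): asReadCloser is a
// hypothetical helper showing the intended use of MuxedStreamReader. The
// returned io.ReadCloser can be handed to code that closes its reader when
// done (such as an http.Response body), closing only the read side and
// leaving the write side of the stream usable.
func asReadCloser(s *MuxedStream) io.ReadCloser {
	return MuxedStreamReader{MuxedStream: s}
}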

// streamChunk represents a chunk of data to be written.
type streamChunk struct {
	streamID uint32
	// true if a HEADERS frame should be sent
	sendHeaders bool
	headers     []Header
	// nonzero if a WINDOW_UPDATE frame should be sent;
	// in that case, it is the increment value to use
	windowUpdate uint32
	// true if data frames should be sent
	sendData bool
	eof      bool
	buffer   bytes.Buffer
}

// getChunk atomically extracts a chunk of data to be written by MuxWriter.
// The data returned will not exceed the send window for this stream.
func (s *MuxedStream) getChunk() *streamChunk {
	s.writeLock.Lock()
	defer s.writeLock.Unlock()

	chunk := &streamChunk{
		streamID:     s.streamID,
		sendHeaders:  !s.headersSent,
		headers:      s.writeHeaders,
		windowUpdate: s.windowUpdate,
		sendData:     !s.sentEOF,
		eof:          s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow,
	}

	// Copy at most s.sendWindow bytes, adjust the sendWindow accordingly
	writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow))
	s.sendWindow -= uint32(writeLen)

	// Non-blocking channel send. This will allow MuxedStream::Write() to continue, if needed
	if s.writeBuffer.Len() < s.writeBufferMaxLen {
		select {
		case s.writeBufferHasSpace <- struct{}{}:
		default:
		}
	}

	// When we write the chunk, we'll write the WINDOW_UPDATE frame if needed
	s.receiveWindow += s.windowUpdate
	s.windowUpdate = 0

	// When we write the chunk, we'll write the headers if needed
	s.headersSent = true

	// if this chunk contains the end of the stream, close the stream now
	if chunk.sendData && chunk.eof {
		s.sentEOF = true
	}

	return chunk
}

func (c *streamChunk) sendHeadersFrame() bool {
	return c.sendHeaders
}

func (c *streamChunk) sendWindowUpdateFrame() bool {
	return c.windowUpdate > 0
}

func (c *streamChunk) sendDataFrame() bool {
	return c.sendData
}

func (c *streamChunk) nextDataFrame(frameSize int) (payload []byte, endStream bool) {
	payload = c.buffer.Next(frameSize)
	if c.buffer.Len() == 0 {
		// this is the last data frame in this chunk
		c.sendData = false
		if c.eof {
			endStream = true
		}
	}
	return
}
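
// Writer-side sketch (illustrative, not in the original source): how a writer
// loop might drain a chunk produced by getChunk. The frame-writing steps are
// placeholders for what MuxWriter actually does with its HTTP/2 framer.
func drainChunk(c *streamChunk, maxFrameSize int) {
	if c.sendHeadersFrame() {
		// write a HEADERS frame carrying c.headers
	}
	if c.sendWindowUpdateFrame() {
		// write a WINDOW_UPDATE frame with increment c.windowUpdate
	}
	for c.sendDataFrame() {
		payload, endStream := c.nextDataFrame(maxFrameSize)
		// write a DATA frame with payload, setting END_STREAM if endStream
		_, _ = payload, endStream
	}
}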