TUN-968: Flow control for large requests/responses
This commit is contained in:
parent
9fe21fa906
commit
f6014cb2b4
4
Makefile
4
Makefile
|
@ -23,6 +23,10 @@ endif
|
||||||
.PHONY: all
|
.PHONY: all
|
||||||
all: cloudflared test
|
all: cloudflared test
|
||||||
|
|
||||||
|
.PHONY: clean
|
||||||
|
clean:
|
||||||
|
go clean
|
||||||
|
|
||||||
.PHONY: cloudflared
|
.PHONY: cloudflared
|
||||||
cloudflared:
|
cloudflared:
|
||||||
go build -v $(VERSION_FLAGS) $(IMPORT_PATH)/cmd/cloudflared
|
go build -v $(VERSION_FLAGS) $(IMPORT_PATH)/cmd/cloudflared
|
||||||
|
|
|
@ -15,11 +15,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultFrameSize uint32 = 1 << 14 // Minimum frame size in http2 spec
|
defaultFrameSize uint32 = 1 << 14 // Minimum frame size in http2 spec
|
||||||
defaultWindowSize uint32 = 65535
|
defaultWindowSize uint32 = (1 << 16) - 1 // Minimum window size in http2 spec
|
||||||
maxWindowSize uint32 = (1 << 31) - 1 // 2^31-1 = 2147483647, max window size specified in http2 spec
|
maxWindowSize uint32 = (1 << 31) - 1 // 2^31-1 = 2147483647, max window size in http2 spec
|
||||||
defaultTimeout time.Duration = 5 * time.Second
|
defaultTimeout time.Duration = 5 * time.Second
|
||||||
defaultRetries uint64 = 5
|
defaultRetries uint64 = 5
|
||||||
|
defaultWriteBufferMaxLen int = 1024 * 1024 * 512 // 500mb
|
||||||
|
|
||||||
SettingMuxerMagic http2.SettingID = 0x42db
|
SettingMuxerMagic http2.SettingID = 0x42db
|
||||||
MuxerMagicOrigin uint32 = 0xa2e43c8b
|
MuxerMagicOrigin uint32 = 0xa2e43c8b
|
||||||
|
@ -49,6 +50,12 @@ type MuxerConfig struct {
|
||||||
// Logger to use
|
// Logger to use
|
||||||
Logger *log.Entry
|
Logger *log.Entry
|
||||||
CompressionQuality CompressionSetting
|
CompressionQuality CompressionSetting
|
||||||
|
// Initial size for HTTP2 flow control windows
|
||||||
|
DefaultWindowSize uint32
|
||||||
|
// Largest allowable size for HTTP2 flow control windows
|
||||||
|
MaxWindowSize uint32
|
||||||
|
// Largest allowable capacity for the buffer of data to be sent
|
||||||
|
StreamWriteBufferMaxLen int
|
||||||
}
|
}
|
||||||
|
|
||||||
type Muxer struct {
|
type Muxer struct {
|
||||||
|
@ -98,6 +105,15 @@ func Handshake(
|
||||||
if config.Timeout == 0 {
|
if config.Timeout == 0 {
|
||||||
config.Timeout = defaultTimeout
|
config.Timeout = defaultTimeout
|
||||||
}
|
}
|
||||||
|
if config.DefaultWindowSize == 0 {
|
||||||
|
config.DefaultWindowSize = defaultWindowSize
|
||||||
|
}
|
||||||
|
if config.MaxWindowSize == 0 {
|
||||||
|
config.MaxWindowSize = maxWindowSize
|
||||||
|
}
|
||||||
|
if config.StreamWriteBufferMaxLen == 0 {
|
||||||
|
config.StreamWriteBufferMaxLen = defaultWriteBufferMaxLen
|
||||||
|
}
|
||||||
// Initialise connection state fields
|
// Initialise connection state fields
|
||||||
m := &Muxer{
|
m := &Muxer{
|
||||||
f: http2.NewFramer(w, r), // A framer that writes to w and reads from r
|
f: http2.NewFramer(w, r), // A framer that writes to w and reads from r
|
||||||
|
@ -179,8 +195,9 @@ func Handshake(
|
||||||
abortChan: m.abortChan,
|
abortChan: m.abortChan,
|
||||||
pingTimestamp: pingTimestamp,
|
pingTimestamp: pingTimestamp,
|
||||||
connActive: connActive,
|
connActive: connActive,
|
||||||
initialStreamWindow: defaultWindowSize,
|
initialStreamWindow: m.config.DefaultWindowSize,
|
||||||
streamWindowMax: maxWindowSize,
|
streamWindowMax: m.config.MaxWindowSize,
|
||||||
|
streamWriteBufferMaxLen: m.config.StreamWriteBufferMaxLen,
|
||||||
r: m.r,
|
r: m.r,
|
||||||
updateRTTChan: updateRTTChan,
|
updateRTTChan: updateRTTChan,
|
||||||
updateReceiveWindowChan: updateReceiveWindowChan,
|
updateReceiveWindowChan: updateReceiveWindowChan,
|
||||||
|
@ -375,10 +392,12 @@ func (m *Muxer) OpenStream(headers []Header, body io.Reader) (*MuxedStream, erro
|
||||||
responseHeadersReceived: make(chan struct{}),
|
responseHeadersReceived: make(chan struct{}),
|
||||||
readBuffer: NewSharedBuffer(),
|
readBuffer: NewSharedBuffer(),
|
||||||
writeBuffer: &bytes.Buffer{},
|
writeBuffer: &bytes.Buffer{},
|
||||||
receiveWindow: defaultWindowSize,
|
writeBufferMaxLen: m.config.StreamWriteBufferMaxLen,
|
||||||
receiveWindowCurrentMax: defaultWindowSize, // Initial window size limit. exponentially increase it when receiveWindow is exhausted
|
writeBufferHasSpace: make(chan struct{}, 1),
|
||||||
receiveWindowMax: maxWindowSize,
|
receiveWindow: m.config.DefaultWindowSize,
|
||||||
sendWindow: defaultWindowSize,
|
receiveWindowCurrentMax: m.config.DefaultWindowSize,
|
||||||
|
receiveWindowMax: m.config.MaxWindowSize,
|
||||||
|
sendWindow: m.config.DefaultWindowSize,
|
||||||
readyList: m.readyList,
|
readyList: m.readyList,
|
||||||
writeHeaders: headers,
|
writeHeaders: headers,
|
||||||
dictionaries: m.muxReader.dictionaries,
|
dictionaries: m.muxReader.dictionaries,
|
||||||
|
|
|
@ -39,20 +39,26 @@ func NewDefaultMuxerPair() *DefaultMuxerPair {
|
||||||
origin, edge := net.Pipe()
|
origin, edge := net.Pipe()
|
||||||
return &DefaultMuxerPair{
|
return &DefaultMuxerPair{
|
||||||
OriginMuxConfig: MuxerConfig{
|
OriginMuxConfig: MuxerConfig{
|
||||||
Timeout: time.Second,
|
Timeout: time.Second,
|
||||||
IsClient: true,
|
IsClient: true,
|
||||||
Name: "origin",
|
Name: "origin",
|
||||||
Logger: log.NewEntry(log.New()),
|
Logger: log.NewEntry(log.New()),
|
||||||
|
DefaultWindowSize: (1 << 8) - 1,
|
||||||
|
MaxWindowSize: (1 << 15) - 1,
|
||||||
|
StreamWriteBufferMaxLen: 1024,
|
||||||
},
|
},
|
||||||
OriginConn: origin,
|
OriginConn: origin,
|
||||||
EdgeMuxConfig: MuxerConfig{
|
EdgeMuxConfig: MuxerConfig{
|
||||||
Timeout: time.Second,
|
Timeout: time.Second,
|
||||||
IsClient: false,
|
IsClient: false,
|
||||||
Name: "edge",
|
Name: "edge",
|
||||||
Logger: log.NewEntry(log.New()),
|
Logger: log.NewEntry(log.New()),
|
||||||
|
DefaultWindowSize: (1 << 8) - 1,
|
||||||
|
MaxWindowSize: (1 << 15) - 1,
|
||||||
|
StreamWriteBufferMaxLen: 1024,
|
||||||
},
|
},
|
||||||
EdgeConn: edge,
|
EdgeConn: edge,
|
||||||
doneC: make(chan struct{}),
|
doneC: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,22 +66,22 @@ func NewCompressedMuxerPair(quality CompressionSetting) *DefaultMuxerPair {
|
||||||
origin, edge := net.Pipe()
|
origin, edge := net.Pipe()
|
||||||
return &DefaultMuxerPair{
|
return &DefaultMuxerPair{
|
||||||
OriginMuxConfig: MuxerConfig{
|
OriginMuxConfig: MuxerConfig{
|
||||||
Timeout: time.Second,
|
Timeout: time.Second,
|
||||||
IsClient: true,
|
IsClient: true,
|
||||||
Name: "origin",
|
Name: "origin",
|
||||||
CompressionQuality: quality,
|
CompressionQuality: quality,
|
||||||
Logger: log.NewEntry(log.New()),
|
Logger: log.NewEntry(log.New()),
|
||||||
},
|
},
|
||||||
OriginConn: origin,
|
OriginConn: origin,
|
||||||
EdgeMuxConfig: MuxerConfig{
|
EdgeMuxConfig: MuxerConfig{
|
||||||
Timeout: time.Second,
|
Timeout: time.Second,
|
||||||
IsClient: false,
|
IsClient: false,
|
||||||
Name: "edge",
|
Name: "edge",
|
||||||
CompressionQuality: quality,
|
CompressionQuality: quality,
|
||||||
Logger: log.NewEntry(log.New()),
|
Logger: log.NewEntry(log.New()),
|
||||||
},
|
},
|
||||||
EdgeConn: edge,
|
EdgeConn: edge,
|
||||||
doneC: make(chan struct{}),
|
doneC: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,7 +236,6 @@ func TestSingleStream(t *testing.T) {
|
||||||
func TestSingleStreamLargeResponseBody(t *testing.T) {
|
func TestSingleStreamLargeResponseBody(t *testing.T) {
|
||||||
muxPair := NewDefaultMuxerPair()
|
muxPair := NewDefaultMuxerPair()
|
||||||
bodySize := 1 << 24
|
bodySize := 1 << 24
|
||||||
streamReady := make(chan struct{})
|
|
||||||
muxPair.OriginMuxConfig.Handler = MuxedStreamFunc(func(stream *MuxedStream) error {
|
muxPair.OriginMuxConfig.Handler = MuxedStreamFunc(func(stream *MuxedStream) error {
|
||||||
if len(stream.Headers) != 1 {
|
if len(stream.Headers) != 1 {
|
||||||
t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
|
t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
|
||||||
|
@ -257,8 +262,6 @@ func TestSingleStreamLargeResponseBody(t *testing.T) {
|
||||||
if n != len(payload) {
|
if n != len(payload) {
|
||||||
t.Fatalf("origin short write: %d/%d bytes", n, len(payload))
|
t.Fatalf("origin short write: %d/%d bytes", n, len(payload))
|
||||||
}
|
}
|
||||||
t.Log("Payload written; signaling that the stream is ready")
|
|
||||||
streamReady <- struct{}{}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -282,9 +285,6 @@ func TestSingleStreamLargeResponseBody(t *testing.T) {
|
||||||
}
|
}
|
||||||
responseBody := make([]byte, bodySize)
|
responseBody := make([]byte, bodySize)
|
||||||
|
|
||||||
<-streamReady
|
|
||||||
t.Log("Received stream ready signal; resuming the test")
|
|
||||||
|
|
||||||
n, err := io.ReadFull(stream, responseBody)
|
n, err := io.ReadFull(stream, responseBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error from (*MuxedStream).Read: %s", err)
|
t.Fatalf("error from (*MuxedStream).Read: %s", err)
|
||||||
|
@ -367,14 +367,13 @@ func TestMultipleStreams(t *testing.T) {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
if testFail {
|
if testFail {
|
||||||
t.Fatalf("TestMultipleStreamsFlowControl failed")
|
t.Fatalf("TestMultipleStreams failed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMultipleStreamsFlowControl(t *testing.T) {
|
func TestMultipleStreamsFlowControl(t *testing.T) {
|
||||||
maxStreams := 32
|
maxStreams := 32
|
||||||
errorsC := make(chan error, maxStreams)
|
errorsC := make(chan error, maxStreams)
|
||||||
streamReady := make(chan struct{})
|
|
||||||
responseSizes := make([]int32, maxStreams)
|
responseSizes := make([]int32, maxStreams)
|
||||||
for i := 0; i < maxStreams; i++ {
|
for i := 0; i < maxStreams; i++ {
|
||||||
responseSizes[i] = rand.Int31n(int32(defaultWindowSize << 4))
|
responseSizes[i] = rand.Int31n(int32(defaultWindowSize << 4))
|
||||||
|
@ -398,7 +397,6 @@ func TestMultipleStreamsFlowControl(t *testing.T) {
|
||||||
payload[i] = byte(i % 256)
|
payload[i] = byte(i % 256)
|
||||||
}
|
}
|
||||||
n, err := stream.Write(payload)
|
n, err := stream.Write(payload)
|
||||||
streamReady <- struct{}{}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("origin write error: %s", err)
|
t.Fatalf("origin write error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -435,7 +433,6 @@ func TestMultipleStreamsFlowControl(t *testing.T) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
<-streamReady
|
|
||||||
responseBody := make([]byte, responseSizes[(stream.streamID-2)/2])
|
responseBody := make([]byte, responseSizes[(stream.streamID-2)/2])
|
||||||
n, err := io.ReadFull(stream, responseBody)
|
n, err := io.ReadFull(stream, responseBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -782,9 +779,11 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(len(paths))
|
wg.Add(len(paths))
|
||||||
|
errorsC := make(chan error, len(paths))
|
||||||
|
|
||||||
for i, s := range paths {
|
for i, s := range paths {
|
||||||
go func(i int, path string) {
|
go func(i int, path string) {
|
||||||
|
defer wg.Done()
|
||||||
stream, err := muxPair.EdgeMux.OpenStream(
|
stream, err := muxPair.EdgeMux.OpenStream(
|
||||||
[]Header{
|
[]Header{
|
||||||
{Name: ":method", Value: "GET"},
|
{Name: ":method", Value: "GET"},
|
||||||
|
@ -805,22 +804,30 @@ func TestMultipleStreamsWithDictionaries(t *testing.T) {
|
||||||
responseBody := make([]byte, len(expectBody)*2)
|
responseBody := make([]byte, len(expectBody)*2)
|
||||||
n, err := stream.Read(responseBody)
|
n, err := stream.Read(responseBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error from (*MuxedStream).Read: %s", err)
|
errorsC <- fmt.Errorf("stream %d error from (*MuxedStream).Read: %s", stream.streamID, err)
|
||||||
t.Fatalf("error from (*MuxedStream).Read: %s", err)
|
return
|
||||||
}
|
}
|
||||||
if n != len(expectBody) {
|
if n != len(expectBody) {
|
||||||
log.Printf("expected response body to have %d bytes, got %d", len(expectBody), n)
|
errorsC <- fmt.Errorf("stream %d expected response body to have %d bytes, got %d", stream.streamID, len(expectBody), n)
|
||||||
t.Fatalf("expected response body to have %d bytes, got %d", len(expectBody), n)
|
return
|
||||||
}
|
}
|
||||||
if string(responseBody[:n]) != expectBody {
|
if string(responseBody[:n]) != expectBody {
|
||||||
log.Printf("expected response body %s, got %s", expectBody, responseBody[:n])
|
errorsC <- fmt.Errorf("stream %d expected response body %s, got %s", stream.streamID, expectBody, responseBody[:n])
|
||||||
t.Fatalf("expected response body %s, got %s", expectBody, responseBody[:n])
|
return
|
||||||
}
|
}
|
||||||
wg.Done()
|
|
||||||
}(i, s)
|
}(i, s)
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(errorsC)
|
||||||
|
testFail := false
|
||||||
|
for err := range errorsC {
|
||||||
|
testFail = true
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
if testFail {
|
||||||
|
t.Fatalf("TestMultipleStreams failed")
|
||||||
|
}
|
||||||
|
|
||||||
if q > CompressionNone && muxPair.OriginMux.muxMetricsUpdater.compBytesBefore.Value() <= 10*muxPair.OriginMux.muxMetricsUpdater.compBytesAfter.Value() {
|
if q > CompressionNone && muxPair.OriginMux.muxMetricsUpdater.compBytesBefore.Value() <= 10*muxPair.OriginMux.muxMetricsUpdater.compBytesAfter.Value() {
|
||||||
t.Fatalf("Cross-stream compression is expected to give a better compression ratio")
|
t.Fatalf("Cross-stream compression is expected to give a better compression ratio")
|
||||||
|
|
|
@ -17,32 +17,51 @@ type ReadWriteClosedCloser interface {
|
||||||
Closed() bool
|
Closed() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MuxedStream is logically an HTTP/2 stream, with an additional buffer for outgoing data.
|
||||||
type MuxedStream struct {
|
type MuxedStream struct {
|
||||||
Headers []Header
|
|
||||||
|
|
||||||
streamID uint32
|
streamID uint32
|
||||||
|
|
||||||
|
// The "Receive" end of the stream
|
||||||
|
readBufferLock sync.RWMutex
|
||||||
|
readBuffer ReadWriteClosedCloser
|
||||||
|
// This is the amount of bytes that are in our receive window
|
||||||
|
// (how much data we can receive into this stream).
|
||||||
|
receiveWindow uint32
|
||||||
|
// current receive window size limit. Exponentially increase it when it's exhausted
|
||||||
|
receiveWindowCurrentMax uint32
|
||||||
|
// hard limit set in http2 spec. 2^31-1
|
||||||
|
receiveWindowMax uint32
|
||||||
|
// The desired size increment for receiveWindow.
|
||||||
|
// If this is nonzero, a WINDOW_UPDATE frame needs to be sent.
|
||||||
|
windowUpdate uint32
|
||||||
|
// The headers that were most recently received.
|
||||||
|
// Particularly:
|
||||||
|
// * for an eyeball-initiated stream (as passed to TunnelHandler::ServeStream),
|
||||||
|
// these are the request headers
|
||||||
|
// * for a cloudflared-initiated stream (as created by Register/UnregisterTunnel),
|
||||||
|
// these are the response headers.
|
||||||
|
// They are useful in both of these contexts; hence `Headers` is public.
|
||||||
|
Headers []Header
|
||||||
|
// For use in the context of a cloudflared-initiated stream.
|
||||||
responseHeadersReceived chan struct{}
|
responseHeadersReceived chan struct{}
|
||||||
|
|
||||||
readBuffer ReadWriteClosedCloser
|
// The "Send" end of the stream
|
||||||
receiveWindow uint32
|
writeLock sync.Mutex
|
||||||
// current window size limit. Exponentially increase it when it's exhausted
|
|
||||||
receiveWindowCurrentMax uint32
|
|
||||||
// limit set in http2 spec. 2^31-1
|
|
||||||
receiveWindowMax uint32
|
|
||||||
|
|
||||||
// nonzero if a WINDOW_UPDATE frame for a stream needs to be sent
|
|
||||||
windowUpdate uint32
|
|
||||||
|
|
||||||
writeLock sync.Mutex
|
|
||||||
// The zero value for Buffer is an empty buffer ready to use.
|
|
||||||
writeBuffer ReadWriteLengther
|
writeBuffer ReadWriteLengther
|
||||||
|
// The maximum capacity that the send buffer should grow to.
|
||||||
|
writeBufferMaxLen int
|
||||||
|
// A channel to be notified when the send buffer is not full.
|
||||||
|
writeBufferHasSpace chan struct{}
|
||||||
|
// This is the amount of bytes that are in the peer's receive window
|
||||||
|
// (how much data we can send from this stream).
|
||||||
sendWindow uint32
|
sendWindow uint32
|
||||||
|
// Reference to the muxer's readyList; signal this for stream data to be sent.
|
||||||
readyList *ReadyList
|
readyList *ReadyList
|
||||||
|
// The headers that should be sent, and a flag so we only send them once.
|
||||||
headersSent bool
|
headersSent bool
|
||||||
writeHeaders []Header
|
writeHeaders []Header
|
||||||
|
|
||||||
|
// EOF-related fields
|
||||||
// true if the write end of this stream has been closed
|
// true if the write end of this stream has been closed
|
||||||
writeEOF bool
|
writeEOF bool
|
||||||
// true if we have sent EOF to the peer
|
// true if we have sent EOF to the peer
|
||||||
|
@ -50,40 +69,63 @@ type MuxedStream struct {
|
||||||
// true if the peer sent us an EOF
|
// true if the peer sent us an EOF
|
||||||
receivedEOF bool
|
receivedEOF bool
|
||||||
|
|
||||||
// dictionary that was used to compress the stream
|
// Compression-related fields
|
||||||
receivedUseDict bool
|
receivedUseDict bool
|
||||||
method string
|
method string
|
||||||
contentType string
|
contentType string
|
||||||
path string
|
path string
|
||||||
dictionaries h2Dictionaries
|
dictionaries h2Dictionaries
|
||||||
readBufferLock sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MuxedStream) Read(p []byte) (n int, err error) {
|
func (s *MuxedStream) Read(p []byte) (n int, err error) {
|
||||||
|
var readBuffer ReadWriteClosedCloser
|
||||||
if s.dictionaries.read != nil {
|
if s.dictionaries.read != nil {
|
||||||
s.readBufferLock.RLock()
|
s.readBufferLock.RLock()
|
||||||
b := s.readBuffer
|
readBuffer = s.readBuffer
|
||||||
s.readBufferLock.RUnlock()
|
s.readBufferLock.RUnlock()
|
||||||
return b.Read(p)
|
} else {
|
||||||
|
readBuffer = s.readBuffer
|
||||||
}
|
}
|
||||||
return s.readBuffer.Read(p)
|
n, err = readBuffer.Read(p)
|
||||||
|
s.replenishReceiveWindow(uint32(n))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MuxedStream) Write(p []byte) (n int, err error) {
|
// Blocks until len(p) bytes have been written to the buffer
|
||||||
|
func (s *MuxedStream) Write(p []byte) (int, error) {
|
||||||
|
// If assignDictToStream returns success, then it will have acquired the
|
||||||
|
// writeLock. Otherwise we must acquire it ourselves.
|
||||||
ok := assignDictToStream(s, p)
|
ok := assignDictToStream(s, p)
|
||||||
if !ok {
|
if !ok {
|
||||||
s.writeLock.Lock()
|
s.writeLock.Lock()
|
||||||
}
|
}
|
||||||
defer s.writeLock.Unlock()
|
defer s.writeLock.Unlock()
|
||||||
|
|
||||||
if s.writeEOF {
|
if s.writeEOF {
|
||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
n, err = s.writeBuffer.Write(p)
|
totalWritten := 0
|
||||||
if n != len(p) || err != nil {
|
for totalWritten < len(p) {
|
||||||
return n, err
|
// If the buffer is full, block till there is more room.
|
||||||
|
// Use a loop to recheck the buffer size after the lock is reacquired.
|
||||||
|
for s.writeBufferMaxLen <= s.writeBuffer.Len() {
|
||||||
|
s.writeLock.Unlock()
|
||||||
|
<-s.writeBufferHasSpace
|
||||||
|
s.writeLock.Lock()
|
||||||
|
}
|
||||||
|
amountToWrite := len(p) - totalWritten
|
||||||
|
spaceAvailable := s.writeBufferMaxLen - s.writeBuffer.Len()
|
||||||
|
if spaceAvailable < amountToWrite {
|
||||||
|
amountToWrite = spaceAvailable
|
||||||
|
}
|
||||||
|
amountWritten, err := s.writeBuffer.Write(p[totalWritten : totalWritten+amountToWrite])
|
||||||
|
totalWritten += amountWritten
|
||||||
|
if err != nil {
|
||||||
|
return totalWritten, err
|
||||||
|
}
|
||||||
|
s.writeNotify()
|
||||||
}
|
}
|
||||||
s.writeNotify()
|
return totalWritten, nil
|
||||||
return n, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MuxedStream) Close() error {
|
func (s *MuxedStream) Close() error {
|
||||||
|
@ -164,9 +206,9 @@ func (s *MuxedStream) writeNotify() {
|
||||||
// receive window (how much data we can send).
|
// receive window (how much data we can send).
|
||||||
func (s *MuxedStream) replenishSendWindow(bytes uint32) {
|
func (s *MuxedStream) replenishSendWindow(bytes uint32) {
|
||||||
s.writeLock.Lock()
|
s.writeLock.Lock()
|
||||||
|
defer s.writeLock.Unlock()
|
||||||
s.sendWindow += bytes
|
s.sendWindow += bytes
|
||||||
s.writeNotify()
|
s.writeNotify()
|
||||||
s.writeLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call by muxreader when it receives a data frame
|
// Call by muxreader when it receives a data frame
|
||||||
|
@ -178,17 +220,30 @@ func (s *MuxedStream) consumeReceiveWindow(bytes uint32) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
s.receiveWindow -= bytes
|
s.receiveWindow -= bytes
|
||||||
if s.receiveWindow < s.receiveWindowCurrentMax/2 {
|
if s.receiveWindow < s.receiveWindowCurrentMax/2 && s.receiveWindowCurrentMax < s.receiveWindowMax {
|
||||||
// exhausting client send window (how much data client can send)
|
// exhausting client send window (how much data client can send)
|
||||||
if s.receiveWindowCurrentMax < s.receiveWindowMax {
|
// and there is room to grow the receive window
|
||||||
s.receiveWindowCurrentMax <<= 1
|
newMax := s.receiveWindowCurrentMax << 1
|
||||||
|
if newMax > s.receiveWindowMax {
|
||||||
|
newMax = s.receiveWindowMax
|
||||||
}
|
}
|
||||||
s.windowUpdate += s.receiveWindowCurrentMax - s.receiveWindow
|
s.windowUpdate += newMax - s.receiveWindowCurrentMax
|
||||||
|
s.receiveWindowCurrentMax = newMax
|
||||||
|
// notify MuxWriter to write WINDOW_UPDATE frame
|
||||||
s.writeNotify()
|
s.writeNotify()
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Arranges for the MuxWriter to send a WINDOW_UPDATE
|
||||||
|
// Called by MuxedStream::Read when data has left the read buffer.
|
||||||
|
func (s *MuxedStream) replenishReceiveWindow(bytes uint32) {
|
||||||
|
s.writeLock.Lock()
|
||||||
|
defer s.writeLock.Unlock()
|
||||||
|
s.windowUpdate += bytes
|
||||||
|
s.writeNotify()
|
||||||
|
}
|
||||||
|
|
||||||
// receiveEOF should be called when the peer indicates no more data will be sent.
|
// receiveEOF should be called when the peer indicates no more data will be sent.
|
||||||
// Returns true if the socket is now closed (i.e. the write side is already closed).
|
// Returns true if the socket is now closed (i.e. the write side is already closed).
|
||||||
func (s *MuxedStream) receiveEOF() (closed bool) {
|
func (s *MuxedStream) receiveEOF() (closed bool) {
|
||||||
|
@ -226,7 +281,8 @@ type streamChunk struct {
|
||||||
// true if a HEADERS frame should be sent
|
// true if a HEADERS frame should be sent
|
||||||
sendHeaders bool
|
sendHeaders bool
|
||||||
headers []Header
|
headers []Header
|
||||||
// nonzero if a WINDOW_UPDATE frame should be sent
|
// nonzero if a WINDOW_UPDATE frame should be sent;
|
||||||
|
// in that case, it is the increment value to use
|
||||||
windowUpdate uint32
|
windowUpdate uint32
|
||||||
// true if data frames should be sent
|
// true if data frames should be sent
|
||||||
sendData bool
|
sendData bool
|
||||||
|
@ -249,11 +305,23 @@ func (s *MuxedStream) getChunk() *streamChunk {
|
||||||
eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow,
|
eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copies at most s.sendWindow bytes
|
// Copy at most s.sendWindow bytes, adjust the sendWindow accordingly
|
||||||
writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow))
|
writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow))
|
||||||
s.sendWindow -= uint32(writeLen)
|
s.sendWindow -= uint32(writeLen)
|
||||||
|
|
||||||
|
// Non-blocking channel send. This will allow MuxedStream::Write() to continue, if needed
|
||||||
|
if s.writeBuffer.Len() < s.writeBufferMaxLen {
|
||||||
|
select {
|
||||||
|
case s.writeBufferHasSpace <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// When we write the chunk, we'll write the WINDOW_UPDATE frame if needed
|
||||||
s.receiveWindow += s.windowUpdate
|
s.receiveWindow += s.windowUpdate
|
||||||
s.windowUpdate = 0
|
s.windowUpdate = 0
|
||||||
|
|
||||||
|
// When we write the chunk, we'll write the headers if needed
|
||||||
s.headersSent = true
|
s.headersSent = true
|
||||||
|
|
||||||
// if this chunk contains the end of the stream, close the stream now
|
// if this chunk contains the end of the stream, close the stream now
|
||||||
|
|
|
@ -23,47 +23,55 @@ func TestFlowControlSingleStream(t *testing.T) {
|
||||||
sendWindow: testWindowSize,
|
sendWindow: testWindowSize,
|
||||||
readyList: NewReadyList(),
|
readyList: NewReadyList(),
|
||||||
}
|
}
|
||||||
|
var tempWindowUpdate uint32
|
||||||
|
var tempStreamChunk *streamChunk
|
||||||
|
|
||||||
assert.True(t, stream.consumeReceiveWindow(testWindowSize/2))
|
assert.True(t, stream.consumeReceiveWindow(testWindowSize/2))
|
||||||
dataSent := testWindowSize / 2
|
dataSent := testWindowSize / 2
|
||||||
assert.Equal(t, testWindowSize-dataSent, stream.receiveWindow)
|
assert.Equal(t, testWindowSize-dataSent, stream.receiveWindow)
|
||||||
assert.Equal(t, testWindowSize, stream.receiveWindowCurrentMax)
|
assert.Equal(t, testWindowSize, stream.receiveWindowCurrentMax)
|
||||||
assert.Equal(t, uint32(0), stream.windowUpdate)
|
|
||||||
tempWindowUpdate := stream.windowUpdate
|
|
||||||
|
|
||||||
streamChunk := stream.getChunk()
|
|
||||||
assert.Equal(t, tempWindowUpdate, streamChunk.windowUpdate)
|
|
||||||
assert.Equal(t, testWindowSize-dataSent, stream.receiveWindow)
|
|
||||||
assert.Equal(t, uint32(0), stream.windowUpdate)
|
|
||||||
assert.Equal(t, testWindowSize, stream.sendWindow)
|
assert.Equal(t, testWindowSize, stream.sendWindow)
|
||||||
|
assert.Equal(t, uint32(0), stream.windowUpdate)
|
||||||
|
|
||||||
|
tempStreamChunk = stream.getChunk()
|
||||||
|
assert.Equal(t, uint32(0), tempStreamChunk.windowUpdate)
|
||||||
|
assert.Equal(t, testWindowSize-dataSent, stream.receiveWindow)
|
||||||
|
assert.Equal(t, testWindowSize, stream.receiveWindowCurrentMax)
|
||||||
|
assert.Equal(t, testWindowSize, stream.sendWindow)
|
||||||
|
assert.Equal(t, uint32(0), stream.windowUpdate)
|
||||||
|
|
||||||
assert.True(t, stream.consumeReceiveWindow(2))
|
assert.True(t, stream.consumeReceiveWindow(2))
|
||||||
dataSent += 2
|
dataSent += 2
|
||||||
assert.Equal(t, testWindowSize-dataSent, stream.receiveWindow)
|
assert.Equal(t, testWindowSize-dataSent, stream.receiveWindow)
|
||||||
assert.Equal(t, testWindowSize<<1, stream.receiveWindowCurrentMax)
|
assert.Equal(t, testWindowSize<<1, stream.receiveWindowCurrentMax)
|
||||||
assert.Equal(t, (testWindowSize<<1)-stream.receiveWindow, stream.windowUpdate)
|
assert.Equal(t, testWindowSize, stream.sendWindow)
|
||||||
|
assert.Equal(t, testWindowSize, stream.windowUpdate)
|
||||||
tempWindowUpdate = stream.windowUpdate
|
tempWindowUpdate = stream.windowUpdate
|
||||||
|
|
||||||
streamChunk = stream.getChunk()
|
tempStreamChunk = stream.getChunk()
|
||||||
assert.Equal(t, tempWindowUpdate, streamChunk.windowUpdate)
|
assert.Equal(t, tempWindowUpdate, tempStreamChunk.windowUpdate)
|
||||||
assert.Equal(t, testWindowSize<<1, stream.receiveWindow)
|
assert.Equal(t, (testWindowSize<<1)-dataSent, stream.receiveWindow)
|
||||||
assert.Equal(t, uint32(0), stream.windowUpdate)
|
assert.Equal(t, testWindowSize<<1, stream.receiveWindowCurrentMax)
|
||||||
assert.Equal(t, testWindowSize, stream.sendWindow)
|
assert.Equal(t, testWindowSize, stream.sendWindow)
|
||||||
|
assert.Equal(t, uint32(0), stream.windowUpdate)
|
||||||
|
|
||||||
assert.True(t, stream.consumeReceiveWindow(testWindowSize+10))
|
assert.True(t, stream.consumeReceiveWindow(testWindowSize+10))
|
||||||
dataSent = testWindowSize + 10
|
dataSent += testWindowSize + 10
|
||||||
assert.Equal(t, (testWindowSize<<1)-dataSent, stream.receiveWindow)
|
assert.Equal(t, (testWindowSize<<1)-dataSent, stream.receiveWindow)
|
||||||
assert.Equal(t, testWindowSize<<2, stream.receiveWindowCurrentMax)
|
assert.Equal(t, testWindowSize<<2, stream.receiveWindowCurrentMax)
|
||||||
assert.Equal(t, (testWindowSize<<2)-stream.receiveWindow, stream.windowUpdate)
|
assert.Equal(t, testWindowSize, stream.sendWindow)
|
||||||
|
assert.Equal(t, testWindowSize<<1, stream.windowUpdate)
|
||||||
tempWindowUpdate = stream.windowUpdate
|
tempWindowUpdate = stream.windowUpdate
|
||||||
|
|
||||||
streamChunk = stream.getChunk()
|
tempStreamChunk = stream.getChunk()
|
||||||
assert.Equal(t, tempWindowUpdate, streamChunk.windowUpdate)
|
assert.Equal(t, tempWindowUpdate, tempStreamChunk.windowUpdate)
|
||||||
assert.Equal(t, testWindowSize<<2, stream.receiveWindow)
|
assert.Equal(t, (testWindowSize<<2)-dataSent, stream.receiveWindow)
|
||||||
assert.Equal(t, uint32(0), stream.windowUpdate)
|
assert.Equal(t, testWindowSize<<2, stream.receiveWindowCurrentMax)
|
||||||
assert.Equal(t, testWindowSize, stream.sendWindow)
|
assert.Equal(t, testWindowSize, stream.sendWindow)
|
||||||
|
assert.Equal(t, uint32(0), stream.windowUpdate)
|
||||||
|
|
||||||
assert.False(t, stream.consumeReceiveWindow(testMaxWindowSize+1))
|
assert.False(t, stream.consumeReceiveWindow(testMaxWindowSize+1))
|
||||||
assert.Equal(t, testWindowSize<<2, stream.receiveWindow)
|
assert.Equal(t, (testWindowSize<<2)-dataSent, stream.receiveWindow)
|
||||||
assert.Equal(t, testMaxWindowSize, stream.receiveWindowCurrentMax)
|
assert.Equal(t, testMaxWindowSize, stream.receiveWindowCurrentMax)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,8 @@ type MuxReader struct {
|
||||||
initialStreamWindow uint32
|
initialStreamWindow uint32
|
||||||
// The max value for the send window of a stream.
|
// The max value for the send window of a stream.
|
||||||
streamWindowMax uint32
|
streamWindowMax uint32
|
||||||
|
// The max size for the write buffer of a stream
|
||||||
|
streamWriteBufferMaxLen int
|
||||||
// r is a reference to the underlying connection used when shutting down.
|
// r is a reference to the underlying connection used when shutting down.
|
||||||
r io.Closer
|
r io.Closer
|
||||||
// updateRTTChan is the channel to send new RTT measurement to muxerMetricsUpdater
|
// updateRTTChan is the channel to send new RTT measurement to muxerMetricsUpdater
|
||||||
|
@ -153,6 +155,8 @@ func (r *MuxReader) newMuxedStream(streamID uint32) *MuxedStream {
|
||||||
streamID: streamID,
|
streamID: streamID,
|
||||||
readBuffer: NewSharedBuffer(),
|
readBuffer: NewSharedBuffer(),
|
||||||
writeBuffer: &bytes.Buffer{},
|
writeBuffer: &bytes.Buffer{},
|
||||||
|
writeBufferMaxLen: r.streamWriteBufferMaxLen,
|
||||||
|
writeBufferHasSpace: make(chan struct{}, 1),
|
||||||
receiveWindow: r.initialStreamWindow,
|
receiveWindow: r.initialStreamWindow,
|
||||||
receiveWindowCurrentMax: r.initialStreamWindow,
|
receiveWindowCurrentMax: r.initialStreamWindow,
|
||||||
receiveWindowMax: r.streamWindowMax,
|
receiveWindowMax: r.streamWindowMax,
|
||||||
|
|
Loading…
Reference in New Issue