diff --git a/h2mux/h2mux.go b/h2mux/h2mux.go index a9af47bb..821ed19e 100644 --- a/h2mux/h2mux.go +++ b/h2mux/h2mux.go @@ -7,11 +7,12 @@ import ( "sync" "time" - "github.com/cloudflare/cloudflared/logger" "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "golang.org/x/sync/errgroup" + + "github.com/cloudflare/cloudflared/logger" ) const ( @@ -21,6 +22,7 @@ const ( defaultTimeout time.Duration = 5 * time.Second defaultRetries uint64 = 5 defaultWriteBufferMaxLen int = 1024 * 1024 // 1mb + writeBufferInitialSize int = 16 * 1024 // 16KB SettingMuxerMagic http2.SettingID = 0x42db MuxerMagicOrigin uint32 = 0xa2e43c8b @@ -206,9 +208,9 @@ func Handshake( initialStreamWindow: m.config.DefaultWindowSize, streamWindowMax: m.config.MaxWindowSize, streamWriteBufferMaxLen: m.config.StreamWriteBufferMaxLen, - r: m.r, - metricsUpdater: m.muxMetricsUpdater, - bytesRead: inBoundCounter, + r: m.r, + metricsUpdater: m.muxMetricsUpdater, + bytesRead: inBoundCounter, } m.muxWriter = &MuxWriter{ f: m.f, diff --git a/h2mux/muxedstream.go b/h2mux/muxedstream.go index a37270cc..66b4507e 100644 --- a/h2mux/muxedstream.go +++ b/h2mux/muxedstream.go @@ -138,6 +138,14 @@ func (s *MuxedStream) Write(p []byte) (int, error) { if s.writeEOF { return 0, io.EOF } + + // pre-allocate some space in the write buffer if possible + if buffer, ok := s.writeBuffer.(*bytes.Buffer); ok { + if buffer.Cap() == 0 { + buffer.Grow(writeBufferInitialSize) + } + } + totalWritten := 0 for totalWritten < len(p) { // If the buffer is full, block till there is more room. @@ -367,7 +375,9 @@ type streamChunk struct { // true if data frames should be sent sendData bool eof bool - buffer bytes.Buffer + + buffer []byte + offset int } // getChunk atomically extracts a chunk of data to be written by MuxWriter. @@ -385,8 +395,17 @@ func (s *MuxedStream) getChunk() *streamChunk { eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow, } // Copy at most s.sendWindow bytes, adjust the sendWindow accordingly - writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow)) - s.sendWindow -= uint32(writeLen) + toCopy := int(s.sendWindow) + if toCopy > s.writeBuffer.Len() { + toCopy = s.writeBuffer.Len() + } + + if toCopy > 0 { + buf := make([]byte, toCopy) + writeLen, _ := s.writeBuffer.Read(buf) + chunk.buffer = buf[:writeLen] + s.sendWindow -= uint32(writeLen) + } // Allow MuxedStream::Write() to continue, if needed if s.writeBuffer.Len() < s.writeBufferMaxLen { @@ -421,8 +440,15 @@ func (c *streamChunk) sendDataFrame() bool { } func (c *streamChunk) nextDataFrame(frameSize int) (payload []byte, endStream bool) { - payload = c.buffer.Next(frameSize) - if c.buffer.Len() == 0 { + bytesLeft := len(c.buffer) - c.offset + if frameSize > bytesLeft { + frameSize = bytesLeft + } + nextOffset := c.offset + frameSize + payload = c.buffer[c.offset:nextOffset] + c.offset = nextOffset + + if c.offset == len(c.buffer) { // this is the last data frame in this chunk c.sendData = false if c.eof {