From bd15c6b8c358d1a48689449c8a29082d484e2a3a Mon Sep 17 00:00:00 2001 From: Igor Postelnik Date: Mon, 27 Jul 2020 14:39:20 -0500 Subject: [PATCH] TUN-3208: Reduce copies and allocations on h2mux write path. Pre-allocate 16KB write buffer on the first write if possible. Use explicit byte array for chunks on write thread to avoid copying through intermediate buffer due to io.CopyN. benchmark old ns/op new ns/op delta BenchmarkSingleStreamLargeResponseBody-8 17786594 12163494 -31.61% benchmark old allocs new allocs delta BenchmarkSingleStreamLargeResponseBody-8 17086 15869 -7.12% benchmark old bytes new bytes delta BenchmarkSingleStreamLargeResponseBody-8 58215169 21604391 -62.89% --- h2mux/h2mux.go | 10 ++++++---- h2mux/muxedstream.go | 36 +++++++++++++++++++++++++++++++----- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/h2mux/h2mux.go b/h2mux/h2mux.go index a9af47bb..821ed19e 100644 --- a/h2mux/h2mux.go +++ b/h2mux/h2mux.go @@ -7,11 +7,12 @@ import ( "sync" "time" - "github.com/cloudflare/cloudflared/logger" "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "golang.org/x/sync/errgroup" + + "github.com/cloudflare/cloudflared/logger" ) const ( @@ -21,6 +22,7 @@ const ( defaultTimeout time.Duration = 5 * time.Second defaultRetries uint64 = 5 defaultWriteBufferMaxLen int = 1024 * 1024 // 1mb + writeBufferInitialSize int = 16 * 1024 // 16KB SettingMuxerMagic http2.SettingID = 0x42db MuxerMagicOrigin uint32 = 0xa2e43c8b @@ -206,9 +208,9 @@ func Handshake( initialStreamWindow: m.config.DefaultWindowSize, streamWindowMax: m.config.MaxWindowSize, streamWriteBufferMaxLen: m.config.StreamWriteBufferMaxLen, - r: m.r, - metricsUpdater: m.muxMetricsUpdater, - bytesRead: inBoundCounter, + r: m.r, + metricsUpdater: m.muxMetricsUpdater, + bytesRead: inBoundCounter, } m.muxWriter = &MuxWriter{ f: m.f, diff --git a/h2mux/muxedstream.go b/h2mux/muxedstream.go index a37270cc..66b4507e 100644 --- a/h2mux/muxedstream.go +++ b/h2mux/muxedstream.go @@ -138,6 +138,14 @@ func (s *MuxedStream) Write(p []byte) (int, error) { if s.writeEOF { return 0, io.EOF } + + // pre-allocate some space in the write buffer if possible + if buffer, ok := s.writeBuffer.(*bytes.Buffer); ok { + if buffer.Cap() == 0 { + buffer.Grow(writeBufferInitialSize) + } + } + totalWritten := 0 for totalWritten < len(p) { // If the buffer is full, block till there is more room. @@ -367,7 +375,9 @@ type streamChunk struct { // true if data frames should be sent sendData bool eof bool - buffer bytes.Buffer + + buffer []byte + offset int } // getChunk atomically extracts a chunk of data to be written by MuxWriter. @@ -385,8 +395,17 @@ func (s *MuxedStream) getChunk() *streamChunk { eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow, } // Copy at most s.sendWindow bytes, adjust the sendWindow accordingly - writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow)) - s.sendWindow -= uint32(writeLen) + toCopy := int(s.sendWindow) + if toCopy > s.writeBuffer.Len() { + toCopy = s.writeBuffer.Len() + } + + if toCopy > 0 { + buf := make([]byte, toCopy) + writeLen, _ := s.writeBuffer.Read(buf) + chunk.buffer = buf[:writeLen] + s.sendWindow -= uint32(writeLen) + } // Allow MuxedStream::Write() to continue, if needed if s.writeBuffer.Len() < s.writeBufferMaxLen { @@ -421,8 +440,15 @@ func (c *streamChunk) sendDataFrame() bool { } func (c *streamChunk) nextDataFrame(frameSize int) (payload []byte, endStream bool) { - payload = c.buffer.Next(frameSize) - if c.buffer.Len() == 0 { + bytesLeft := len(c.buffer) - c.offset + if frameSize > bytesLeft { + frameSize = bytesLeft + } + nextOffset := c.offset + frameSize + payload = c.buffer[c.offset:nextOffset] + c.offset = nextOffset + + if c.offset == len(c.buffer) { // this is the last data frame in this chunk c.sendData = false if c.eof {