TUN-3208: Reduce copies and allocations on h2mux write path. Pre-allocate 16KB write buffer on the first write if possible. Use explicit byte array for chunks on write thread to avoid copying through intermediate buffer due to io.CopyN.

benchmark                                    old ns/op     new ns/op     delta
BenchmarkSingleStreamLargeResponseBody-8     17786594      12163494      -31.61%

benchmark                                    old allocs     new allocs     delta
BenchmarkSingleStreamLargeResponseBody-8     17086          15869          -7.12%

benchmark                                    old bytes     new bytes     delta
BenchmarkSingleStreamLargeResponseBody-8     58215169      21604391      -62.89%
This commit is contained in:
Igor Postelnik 2020-07-27 14:39:20 -05:00
parent 42fe2e7266
commit bd15c6b8c3
2 changed files with 37 additions and 9 deletions

View File

@ -7,11 +7,12 @@ import (
"sync"
"time"
"github.com/cloudflare/cloudflared/logger"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"golang.org/x/sync/errgroup"
"github.com/cloudflare/cloudflared/logger"
)
const (
@ -21,6 +22,7 @@ const (
defaultTimeout time.Duration = 5 * time.Second
defaultRetries uint64 = 5
defaultWriteBufferMaxLen int = 1024 * 1024 // 1mb
writeBufferInitialSize int = 16 * 1024 // 16KB
SettingMuxerMagic http2.SettingID = 0x42db
MuxerMagicOrigin uint32 = 0xa2e43c8b
@ -206,9 +208,9 @@ func Handshake(
initialStreamWindow: m.config.DefaultWindowSize,
streamWindowMax: m.config.MaxWindowSize,
streamWriteBufferMaxLen: m.config.StreamWriteBufferMaxLen,
r: m.r,
metricsUpdater: m.muxMetricsUpdater,
bytesRead: inBoundCounter,
r: m.r,
metricsUpdater: m.muxMetricsUpdater,
bytesRead: inBoundCounter,
}
m.muxWriter = &MuxWriter{
f: m.f,

View File

@ -138,6 +138,14 @@ func (s *MuxedStream) Write(p []byte) (int, error) {
if s.writeEOF {
return 0, io.EOF
}
// pre-allocate some space in the write buffer if possible
if buffer, ok := s.writeBuffer.(*bytes.Buffer); ok {
if buffer.Cap() == 0 {
buffer.Grow(writeBufferInitialSize)
}
}
totalWritten := 0
for totalWritten < len(p) {
// If the buffer is full, block till there is more room.
@ -367,7 +375,9 @@ type streamChunk struct {
// true if data frames should be sent
sendData bool
eof bool
buffer bytes.Buffer
buffer []byte
offset int
}
// getChunk atomically extracts a chunk of data to be written by MuxWriter.
@ -385,8 +395,17 @@ func (s *MuxedStream) getChunk() *streamChunk {
eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow,
}
// Copy at most s.sendWindow bytes, adjust the sendWindow accordingly
writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow))
s.sendWindow -= uint32(writeLen)
toCopy := int(s.sendWindow)
if toCopy > s.writeBuffer.Len() {
toCopy = s.writeBuffer.Len()
}
if toCopy > 0 {
buf := make([]byte, toCopy)
writeLen, _ := s.writeBuffer.Read(buf)
chunk.buffer = buf[:writeLen]
s.sendWindow -= uint32(writeLen)
}
// Allow MuxedStream::Write() to continue, if needed
if s.writeBuffer.Len() < s.writeBufferMaxLen {
@ -421,8 +440,15 @@ func (c *streamChunk) sendDataFrame() bool {
}
func (c *streamChunk) nextDataFrame(frameSize int) (payload []byte, endStream bool) {
payload = c.buffer.Next(frameSize)
if c.buffer.Len() == 0 {
bytesLeft := len(c.buffer) - c.offset
if frameSize > bytesLeft {
frameSize = bytesLeft
}
nextOffset := c.offset + frameSize
payload = c.buffer[c.offset:nextOffset]
c.offset = nextOffset
if c.offset == len(c.buffer) {
// this is the last data frame in this chunk
c.sendData = false
if c.eof {