TUN-2162: Decomplect OpenStream to allow finer-grained timeouts

This commit is contained in:
Nick Vollmar 2019-08-26 11:30:27 -05:00
parent 5e85a8bd16
commit b836cb350a
4 changed files with 60 additions and 63 deletions

View File

@ -20,11 +20,12 @@ var (
ErrUnknownStream = MuxerProtocolError{"2002 unknown stream", http2.ErrCodeProtocol} ErrUnknownStream = MuxerProtocolError{"2002 unknown stream", http2.ErrCodeProtocol}
ErrInvalidStream = MuxerProtocolError{"2003 invalid stream", http2.ErrCodeProtocol} ErrInvalidStream = MuxerProtocolError{"2003 invalid stream", http2.ErrCodeProtocol}
ErrStreamHeadersSent = MuxerApplicationError{"3000 headers already sent"} ErrStreamHeadersSent = MuxerApplicationError{"3000 headers already sent"}
ErrConnectionClosed = MuxerApplicationError{"3001 connection closed"} ErrStreamRequestConnectionClosed = MuxerApplicationError{"3001 connection closed while opening stream"}
ErrConnectionDropped = MuxerApplicationError{"3002 connection dropped"} ErrConnectionDropped = MuxerApplicationError{"3002 connection dropped"}
ErrOpenStreamTimeout = MuxerApplicationError{"3003 open stream timeout"} ErrStreamRequestTimeout = MuxerApplicationError{"3003 open stream timeout"}
ErrResponseHeadersTimeout = MuxerApplicationError{"3004 timeout waiting for initial response headers"} ErrResponseHeadersTimeout = MuxerApplicationError{"3004 timeout waiting for initial response headers"}
ErrResponseHeadersConnectionClosed = MuxerApplicationError{"3005 connection closed while waiting for initial response headers"}
ErrClosedStream = MuxerStreamError{"4000 stream closed", http2.ErrCodeStreamClosed} ErrClosedStream = MuxerStreamError{"4000 stream closed", http2.ErrCodeStreamClosed}
) )

View File

@ -1,7 +1,6 @@
package h2mux package h2mux
import ( import (
"bytes"
"context" "context"
"io" "io"
"strings" "strings"
@ -388,72 +387,52 @@ func isConnectionClosedError(err error) bool {
// OpenStream opens a new data stream with the given headers. // OpenStream opens a new data stream with the given headers.
// Called by proxy server and tunnel // Called by proxy server and tunnel
func (m *Muxer) OpenStream(ctx context.Context, headers []Header, body io.Reader) (*MuxedStream, error) { func (m *Muxer) OpenStream(ctx context.Context, headers []Header, body io.Reader) (*MuxedStream, error) {
stream := &MuxedStream{ stream := m.NewStream(headers)
responseHeadersReceived: make(chan struct{}), if err := m.MakeMuxedStreamRequest(ctx, MuxedStreamRequest{stream, body}); err != nil {
readBuffer: NewSharedBuffer(), return nil, err
writeBuffer: &bytes.Buffer{},
writeBufferMaxLen: m.config.StreamWriteBufferMaxLen,
writeBufferHasSpace: make(chan struct{}, 1),
receiveWindow: m.config.DefaultWindowSize,
receiveWindowCurrentMax: m.config.DefaultWindowSize,
receiveWindowMax: m.config.MaxWindowSize,
sendWindow: m.config.DefaultWindowSize,
readyList: m.readyList,
writeHeaders: headers,
dictionaries: m.muxReader.dictionaries,
} }
if err := m.AwaitResponseHeaders(ctx, stream); err != nil {
return nil, err
}
return stream, nil
}
func (m *Muxer) OpenRPCStream(ctx context.Context) (*MuxedStream, error) {
stream := m.NewStream(RPCHeaders())
if err := m.MakeMuxedStreamRequest(ctx, MuxedStreamRequest{stream: stream, body: nil}); err != nil {
return nil, err
}
if err := m.AwaitResponseHeaders(ctx, stream); err != nil {
return nil, err
}
return stream, nil
}
func (m *Muxer) NewStream(headers []Header) *MuxedStream {
return NewStream(m.config, headers, m.readyList, m.muxReader.dictionaries)
}
func (m *Muxer) MakeMuxedStreamRequest(ctx context.Context, request MuxedStreamRequest) error {
select { select {
case <-ctx.Done():
return ErrStreamRequestTimeout
case <-m.abortChan:
return ErrStreamRequestConnectionClosed
// Will be received by mux writer // Will be received by mux writer
case <-ctx.Done(): case m.newStreamChan <- request:
return nil, ErrOpenStreamTimeout return nil
case <-m.abortChan:
return nil, ErrConnectionClosed
case m.newStreamChan <- MuxedStreamRequest{stream: stream, body: body}:
}
select {
case <-ctx.Done():
return nil, ErrResponseHeadersTimeout
case <-m.abortChan:
return nil, ErrConnectionClosed
case <-stream.responseHeadersReceived:
return stream, nil
} }
} }
func (m *Muxer) OpenRPCStream(ctx context.Context) (*MuxedStream, error) { func (m *Muxer) AwaitResponseHeaders(ctx context.Context, stream *MuxedStream) error {
stream := &MuxedStream{
responseHeadersReceived: make(chan struct{}),
readBuffer: NewSharedBuffer(),
writeBuffer: &bytes.Buffer{},
writeBufferMaxLen: m.config.StreamWriteBufferMaxLen,
writeBufferHasSpace: make(chan struct{}, 1),
receiveWindow: m.config.DefaultWindowSize,
receiveWindowCurrentMax: m.config.DefaultWindowSize,
receiveWindowMax: m.config.MaxWindowSize,
sendWindow: m.config.DefaultWindowSize,
readyList: m.readyList,
writeHeaders: RPCHeaders(),
dictionaries: m.muxReader.dictionaries,
}
select {
// Will be received by mux writer
case <-ctx.Done():
return nil, ErrOpenStreamTimeout
case <-m.abortChan:
return nil, ErrConnectionClosed
case m.newStreamChan <- MuxedStreamRequest{stream: stream, body: nil}:
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ErrResponseHeadersTimeout return ErrResponseHeadersTimeout
case <-m.abortChan: case <-m.abortChan:
return nil, ErrConnectionClosed return ErrResponseHeadersConnectionClosed
case <-stream.responseHeadersReceived: case <-stream.responseHeadersReceived:
return stream, nil return nil
} }
} }

View File

@ -584,8 +584,8 @@ func TestOpenAfterDisconnect(t *testing.T) {
[]Header{{Name: "test-header", Value: "headerValue"}}, []Header{{Name: "test-header", Value: "headerValue"}},
nil, nil,
) )
if err != ErrConnectionClosed { if err != ErrStreamRequestConnectionClosed && err != ErrResponseHeadersConnectionClosed {
t.Fatalf("unexpected error in OpenStream: %s", err) t.Fatalf("case %v: unexpected error in OpenStream: %v", i, err)
} }
} }
} }

View File

@ -88,6 +88,23 @@ func (th TunnelHostname) IsSet() bool {
return th != "" return th != ""
} }
func NewStream(config MuxerConfig, writeHeaders []Header, readyList *ReadyList, dictionaries h2Dictionaries) *MuxedStream {
return &MuxedStream{
responseHeadersReceived: make(chan struct{}),
readBuffer: NewSharedBuffer(),
writeBuffer: &bytes.Buffer{},
writeBufferMaxLen: config.StreamWriteBufferMaxLen,
writeBufferHasSpace: make(chan struct{}, 1),
receiveWindow: config.DefaultWindowSize,
receiveWindowCurrentMax: config.DefaultWindowSize,
receiveWindowMax: config.MaxWindowSize,
sendWindow: config.DefaultWindowSize,
readyList: readyList,
writeHeaders: writeHeaders,
dictionaries: dictionaries,
}
}
func (s *MuxedStream) Read(p []byte) (n int, err error) { func (s *MuxedStream) Read(p []byte) (n int, err error) {
var readBuffer ReadWriteClosedCloser var readBuffer ReadWriteClosedCloser
if s.dictionaries.read != nil { if s.dictionaries.read != nil {