TUN-2489: Delete stream from activestreammap when read and write are both closed
This commit is contained in:
parent
068b148e05
commit
3a9a0a0d75
|
@ -397,7 +397,6 @@ func (m *Muxer) OpenStream(ctx context.Context, headers []Header, body io.Reader
|
||||||
return stream, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (m *Muxer) OpenRPCStream(ctx context.Context) (*MuxedStream, error) {
|
func (m *Muxer) OpenRPCStream(ctx context.Context) (*MuxedStream, error) {
|
||||||
stream := m.NewStream(RPCHeaders())
|
stream := m.NewStream(RPCHeaders())
|
||||||
if err := m.MakeMuxedStreamRequest(ctx, MuxedStreamRequest{stream: stream, body: nil}); err != nil {
|
if err := m.MakeMuxedStreamRequest(ctx, MuxedStreamRequest{stream: stream, body: nil}); err != nil {
|
||||||
|
@ -425,6 +424,13 @@ func (m *Muxer) MakeMuxedStreamRequest(ctx context.Context, request MuxedStreamR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Muxer) CloseStreamRead(stream *MuxedStream) {
|
||||||
|
stream.CloseRead()
|
||||||
|
if stream.WriteClosed() {
|
||||||
|
m.streams.Delete(stream.streamID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Muxer) AwaitResponseHeaders(ctx context.Context, stream *MuxedStream) error {
|
func (m *Muxer) AwaitResponseHeaders(ctx context.Context, stream *MuxedStream) error {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -192,6 +192,12 @@ func (s *MuxedStream) CloseWrite() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *MuxedStream) WriteClosed() bool {
|
||||||
|
s.writeLock.Lock()
|
||||||
|
defer s.writeLock.Unlock()
|
||||||
|
return s.writeEOF
|
||||||
|
}
|
||||||
|
|
||||||
func (s *MuxedStream) WriteHeaders(headers []Header) error {
|
func (s *MuxedStream) WriteHeaders(headers []Header) error {
|
||||||
s.writeLock.Lock()
|
s.writeLock.Lock()
|
||||||
defer s.writeLock.Unlock()
|
defer s.writeLock.Unlock()
|
||||||
|
@ -351,7 +357,6 @@ func (s *MuxedStream) getChunk() *streamChunk {
|
||||||
sendData: !s.sentEOF,
|
sendData: !s.sentEOF,
|
||||||
eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow,
|
eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy at most s.sendWindow bytes, adjust the sendWindow accordingly
|
// Copy at most s.sendWindow bytes, adjust the sendWindow accordingly
|
||||||
writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow))
|
writeLen, _ := io.CopyN(&chunk.buffer, s.writeBuffer, int64(s.sendWindow))
|
||||||
s.sendWindow -= uint32(writeLen)
|
s.sendWindow -= uint32(writeLen)
|
||||||
|
|
Loading…
Reference in New Issue