cloudflared-mirror/quic/safe_stream.go

105 lines
2.8 KiB
Go
Raw Permalink Normal View History

package quic
import (
"errors"
"net"
"sync"
"sync/atomic"
"time"
"github.com/quic-go/quic-go"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// idleTimeoutError is a zero-value quic.IdleTimeoutError, the error that is
// thrown by the writer when there is `no network activity`. Its address is
// used as the errors.Is target in handleWriteError so that such timeouts are
// not logged.
var idleTimeoutError = quic.IdleTimeoutError{}
// SafeStreamCloser wraps a quic.Stream and serializes Write, Close and
// CloseWrite behind a mutex. Read and SetDeadline are forwarded to the
// underlying stream without taking the lock.
type SafeStreamCloser struct {
// lock is held for the duration of Write, Close and CloseWrite.
lock sync.Mutex
// stream is the wrapped QUIC stream that every method delegates to.
stream quic.Stream
// writeTimeout, when greater than zero, is added to time.Now() to form the
// write deadline set before each Write.
writeTimeout time.Duration
// log is the logger used by handleWriteError to report write timeouts.
log *zerolog.Logger
// closing is set to true by Close; handleWriteError ignores write errors
// once it is set.
closing atomic.Bool
}
// NewSafeStreamCloser returns a SafeStreamCloser wrapping stream.
//
// writeTimeout is the per-Write deadline offset (values <= 0 mean Write sets
// no deadline) and log is the logger used when a write times out. The mutex
// and the closing flag start at their zero values.
func NewSafeStreamCloser(stream quic.Stream, writeTimeout time.Duration, log *zerolog.Logger) *SafeStreamCloser {
	closer := new(SafeStreamCloser)
	closer.stream = stream
	closer.writeTimeout = writeTimeout
	closer.log = log
	return closer
}
// Read forwards to the underlying stream's Read and returns its results
// unchanged. It does not take the write lock.
func (s *SafeStreamCloser) Read(p []byte) (n int, err error) {
	n, err = s.stream.Read(p)
	return n, err
}
// Write writes p to the underlying stream while holding the lock, so writes
// are serialized with each other and with Close/CloseWrite.
//
// When writeTimeout is positive, a write deadline of now+writeTimeout is set
// first; a failure to set it is only logged and the write is still attempted.
// Any error from the write itself is passed to handleWriteError and then
// returned to the caller together with the number of bytes written.
func (s *SafeStreamCloser) Write(p []byte) (n int, err error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if timeout := s.writeTimeout; timeout > 0 {
		deadline := time.Now().Add(timeout)
		// NOTE(review): this goes through the package-level zerolog logger
		// rather than s.log, unlike handleWriteError.
		if deadlineErr := s.stream.SetWriteDeadline(deadline); deadlineErr != nil {
			log.Err(deadlineErr).Msg("Error setting write deadline for QUIC stream")
		}
	}

	n, err = s.stream.Write(p)
	if err != nil {
		s.handleWriteError(err)
	}
	return n, err
}
// handleWriteError reacts to an error returned by a stream write.
//
// Errors are ignored entirely while the stream is closing. Otherwise, if err
// is (or wraps) a net.Error that reports a timeout, the write side of the
// stream is cancelled so that its buffers are released. The timeout is logged
// unless it is an idle timeout (`no network activity`). Non-timeout errors
// are left for the caller of Write to handle.
func (s *SafeStreamCloser) handleWriteError(err error) {
	// If we are closing the stream we just ignore any write error.
	if s.closing.Load() {
		return
	}

	var netErr net.Error
	if !errors.As(err, &netErr) || !netErr.Timeout() {
		return
	}

	// We don't need to log if what caused the timeout was no network activity.
	// errors.Is against &idleTimeoutError alone is not dependable: it can only
	// match by pointer identity (or a custom Is method), and the Go spec does
	// not guarantee that pointers to distinct zero-size variables compare
	// equal. Matching on the concrete type with errors.As catches every
	// *quic.IdleTimeoutError in the chain; the original check is kept so that
	// nothing it matched before is lost.
	var idleErr *quic.IdleTimeoutError
	isIdleTimeout := errors.Is(netErr, &idleTimeoutError) || errors.As(netErr, &idleErr)
	if !isIdleTimeout {
		s.log.Error().Err(netErr).Msg("Closing quic stream due to timeout while writing")
	}

	// We need to explicitly cancel the write so that it frees all buffers.
	s.stream.CancelWrite(0)
}
// Close shuts down both directions of the stream and returns the error from
// the underlying stream's Close.
//
// The order of operations matters: the closing flag is raised first so that
// handleWriteError ignores errors from a write that gets interrupted, then the
// write deadline is moved to "now" to unblock a Write that may be holding the
// lock, and only then is the lock acquired to cancel reads and close the
// stream.
func (s *SafeStreamCloser) Close() error {
// Set this stream to a closing state.
s.closing.Store(true)
// Make sure a possible writer does not block the lock forever. We need it, so we can close the writer
// side of the stream safely.
// The error is deliberately discarded: this is a best-effort nudge and the
// close proceeds regardless.
_ = s.stream.SetWriteDeadline(time.Now())
// This lock is eventually acquired despite Write also acquiring it, because we set a deadline to writes.
s.lock.Lock()
defer s.lock.Unlock()
// We have to clean up the receiving stream ourselves since the Close in the bottom does not handle that.
s.stream.CancelRead(0)
return s.stream.Close()
}
// CloseWrite closes only the sending direction of the stream, under the same
// lock that Write uses, and returns the underlying stream's Close error.
//
// As documented by the quic-go library, the stream's Close does not tear down
// the whole stream: it prevents further writes, which in turn results in an
// EOF being signalled to the reader on the other side. This side can still
// read from the stream afterwards.
func (s *SafeStreamCloser) CloseWrite() error {
	s.lock.Lock()
	defer s.lock.Unlock()

	closeErr := s.stream.Close()
	return closeErr
}
// SetDeadline forwards deadline to the underlying stream's SetDeadline and
// returns its error. It does not take the write lock.
func (s *SafeStreamCloser) SetDeadline(deadline time.Time) error {
	err := s.stream.SetDeadline(deadline)
	return err
}