TUN-3799: extended the Stream interface to take a logger and added debug logs for io.Copy errors

This commit is contained in:
Sudarsan Reddy 2021-02-04 15:07:18 +00:00 committed by Nuno Diegues
parent a6c2348127
commit 8b794390e5
5 changed files with 26 additions and 17 deletions

View File

@ -62,7 +62,7 @@ func (ws *Websocket) ServeStream(options *StartOptions, conn io.ReadWriter) erro
_ = socksServer.Serve(conn) _ = socksServer.Serve(conn)
} else { } else {
ingress.Stream(wsConn, conn) ingress.Stream(wsConn, conn, ws.log)
} }
return nil return nil
} }

View File

@ -8,30 +8,37 @@ import (
"github.com/cloudflare/cloudflared/connection" "github.com/cloudflare/cloudflared/connection"
"github.com/cloudflare/cloudflared/websocket" "github.com/cloudflare/cloudflared/websocket"
gws "github.com/gorilla/websocket" gws "github.com/gorilla/websocket"
"github.com/rs/zerolog"
) )
// OriginConnection is a way to stream to a service running on the user's origin. // OriginConnection is a way to stream to a service running on the user's origin.
// Different concrete implementations will stream different protocols as long as they are io.ReadWriters. // Different concrete implementations will stream different protocols as long as they are io.ReadWriters.
type OriginConnection interface { type OriginConnection interface {
// Stream should generally be implemented as a bidirectional io.Copy. // Stream should generally be implemented as a bidirectional io.Copy.
Stream(tunnelConn io.ReadWriter) Stream(tunnelConn io.ReadWriter, log *zerolog.Logger)
Close() Close()
Type() connection.Type Type() connection.Type
} }
type streamHandlerFunc func(originConn io.ReadWriter, remoteConn net.Conn) type streamHandlerFunc func(originConn io.ReadWriter, remoteConn net.Conn, log *zerolog.Logger)
// Stream copies copy data to & from provided io.ReadWriters. // Stream copies copy data to & from provided io.ReadWriters.
func Stream(conn, backendConn io.ReadWriter) { func Stream(conn, backendConn io.ReadWriter, log *zerolog.Logger) {
proxyDone := make(chan struct{}, 2) proxyDone := make(chan struct{}, 2)
go func() { go func() {
io.Copy(conn, backendConn) _, err := io.Copy(conn, backendConn)
if err != nil {
log.Debug().Msgf("conn to backendConn copy: %v", err)
}
proxyDone <- struct{}{} proxyDone <- struct{}{}
}() }()
go func() { go func() {
io.Copy(backendConn, conn) _, err := io.Copy(backendConn, conn)
if err != nil {
log.Debug().Msgf("backendConn to conn copy: %v", err)
}
proxyDone <- struct{}{} proxyDone <- struct{}{}
}() }()
@ -41,8 +48,8 @@ func Stream(conn, backendConn io.ReadWriter) {
// DefaultStreamHandler is an implementation of streamHandlerFunc that // DefaultStreamHandler is an implementation of streamHandlerFunc that
// performs a two way io.Copy between originConn and remoteConn. // performs a two way io.Copy between originConn and remoteConn.
func DefaultStreamHandler(originConn io.ReadWriter, remoteConn net.Conn) { func DefaultStreamHandler(originConn io.ReadWriter, remoteConn net.Conn, log *zerolog.Logger) {
Stream(originConn, remoteConn) Stream(originConn, remoteConn, log)
} }
// tcpConnection is an OriginConnection that directly streams to raw TCP. // tcpConnection is an OriginConnection that directly streams to raw TCP.
@ -51,8 +58,8 @@ type tcpConnection struct {
streamHandler streamHandlerFunc streamHandler streamHandlerFunc
} }
func (tc *tcpConnection) Stream(tunnelConn io.ReadWriter) { func (tc *tcpConnection) Stream(tunnelConn io.ReadWriter, log *zerolog.Logger) {
tc.streamHandler(tunnelConn, tc.conn) tc.streamHandler(tunnelConn, tc.conn, log)
} }
func (tc *tcpConnection) Close() { func (tc *tcpConnection) Close() {
@ -70,8 +77,8 @@ type wsConnection struct {
resp *http.Response resp *http.Response
} }
func (wsc *wsConnection) Stream(tunnelConn io.ReadWriter) { func (wsc *wsConnection) Stream(tunnelConn io.ReadWriter, log *zerolog.Logger) {
Stream(tunnelConn, wsc.wsConn.UnderlyingConn()) Stream(tunnelConn, wsc.wsConn.UnderlyingConn(), log)
} }
func (wsc *wsConnection) Close() { func (wsc *wsConnection) Close() {

View File

@ -202,7 +202,7 @@ func (p *proxy) proxyConnection(
originConn.Close() originConn.Close()
}() }()
originConn.Stream(eyeballConn) originConn.Stream(eyeballConn, p.log)
return resp, nil return resp, nil
} }

View File

@ -5,6 +5,8 @@ import (
"io" "io"
"net" "net"
"strings" "strings"
"github.com/rs/zerolog"
) )
// RequestHandler is the functions needed to handle a SOCKS5 command // RequestHandler is the functions needed to handle a SOCKS5 command
@ -106,7 +108,7 @@ func (h *StandardRequestHandler) handleAssociate(conn io.ReadWriter, req *Reques
return nil return nil
} }
func StreamHandler(tunnelConn io.ReadWriter, originConn net.Conn) { func StreamHandler(tunnelConn io.ReadWriter, originConn net.Conn, log *zerolog.Logger) {
dialer := NewConnDialer(originConn) dialer := NewConnDialer(originConn)
requestHandler := NewRequestHandler(dialer) requestHandler := NewRequestHandler(dialer)
socksServer := NewConnectionHandler(requestHandler) socksServer := NewConnectionHandler(requestHandler)

View File

@ -54,7 +54,7 @@ func StartProxyServer(
listener net.Listener, listener net.Listener,
staticHost string, staticHost string,
shutdownC <-chan struct{}, shutdownC <-chan struct{},
streamHandler func(originConn io.ReadWriter, remoteConn net.Conn), streamHandler func(originConn io.ReadWriter, remoteConn net.Conn, log *zerolog.Logger),
) error { ) error {
upgrader := websocket.Upgrader{ upgrader := websocket.Upgrader{
ReadBufferSize: 1024, ReadBufferSize: 1024,
@ -81,7 +81,7 @@ type handler struct {
log *zerolog.Logger log *zerolog.Logger
staticHost string staticHost string
upgrader websocket.Upgrader upgrader websocket.Upgrader
streamHandler func(originConn io.ReadWriter, remoteConn net.Conn) streamHandler func(originConn io.ReadWriter, remoteConn net.Conn, log *zerolog.Logger)
} }
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -118,7 +118,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
go gorillaConn.pinger(r.Context()) go gorillaConn.pinger(r.Context())
defer conn.Close() defer conn.Close()
h.streamHandler(gorillaConn, stream) h.streamHandler(gorillaConn, stream, h.log)
} }
// NewResponseHeader returns headers needed to return to origin for completing handshake // NewResponseHeader returns headers needed to return to origin for completing handshake