From 2db00211f58cb06ba7c1b27c61e679f19ee70a30 Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Wed, 15 May 2024 13:06:58 -0700 Subject: [PATCH] TUN-8419: Add capnp safe transport To help support temporary errors that can occur in the capnp rpc calls, a wrapper is introduced to inspect the error conditions and allow for retrying within a short window. --- tunnelrpc/proto/tunnelrpc.capnp.go | 3 +- tunnelrpc/quic/cloudflared_client.go | 3 +- tunnelrpc/quic/cloudflared_server.go | 3 +- tunnelrpc/quic/session_client.go | 3 +- tunnelrpc/quic/session_server.go | 3 +- tunnelrpc/utils.go | 69 ++++++++++++++++++++++++++++ 6 files changed, 79 insertions(+), 5 deletions(-) create mode 100644 tunnelrpc/utils.go diff --git a/tunnelrpc/proto/tunnelrpc.capnp.go b/tunnelrpc/proto/tunnelrpc.capnp.go index 5355ef80..e6ab3ec9 100644 --- a/tunnelrpc/proto/tunnelrpc.capnp.go +++ b/tunnelrpc/proto/tunnelrpc.capnp.go @@ -3,8 +3,9 @@ package proto import ( - context "golang.org/x/net/context" strconv "strconv" + + context "golang.org/x/net/context" capnp "zombiezen.com/go/capnproto2" text "zombiezen.com/go/capnproto2/encoding/text" schemas "zombiezen.com/go/capnproto2/schemas" diff --git a/tunnelrpc/quic/cloudflared_client.go b/tunnelrpc/quic/cloudflared_client.go index 0efde6a5..a5397d17 100644 --- a/tunnelrpc/quic/cloudflared_client.go +++ b/tunnelrpc/quic/cloudflared_client.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" + "github.com/cloudflare/cloudflared/tunnelrpc" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -29,7 +30,7 @@ func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, reques if n != len(rpcStreamProtocolSignature) { return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(rpcStreamProtocolSignature), n) } - transport := rpc.StreamTransport(stream) + transport := tunnelrpc.SafeTransport(stream) conn := rpc.NewConn(transport) client := pogs.NewCloudflaredServer_PogsClient(conn.Bootstrap(ctx), conn) return &CloudflaredClient{ diff --git a/tunnelrpc/quic/cloudflared_server.go b/tunnelrpc/quic/cloudflared_server.go index 8da0ce6e..cb2c8b1a 100644 --- a/tunnelrpc/quic/cloudflared_server.go +++ b/tunnelrpc/quic/cloudflared_server.go @@ -8,6 +8,7 @@ import ( "zombiezen.com/go/capnproto2/rpc" + "github.com/cloudflare/cloudflared/tunnelrpc" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -53,7 +54,7 @@ func (s *CloudflaredServer) Serve(ctx context.Context, stream io.ReadWriteCloser func (s *CloudflaredServer) handleRPC(ctx context.Context, stream io.ReadWriteCloser) error { ctx, cancel := context.WithTimeout(ctx, s.responseTimeout) defer cancel() - transport := rpc.StreamTransport(stream) + transport := tunnelrpc.SafeTransport(stream) defer transport.Close() main := pogs.CloudflaredServer_ServerToClient(s.sessionManager, s.configManager) diff --git a/tunnelrpc/quic/session_client.go b/tunnelrpc/quic/session_client.go index 46ceb527..a8641e43 100644 --- a/tunnelrpc/quic/session_client.go +++ b/tunnelrpc/quic/session_client.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "zombiezen.com/go/capnproto2/rpc" + "github.com/cloudflare/cloudflared/tunnelrpc" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -28,7 +29,7 @@ func NewSessionClient(ctx context.Context, stream io.ReadWriteCloser, requestTim if n != len(rpcStreamProtocolSignature) { return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(rpcStreamProtocolSignature), n) } - transport := rpc.StreamTransport(stream) + transport := tunnelrpc.SafeTransport(stream) conn := rpc.NewConn(transport) return &SessionClient{ client: pogs.NewSessionManager_PogsClient(conn.Bootstrap(ctx), conn), diff --git a/tunnelrpc/quic/session_server.go b/tunnelrpc/quic/session_server.go index 1498863d..f5269e3c 100644 --- a/tunnelrpc/quic/session_server.go +++ b/tunnelrpc/quic/session_server.go @@ -8,6 +8,7 @@ import ( "zombiezen.com/go/capnproto2/rpc" + "github.com/cloudflare/cloudflared/tunnelrpc" "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) @@ -43,7 +44,7 @@ func (s *SessionManagerServer) Serve(ctx context.Context, stream io.ReadWriteClo ctx, cancel := context.WithTimeout(ctx, s.responseTimeout) defer cancel() - transport := rpc.StreamTransport(stream) + transport := tunnelrpc.SafeTransport(stream) defer transport.Close() main := pogs.SessionManager_ServerToClient(s.sessionManager) diff --git a/tunnelrpc/utils.go b/tunnelrpc/utils.go new file mode 100644 index 00000000..dd99dcea --- /dev/null +++ b/tunnelrpc/utils.go @@ -0,0 +1,69 @@ +package tunnelrpc + +import ( + "io" + "time" + + "github.com/pkg/errors" + "zombiezen.com/go/capnproto2/rpc" +) + +const ( + // These default values are here so that we give some time for the underlying connection/stream + // to recover in the face of what we believe to be temporarily errors. + // We don't want to be too aggressive, as the end result of giving a final error (non-temporary) + // will result in the connection to be dropped. + // In turn, the other side will probably reconnect, which will put again more pressure in the overall system. + // So, the best solution is to give it some conservative time to recover. + defaultSleepBetweenTemporaryError = 500 * time.Millisecond + defaultMaxRetries = 3 +) + +type readWriterSafeTemporaryErrorCloser struct { + io.ReadWriteCloser + + retries int + sleepBetweenRetries time.Duration + maxRetries int +} + +func (r *readWriterSafeTemporaryErrorCloser) Read(p []byte) (n int, err error) { + n, err = r.ReadWriteCloser.Read(p) + + // if there was a failure reading from the read closer, and the error is temporary, try again in some seconds + // otherwise, just fail without a temporary error. + if n == 0 && err != nil && isTemporaryError(err) { + if r.retries >= r.maxRetries { + return 0, errors.Wrap(err, "failed read from capnproto ReaderWriter after multiple temporary errors") + } else { + r.retries += 1 + + // sleep for some time to prevent quick read loops that cause exhaustion of CPU resources + time.Sleep(r.sleepBetweenRetries) + } + } + + if err == nil { + r.retries = 0 + } + + return n, err +} + +func SafeTransport(rw io.ReadWriteCloser) rpc.Transport { + return rpc.StreamTransport(&readWriterSafeTemporaryErrorCloser{ + ReadWriteCloser: rw, + maxRetries: defaultMaxRetries, + sleepBetweenRetries: defaultSleepBetweenTemporaryError, + }) +} + +// isTemporaryError reports whether e has a Temporary() method that +// returns true. +func isTemporaryError(e error) bool { + type temp interface { + Temporary() bool + } + t, ok := e.(temp) + return ok && t.Temporary() +}