TUN-8419: Add capnp safe transport
To better tolerate temporary errors that can occur in the capnp RPC calls, a wrapper is introduced that inspects the error condition and allows retrying within a short window.
This commit is contained in:
parent eb2e4349e8
commit 2db00211f5
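
In short, every RPC setup site in the diff below swaps capnp's raw stream transport for the new wrapper. A minimal sketch of that call-site pattern, assuming the shape shown in the hunks that follow (the package and helper name here are hypothetical):

package example

import (
	"io"

	"zombiezen.com/go/capnproto2/rpc"

	"github.com/cloudflare/cloudflared/tunnelrpc"
)

// newRPCConn illustrates the change this commit applies at every call site:
// the stream is wrapped with tunnelrpc.SafeTransport (previously
// rpc.StreamTransport) before the capnp RPC connection is built.
func newRPCConn(stream io.ReadWriteCloser) *rpc.Conn {
	transport := tunnelrpc.SafeTransport(stream) // was: rpc.StreamTransport(stream)
	return rpc.NewConn(transport)
}
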
@@ -3,8 +3,9 @@
 package proto
 
 import (
-	context "golang.org/x/net/context"
 	strconv "strconv"
 
+	context "golang.org/x/net/context"
 	capnp "zombiezen.com/go/capnproto2"
 	text "zombiezen.com/go/capnproto2/encoding/text"
 	schemas "zombiezen.com/go/capnproto2/schemas"
@@ -11,6 +11,7 @@ import (
 
 	"github.com/google/uuid"
 
+	"github.com/cloudflare/cloudflared/tunnelrpc"
 	"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
 )
 
@@ -29,7 +30,7 @@ func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, reques
 	if n != len(rpcStreamProtocolSignature) {
 		return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(rpcStreamProtocolSignature), n)
 	}
-	transport := rpc.StreamTransport(stream)
+	transport := tunnelrpc.SafeTransport(stream)
 	conn := rpc.NewConn(transport)
 	client := pogs.NewCloudflaredServer_PogsClient(conn.Bootstrap(ctx), conn)
 	return &CloudflaredClient{
@@ -8,6 +8,7 @@ import (
 
 	"zombiezen.com/go/capnproto2/rpc"
 
+	"github.com/cloudflare/cloudflared/tunnelrpc"
 	"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
 )
 
@@ -53,7 +54,7 @@ func (s *CloudflaredServer) Serve(ctx context.Context, stream io.ReadWriteCloser
 func (s *CloudflaredServer) handleRPC(ctx context.Context, stream io.ReadWriteCloser) error {
 	ctx, cancel := context.WithTimeout(ctx, s.responseTimeout)
 	defer cancel()
-	transport := rpc.StreamTransport(stream)
+	transport := tunnelrpc.SafeTransport(stream)
 	defer transport.Close()
 
 	main := pogs.CloudflaredServer_ServerToClient(s.sessionManager, s.configManager)
@@ -10,6 +10,7 @@ import (
 	"github.com/google/uuid"
 	"zombiezen.com/go/capnproto2/rpc"
 
+	"github.com/cloudflare/cloudflared/tunnelrpc"
 	"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
 )
 
@@ -28,7 +29,7 @@ func NewSessionClient(ctx context.Context, stream io.ReadWriteCloser, requestTim
 	if n != len(rpcStreamProtocolSignature) {
 		return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(rpcStreamProtocolSignature), n)
 	}
-	transport := rpc.StreamTransport(stream)
+	transport := tunnelrpc.SafeTransport(stream)
 	conn := rpc.NewConn(transport)
 	return &SessionClient{
 		client: pogs.NewSessionManager_PogsClient(conn.Bootstrap(ctx), conn),
@@ -8,6 +8,7 @@ import (
 
 	"zombiezen.com/go/capnproto2/rpc"
 
+	"github.com/cloudflare/cloudflared/tunnelrpc"
 	"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
 )
 
@@ -43,7 +44,7 @@ func (s *SessionManagerServer) Serve(ctx context.Context, stream io.ReadWriteClo
 	ctx, cancel := context.WithTimeout(ctx, s.responseTimeout)
 	defer cancel()
 
-	transport := rpc.StreamTransport(stream)
+	transport := tunnelrpc.SafeTransport(stream)
 	defer transport.Close()
 
 	main := pogs.SessionManager_ServerToClient(s.sessionManager)
@@ -0,0 +1,69 @@
+package tunnelrpc
+
+import (
+	"io"
+	"time"
+
+	"github.com/pkg/errors"
+	"zombiezen.com/go/capnproto2/rpc"
+)
+
+const (
+	// These default values are here so that we give some time for the underlying connection/stream
+	// to recover in the face of what we believe to be temporarily errors.
+	// We don't want to be too aggressive, as the end result of giving a final error (non-temporary)
+	// will result in the connection to be dropped.
+	// In turn, the other side will probably reconnect, which will put again more pressure in the overall system.
+	// So, the best solution is to give it some conservative time to recover.
+	defaultSleepBetweenTemporaryError = 500 * time.Millisecond
+	defaultMaxRetries                 = 3
+)
+
+type readWriterSafeTemporaryErrorCloser struct {
+	io.ReadWriteCloser
+
+	retries             int
+	sleepBetweenRetries time.Duration
+	maxRetries          int
+}
+
+func (r *readWriterSafeTemporaryErrorCloser) Read(p []byte) (n int, err error) {
+	n, err = r.ReadWriteCloser.Read(p)
+
+	// if there was a failure reading from the read closer, and the error is temporary, try again in some seconds
+	// otherwise, just fail without a temporary error.
+	if n == 0 && err != nil && isTemporaryError(err) {
+		if r.retries >= r.maxRetries {
+			return 0, errors.Wrap(err, "failed read from capnproto ReaderWriter after multiple temporary errors")
+		} else {
+			r.retries += 1
+
+			// sleep for some time to prevent quick read loops that cause exhaustion of CPU resources
+			time.Sleep(r.sleepBetweenRetries)
+		}
+	}
+
+	if err == nil {
+		r.retries = 0
+	}
+
+	return n, err
+}
+
+func SafeTransport(rw io.ReadWriteCloser) rpc.Transport {
+	return rpc.StreamTransport(&readWriterSafeTemporaryErrorCloser{
+		ReadWriteCloser:     rw,
+		maxRetries:          defaultMaxRetries,
+		sleepBetweenRetries: defaultSleepBetweenTemporaryError,
+	})
+}
+
+// isTemporaryError reports whether e has a Temporary() method that
+// returns true.
+func isTemporaryError(e error) bool {
+	type temp interface {
+		Temporary() bool
+	}
+	t, ok := e.(temp)
+	return ok && t.Temporary()
+}
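
As a sanity check on the retry behaviour above, a test along these lines could sit in the tunnelrpc package. It is not part of this commit: the tempErr and flakyStream helpers and the test name are hypothetical, while readWriterSafeTemporaryErrorCloser, its fields, and defaultMaxRetries come from the new file.

package tunnelrpc

import (
	"testing"
	"time"
)

// tempErr is a stand-in error exposing the Temporary() method that
// isTemporaryError looks for (the same shape as net.Error).
type tempErr struct{}

func (tempErr) Error() string   { return "temporary failure" }
func (tempErr) Temporary() bool { return true }

// flakyStream fails with a temporary error for the first `failures` reads
// and then succeeds, simulating a briefly unhealthy underlying stream.
type flakyStream struct {
	failures int
}

func (f *flakyStream) Read(p []byte) (int, error) {
	if f.failures > 0 {
		f.failures--
		return 0, tempErr{}
	}
	return copy(p, []byte("ok")), nil
}

func (f *flakyStream) Write(p []byte) (int, error) { return len(p), nil }
func (f *flakyStream) Close() error                { return nil }

func TestReadToleratesTemporaryErrors(t *testing.T) {
	r := &readWriterSafeTemporaryErrorCloser{
		ReadWriteCloser:     &flakyStream{failures: 2},
		maxRetries:          defaultMaxRetries,
		sleepBetweenRetries: time.Millisecond, // keep the test fast
	}

	// The wrapper does not loop internally: on a temporary error it counts,
	// sleeps, and returns the error, so the caller (capnp's message reader)
	// simply issues the next Read. Mimic that here.
	buf := make([]byte, 8)
	var n int
	var err error
	for i := 0; i < defaultMaxRetries+1; i++ {
		if n, err = r.Read(buf); err == nil {
			break
		}
	}
	if err != nil || n != 2 {
		t.Fatalf("expected read to succeed after temporary errors, got n=%d err=%v", n, err)
	}
}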