cloudflared-mirror/tunnelrpc/utils.go

90 lines
2.9 KiB
Go
Raw Permalink Normal View History

package tunnelrpc
import (
"context"
"io"
"time"
"github.com/pkg/errors"
capnp "zombiezen.com/go/capnproto2"
"zombiezen.com/go/capnproto2/rpc"
)
const (
// These default values are here so that we give some time for the underlying connection/stream
// to recover in the face of what we believe to be temporarily errors.
// We don't want to be too aggressive, as the end result of giving a final error (non-temporary)
// will result in the connection to be dropped.
// In turn, the other side will probably reconnect, which will put again more pressure in the overall system.
// So, the best solution is to give it some conservative time to recover.
defaultSleepBetweenTemporaryError = 500 * time.Millisecond
defaultMaxRetries = 3
)
type readWriterSafeTemporaryErrorCloser struct {
io.ReadWriteCloser
retries int
sleepBetweenRetries time.Duration
maxRetries int
}
func (r *readWriterSafeTemporaryErrorCloser) Read(p []byte) (n int, err error) {
n, err = r.ReadWriteCloser.Read(p)
// if there was a failure reading from the read closer, and the error is temporary, try again in some seconds
// otherwise, just fail without a temporary error.
if n == 0 && err != nil && isTemporaryError(err) {
if r.retries >= r.maxRetries {
return 0, errors.Wrap(err, "failed read from capnproto ReaderWriter after multiple temporary errors")
} else {
r.retries += 1
// sleep for some time to prevent quick read loops that cause exhaustion of CPU resources
time.Sleep(r.sleepBetweenRetries)
}
}
if err == nil {
r.retries = 0
}
return n, err
}
func SafeTransport(rw io.ReadWriteCloser) rpc.Transport {
return rpc.StreamTransport(&readWriterSafeTemporaryErrorCloser{
ReadWriteCloser: rw,
maxRetries: defaultMaxRetries,
sleepBetweenRetries: defaultSleepBetweenTemporaryError,
})
}
// isTemporaryError reports whether e has a Temporary() method that
// returns true.
func isTemporaryError(e error) bool {
type temp interface {
Temporary() bool
}
t, ok := e.(temp)
return ok && t.Temporary()
}
// NoopCapnpLogger provides a logger to discard all capnp rpc internal logging messages as
// they are by default provided to stdout if no logger interface is provided. These logging
// messages in cloudflared have typically not provided a high amount of pratical value
// as the messages are extremely verbose and don't provide a good insight into the message
// contents or rpc method names.
type noopCapnpLogger struct{}
func (noopCapnpLogger) Infof(ctx context.Context, format string, args ...interface{}) {}
func (noopCapnpLogger) Errorf(ctx context.Context, format string, args ...interface{}) {}
func NewClientConn(transport rpc.Transport) *rpc.Conn {
return rpc.NewConn(transport, rpc.ConnLog(noopCapnpLogger{}))
}
func NewServerConn(transport rpc.Transport, client capnp.Client) *rpc.Conn {
return rpc.NewConn(transport, rpc.MainInterface(client), rpc.ConnLog(noopCapnpLogger{}))
}