Release 2017.11.3

This commit is contained in:
Chris Branch 2017-11-28 13:41:29 +00:00
parent ff67bd23f2
commit ee499dfa88
11 changed files with 422 additions and 180 deletions

View File

@ -41,7 +41,7 @@ After=network.target
[Service]
TimeoutStartSec=0
Type=notify
ExecStart={{ .Path }} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --autoupdate 0s
ExecStart={{ .Path }} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --no-autoupdate
[Install]
WantedBy=multi-user.target
@ -63,7 +63,7 @@ ExecStart=/bin/bash -c '{{ .Path }} update; code=$?; if [ $code -eq 64 ]; then s
Description=Update Cloudflare Warp
[Timer]
OnUnitActiveSec=1d
OnUnitActiveSec=1d
[Install]
WantedBy=timers.target
@ -87,7 +87,7 @@ var sysvTemplate = ServiceTemplate{
# Short-Description: Cloudflare Warp
# Description: Cloudflare Warp agent
### END INIT INFO
cmd="{{.Path}} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --pidfile /var/run/$name.pid"
cmd="{{.Path}} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --pidfile /var/run/$name.pid --autoupdate-freq 24h0m0s"
name=$(basename $(readlink -f $0))
pid_file="/var/run/$name.pid"
stdout_log="/var/log/$name.log"
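For context, the `{{ .Path }}` / `{{.Path}}` placeholders in these service templates are Go text/template actions filled in at install time. A minimal sketch of how one such line might be rendered, assuming a hypothetical data struct with only a Path field (the real ServiceTemplate fields are not shown in this diff):

```go
package main

import (
	"os"
	"text/template"
)

// Hypothetical stand-in for the data rendered into the service templates;
// only the Path field appears in the hunks above.
type serviceData struct {
	Path string
}

func main() {
	const line = "ExecStart={{ .Path }} --config /etc/cloudflare-warp/config.yml " +
		"--origincert /etc/cloudflare-warp/cert.pem --no-autoupdate\n"
	t := template.Must(template.New("unit").Parse(line))
	// Prints the ExecStart line with the placeholder replaced by the binary path.
	_ = t.Execute(os.Stdout, serviceData{Path: "/usr/local/bin/cloudflare-warp"})
}
```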

View File

@ -14,7 +14,6 @@ import (
"syscall"
"time"
"github.com/cloudflare/cloudflare-warp/h2mux"
"github.com/cloudflare/cloudflare-warp/metrics"
"github.com/cloudflare/cloudflare-warp/origin"
"github.com/cloudflare/cloudflare-warp/tlsconfig"
@ -84,9 +83,8 @@ WARNING:
Usage: "Disable periodic check for updates, restarting the server with the new version.",
Value: false,
}),
altsrc.NewStringFlag(&cli.StringFlag{
altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "edge",
Value: "cftunnel.com:7844",
Usage: "Address of the Cloudflare tunnel server.",
EnvVars: []string{"TUNNEL_EDGE"},
Hidden: true,
@ -149,6 +147,12 @@ WARNING:
Usage: "Listen address for metrics reporting.",
EnvVars: []string{"TUNNEL_METRICS"},
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: "metrics-update-freq",
Usage: "Frequency to update tunnel metrics",
Value: time.Second * 5,
EnvVars: []string{"TUNNEL_METRICS_UPDATE_FREQ"},
}),
altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "tag",
Usage: "Custom tags used to identify this tunnel, in format `KEY=VALUE`. Multiple tags may be specified",
@ -169,9 +173,15 @@ WARNING:
altsrc.NewStringFlag(&cli.StringFlag{
Name: "loglevel",
Value: "info",
Usage: "Logging level {panic, fatal, error, warn, info, debug}",
Usage: "Application logging level {panic, fatal, error, warn, info, debug}",
EnvVars: []string{"TUNNEL_LOGLEVEL"},
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "proto-loglevel",
Value: "warn",
Usage: "Protocol logging level {panic, fatal, error, warn, info, debug}",
EnvVars: []string{"TUNNEL_PROTO_LOGLEVEL"},
}),
altsrc.NewUintFlag(&cli.UintFlag{
Name: "retries",
Value: 5,
@ -188,6 +198,11 @@ WARNING:
Usage: "Write the application's PID to this file after first successful connection.",
EnvVars: []string{"TUNNEL_PIDFILE"},
}),
altsrc.NewIntFlag(&cli.IntFlag{
Name: "ha-connections",
Value: 4,
Hidden: true,
}),
}
app.Action = func(c *cli.Context) error {
raven.CapturePanic(func() { startServer(c) }, nil)
@ -259,6 +274,13 @@ func startServer(c *cli.Context) {
}
log.SetLevel(logLevel)
protoLogLevel, err := log.ParseLevel(c.String("proto-loglevel"))
if err != nil {
log.WithError(err).Fatal("Unknown protocol logging level specified")
}
protoLogger := log.New()
protoLogger.Level = protoLogLevel
hostname, err := validation.ValidateHostname(c.String("hostname"))
if err != nil {
log.WithError(err).Fatal("Invalid hostname")
@ -325,8 +347,9 @@ If you don't have a certificate signed by Cloudflare, run the command:
if err != nil {
log.WithError(err).Fatalf("Cannot read %s to load origin certificate", originCertPath)
}
tunnelMetrics := origin.NewTunnelMetrics()
tunnelConfig := &origin.TunnelConfig{
EdgeAddr: c.String("edge"),
EdgeAddrs: c.StringSlice("edge"),
OriginUrl: url,
Hostname: hostname,
OriginCert: originCert,
@ -338,20 +361,25 @@ If you don't have a certificate signed by Cloudflare, run the command:
ReportedVersion: Version,
LBPool: c.String("lb-pool"),
Tags: tags,
ConnectedSignal: h2mux.NewSignal(),
HAConnections: c.Int("ha-connections"),
Metrics: tunnelMetrics,
MetricsUpdateFreq: c.Duration("metrics-update-freq"),
ProtocolLogger: protoLogger,
}
connectedSignal := make(chan struct{})
tunnelConfig.TlsConfig = tlsconfig.CLIFlags{RootCA: "cacert"}.GetConfig(c)
if tunnelConfig.TlsConfig.RootCAs == nil {
tunnelConfig.TlsConfig.RootCAs = GetCloudflareRootCA()
tunnelConfig.TlsConfig.ServerName = "cftunnel.com"
} else {
tunnelConfig.TlsConfig.ServerName, _, _ = net.SplitHostPort(tunnelConfig.EdgeAddr)
} else if len(tunnelConfig.EdgeAddrs) > 0 {
// Set for development environments and for testing specific origintunneld instances
tunnelConfig.TlsConfig.ServerName, _, _ = net.SplitHostPort(tunnelConfig.EdgeAddrs[0])
}
go writePidFile(tunnelConfig.ConnectedSignal, c.String("pidfile"))
go writePidFile(connectedSignal, c.String("pidfile"))
go func() {
errC <- origin.StartTunnelDaemon(tunnelConfig, shutdownC)
errC <- origin.StartTunnelDaemon(tunnelConfig, shutdownC, connectedSignal)
wg.Done()
}()
@ -476,8 +504,8 @@ func generateRandomClientID() string {
return hex.EncodeToString(id)
}
func writePidFile(waitForSignal h2mux.Signal, pidFile string) {
waitForSignal.Wait()
func writePidFile(waitForSignal chan struct{}, pidFile string) {
<-waitForSignal
daemon.SdNotify(false, "READY=1")
if pidFile == "" {
return
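The new proto-loglevel flag gives the protocol layer its own logrus logger, so h2mux chatter can stay at warn while the application logs at info. A minimal sketch of the two-logger setup, using only the logrus calls that appear in this commit (the level strings are hard-coded here for illustration):

```go
package main

import (
	log "github.com/Sirupsen/logrus"
)

func main() {
	// Application logger: the package-level logrus logger, as in startServer.
	logLevel, err := log.ParseLevel("info")
	if err != nil {
		log.WithError(err).Fatal("Unknown logging level specified")
	}
	log.SetLevel(logLevel)

	// Dedicated protocol logger, later handed to h2mux via MuxerConfig.Logger.
	protoLogLevel, err := log.ParseLevel("warn")
	if err != nil {
		log.WithError(err).Fatal("Unknown protocol logging level specified")
	}
	protoLogger := log.New()
	protoLogger.Level = protoLogLevel

	log.Info("visible: application logger is at info")
	protoLogger.Info("suppressed: protocol logger is at warn")
}
```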

View File

@ -1,11 +1,19 @@
package h2mux
import "sync/atomic"
import "sync"
// BooleanFuse is a data structure that can be set once to a particular value using Fuse(value).
// Subsequent calls to Fuse() will have no effect.
type BooleanFuse struct {
value int32
mu sync.Mutex
cond *sync.Cond
}
func NewBooleanFuse() *BooleanFuse {
f := &BooleanFuse{}
f.cond = sync.NewCond(&f.mu)
return f
}
// Value gets the value
@ -13,13 +21,30 @@ func (f *BooleanFuse) Value() bool {
// 0: unset
// 1: set true
// 2: set false
return atomic.LoadInt32(&f.value) == 1
f.mu.Lock()
defer f.mu.Unlock()
return f.value == 1
}
func (f *BooleanFuse) Fuse(result bool) {
f.mu.Lock()
defer f.mu.Unlock()
newValue := int32(2)
if result {
newValue = 1
}
atomic.CompareAndSwapInt32(&f.value, 0, newValue)
if f.value == 0 {
f.value = newValue
f.cond.Broadcast()
}
}
// Await blocks until Fuse has been called at least once.
func (f *BooleanFuse) Await() bool {
f.mu.Lock()
defer f.mu.Unlock()
for f.value == 0 {
f.cond.Wait()
}
return f.value == 1
}
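A minimal sketch of how the reworked fuse is meant to be used (mirroring ServeTunnelLoop later in this commit): several goroutines may call Fuse, but only the first call sticks, and a signal channel is closed at most once, and only on success.

```go
package main

import (
	"fmt"

	"github.com/cloudflare/cloudflare-warp/h2mux"
)

func main() {
	connectedSignal := make(chan struct{})
	fuse := h2mux.NewBooleanFuse()

	go func() {
		// Await blocks until Fuse has been called; true means "connected".
		if fuse.Await() {
			close(connectedSignal)
		}
	}()

	fuse.Fuse(true)  // the first call wins...
	fuse.Fuse(false) // ...later calls are no-ops

	<-connectedSignal
	fmt.Println("fuse value:", fuse.Value()) // true
}
```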

View File

@ -43,6 +43,8 @@ type MuxerConfig struct {
HeartbeatInterval time.Duration
// The minimum number of heartbeats to send before terminating the connection.
MaxHeartbeats uint64
// Logger to use
Logger *log.Logger
}
type Muxer struct {
@ -70,7 +72,7 @@ type Muxer struct {
streams *activeStreamMap
// explicitShutdown records whether the Muxer is closing because Shutdown was called, or due to another
// error.
explicitShutdown BooleanFuse
explicitShutdown *BooleanFuse
}
type Header struct {
@ -84,6 +86,13 @@ func Handshake(
r io.ReadCloser,
config MuxerConfig,
) (*Muxer, error) {
// Set default config values
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
if config.Logger == nil {
config.Logger = log.New()
}
// Initialise connection state fields
m := &Muxer{
f: http2.NewFramer(w, r), // A framer that writes to w and reads from r
@ -97,10 +106,6 @@ func Handshake(
}
m.f.ReadMetaHeaders = hpack.NewDecoder(4096, func(hpack.HeaderField) {})
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
// Initialise the settings to identify this connection and confirm the other end is sane.
handshakeSetting := http2.Setting{ID: SettingMuxerMagic, Val: MuxerMagicEdge}
expectedMagic := MuxerMagicOrigin
@ -134,14 +139,15 @@ func Handshake(
// Sanity check to ensure idleDuration is sane
if idleDuration == 0 || idleDuration < defaultTimeout {
idleDuration = defaultTimeout
log.Warn("Minimum idle time has been adjusted to ", defaultTimeout)
config.Logger.Warn("Minimum idle time has been adjusted to ", defaultTimeout)
}
maxRetries := config.MaxHeartbeats
if maxRetries == 0 {
maxRetries = defaultRetries
log.Warn("Minimum number of unacked heartbeats to send before closing the connection has been adjusted to ", maxRetries)
config.Logger.Warn("Minimum number of unacked heartbeats to send before closing the connection has been adjusted to ", maxRetries)
}
m.explicitShutdown = NewBooleanFuse()
m.muxReader = &MuxReader{
f: m.f,
handler: m.config.Handler,
@ -226,7 +232,7 @@ func joinErrorsWithTimeout(errChan <-chan error, receiveCount int, timeout time.
}
func (m *Muxer) Serve() error {
logger := log.WithField("name", m.config.Name)
logger := m.config.Logger.WithField("name", m.config.Name)
errChan := make(chan error)
go func() {
errChan <- m.muxReader.run(logger)

View File

@ -310,7 +310,6 @@ func (r *MuxReader) connectionError(err error) error {
case MuxerProtocolError:
http2Code = e.h2code
}
log.Warnf("Connection error %v", http2Code)
r.sendGoAway(http2Code)
return err
}

View File

@ -21,6 +21,11 @@ type BackoffHandler struct {
// MaxRetries sets the maximum number of retries to perform. The default value
// of 0 disables retry completely.
MaxRetries uint
// RetryForever caps the exponential backoff period according to MaxRetries
// but allows you to retry indefinitely.
RetryForever bool
// BaseTime sets the initial backoff period.
BaseTime time.Duration
retries uint
resetDeadline time.Time
@ -38,25 +43,38 @@ func (b BackoffHandler) GetBackoffDuration(ctx context.Context) (time.Duration,
// b.retries would be set to 0 at this point
return time.Second, true
}
if b.retries >= b.MaxRetries {
if b.retries >= b.MaxRetries && !b.RetryForever {
return time.Duration(0), false
}
return time.Duration(time.Second * 1 << b.retries), true
return time.Duration(b.GetBaseTime() * 1 << b.retries), true
}
// Backoff is used to wait according to exponential backoff. Returns false if the
// maximum number of retries have been used or if the underlying context has been cancelled.
func (b *BackoffHandler) Backoff(ctx context.Context) bool {
// BackoffTimer returns a channel that sends the current time when the exponential backoff timeout expires.
// Returns nil if the maximum number of retries have been used.
func (b *BackoffHandler) BackoffTimer() <-chan time.Time {
if !b.resetDeadline.IsZero() && timeNow().After(b.resetDeadline) {
b.retries = 0
b.resetDeadline = time.Time{}
}
if b.retries >= b.MaxRetries {
if !b.RetryForever {
return nil
}
} else {
b.retries++
}
return timeAfter(time.Duration(b.GetBaseTime() * 1 << (b.retries - 1)))
}
// Backoff is used to wait according to exponential backoff. Returns false if the
// maximum number of retries have been used or if the underlying context has been cancelled.
func (b *BackoffHandler) Backoff(ctx context.Context) bool {
c := b.BackoffTimer()
if c == nil {
return false
}
select {
case <-timeAfter(time.Duration(time.Second * 1 << b.retries)):
b.retries++
case <-c:
return true
case <-ctx.Done():
return false
@ -66,5 +84,12 @@ func (b *BackoffHandler) Backoff(ctx context.Context) bool {
// Sets a grace period within which the backoff timer is maintained. After the grace
// period expires, the number of retries & backoff duration is reset.
func (b *BackoffHandler) SetGracePeriod() {
b.resetDeadline = timeNow().Add(time.Duration(time.Second * 2 << b.retries))
b.resetDeadline = timeNow().Add(time.Duration(b.GetBaseTime() * 2 << b.retries))
}
func (b BackoffHandler) GetBaseTime() time.Duration {
if b.BaseTime == 0 {
return time.Second
}
return b.BaseTime
}
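A minimal sketch of this backoff API as extended here, assuming BackoffHandler lives in the origin package imported elsewhere in this commit and a hypothetical retryable operation: with RetryForever set, Backoff keeps returning true, but the delay stops growing once MaxRetries is reached.

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/cloudflare/cloudflare-warp/origin"
)

func main() {
	ctx := context.Background()
	backoff := origin.BackoffHandler{
		MaxRetries:   3,
		RetryForever: true,        // keep retrying, but cap the delay growth
		BaseTime:     time.Second, // defaults to 1s when left zero
	}
	try := func() error { return errors.New("transient failure") } // hypothetical operation

	for attempt := 0; attempt < 5; attempt++ {
		if err := try(); err == nil {
			return
		}
		if duration, ok := backoff.GetBackoffDuration(ctx); ok {
			log.Printf("retrying in %s", duration)
		}
		if !backoff.Backoff(ctx) { // sleeps; false only on context cancellation here
			return
		}
	}
}
```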

View File

@ -111,4 +111,38 @@ func TestGetBackoffDuration(t *testing.T) {
if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*4 {
t.Fatalf("backoff didn't return 4 seconds on third retry")
}
backoff.Backoff(ctx) // noop
if duration, ok := backoff.GetBackoffDuration(ctx); ok || duration != 0 {
t.Fatalf("backoff didn't return 0 seconds on fourth retry (exceeding limit)")
}
}
func TestBackoffRetryForever(t *testing.T) {
// make backoff return immediately
timeAfter = immediateTimeAfter
ctx := context.Background()
backoff := BackoffHandler{MaxRetries: 3, RetryForever: true}
if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second {
t.Fatalf("backoff didn't return 1 second on first retry")
}
backoff.Backoff(ctx) // noop
if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*2 {
t.Fatalf("backoff didn't return 2 seconds on second retry")
}
backoff.Backoff(ctx) // noop
if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*4 {
t.Fatalf("backoff didn't return 4 seconds on third retry")
}
if !backoff.Backoff(ctx) {
t.Fatalf("backoff refused on fourth retry despire RetryForever")
}
if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*8 {
t.Fatalf("backoff returned %v instead of 8 seconds on fourth retry", duration)
}
if !backoff.Backoff(ctx) {
t.Fatalf("backoff refused on fifth retry despire RetryForever")
}
if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*8 {
t.Fatalf("backoff returned %v instead of 8 seconds on fifth retry", duration)
}
}

View File

@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"runtime"
"strconv"
"strings"
"time"
@ -21,17 +22,19 @@ import (
log "github.com/Sirupsen/logrus"
raven "github.com/getsentry/raven-go"
"github.com/pkg/errors"
_ "github.com/prometheus/client_golang/prometheus"
rpc "zombiezen.com/go/capnproto2/rpc"
)
const (
dialTimeout = 15 * time.Second
TagHeaderNamePrefix = "Cf-Warp-Tag-"
TagHeaderNamePrefix = "Cf-Warp-Tag-"
DuplicateConnectionError = "EDUPCONN"
)
type TunnelConfig struct {
EdgeAddr string
EdgeAddrs []string
OriginUrl string
Hostname string
OriginCert []byte
@ -43,7 +46,10 @@ type TunnelConfig struct {
ReportedVersion string
LBPool string
Tags []tunnelpogs.Tag
ConnectedSignal h2mux.Signal
HAConnections int
Metrics *TunnelMetrics
MetricsUpdateFreq time.Duration
ProtocolLogger *log.Logger
}
type dialError struct {
@ -54,6 +60,12 @@ func (e dialError) Error() string {
return e.cause.Error()
}
type dupConnRegisterTunnelError struct{}
func (e dupConnRegisterTunnelError) Error() string {
return "already connected to this server"
}
type printableRegisterTunnelError struct {
cause error
permanent bool
@ -63,10 +75,10 @@ func (e printableRegisterTunnelError) Error() string {
return e.cause.Error()
}
func (c *TunnelConfig) RegistrationOptions() *tunnelpogs.RegistrationOptions {
policy := tunnelrpc.ExistingTunnelPolicy_disconnect
if c.LBPool != "" {
policy = tunnelrpc.ExistingTunnelPolicy_balance
func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string) *tunnelpogs.RegistrationOptions {
policy := tunnelrpc.ExistingTunnelPolicy_balance
if c.HAConnections <= 1 && c.LBPool == "" {
policy = tunnelrpc.ExistingTunnelPolicy_disconnect
}
return &tunnelpogs.RegistrationOptions{
ClientID: c.ClientID,
@ -75,18 +87,41 @@ func (c *TunnelConfig) RegistrationOptions() *tunnelpogs.RegistrationOptions {
ExistingTunnelPolicy: policy,
PoolID: c.LBPool,
Tags: c.Tags,
ConnectionID: connectionID,
OriginLocalIP: OriginLocalIP,
}
}
func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}) error {
func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}, connectedSignal chan struct{}) error {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-shutdownC
cancel()
}()
if config.HAConnections > 1 {
return NewSupervisor(config).Run(ctx, connectedSignal)
} else {
addrs, err := ResolveEdgeIPs(config.EdgeAddrs)
if err != nil {
return err
}
return ServeTunnelLoop(ctx, config, addrs[0], 0, connectedSignal)
}
}
func ServeTunnelLoop(ctx context.Context, config *TunnelConfig, addr *net.TCPAddr, connectionID uint8, connectedSignal chan struct{}) error {
backoff := BackoffHandler{MaxRetries: config.Retries}
// Used to close connectedSignal no more than once
connectedFuse := h2mux.NewBooleanFuse()
go func() {
if connectedFuse.Await() {
close(connectedSignal)
}
}()
// Ensure the above goroutine will terminate if we return without connecting
defer connectedFuse.Fuse(false)
for {
err, recoverable := ServeTunnel(ctx, config, &backoff)
err, recoverable := ServeTunnel(ctx, config, addr, connectionID, connectedFuse, &backoff)
if recoverable {
if duration, ok := backoff.GetBackoffDuration(ctx); ok {
log.Infof("Retrying in %s seconds", duration)
@ -101,10 +136,24 @@ func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}) error {
func ServeTunnel(
ctx context.Context,
config *TunnelConfig,
addr *net.TCPAddr,
connectionID uint8,
connectedFuse *h2mux.BooleanFuse,
backoff *BackoffHandler,
) (err error, recoverable bool) {
// Treat panics as recoverable errors
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("ServeTunnel: %v", r)
}
recoverable = true
}
}()
// Returns error from parsing the origin URL or handshake errors
handler, err := NewTunnelHandler(ctx, config)
handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID)
if err != nil {
errLog := log.WithError(err)
switch err.(type) {
@ -121,19 +170,28 @@ func ServeTunnel(
serveCtx, serveCancel := context.WithCancel(ctx)
registerErrC := make(chan error, 1)
go func() {
err := RegisterTunnel(serveCtx, handler.muxer, config)
err := RegisterTunnel(serveCtx, handler.muxer, config, connectionID, originLocalIP)
if err == nil {
config.ConnectedSignal.Signal()
connectedFuse.Fuse(true)
backoff.SetGracePeriod()
} else {
serveCancel()
}
registerErrC <- err
}()
updateMetricsTickC := time.Tick(config.MetricsUpdateFreq)
go func() {
<-serveCtx.Done()
handler.muxer.Shutdown()
for {
select {
case <-serveCtx.Done():
handler.muxer.Shutdown()
break
case <-updateMetricsTickC:
handler.UpdateMetrics()
}
}
}()
err = handler.muxer.Serve()
serveCancel()
registerErr := <-registerErrC
@ -149,6 +207,9 @@ func ServeTunnel(
return nil, false
}
return e.cause, true
} else if e, ok := registerErr.(dupConnRegisterTunnelError); ok {
log.Info("Already connected to this server, selecting a different one")
return e, true
}
// Only log errors to Sentry that may have been caused by the client side, to reduce dupes
raven.CaptureError(registerErr, nil)
@ -168,7 +229,7 @@ func IsRPCStreamResponse(headers []h2mux.Header) bool {
return true
}
func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig) error {
func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, connectionID uint8, originLocalIP string) error {
logger := log.WithField("subsystem", "rpc")
logger.Debug("initiating RPC stream")
stream, err := muxer.OpenStream([]h2mux.Header{
@ -201,9 +262,9 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi
ctx,
config.OriginCert,
config.Hostname,
config.RegistrationOptions(),
config.RegistrationOptions(connectionID, originLocalIP),
)
LogServerInfo(logger, serverInfoPromise.Result())
LogServerInfo(logger, serverInfoPromise.Result(), connectionID, config.Metrics)
if err != nil {
// RegisterTunnel RPC failure
return err
@ -211,7 +272,9 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi
for _, logLine := range registration.LogLines {
logger.Info(logLine)
}
if registration.Err != "" {
if registration.Err == DuplicateConnectionError {
return dupConnRegisterTunnelError{}
} else if registration.Err != "" {
return printableRegisterTunnelError{
cause: fmt.Errorf("Server error: %s", registration.Err),
permanent: registration.PermanentFailure,
@ -226,7 +289,11 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi
return nil
}
func LogServerInfo(logger *log.Entry, promise tunnelrpc.ServerInfo_Promise) {
func LogServerInfo(logger *log.Entry,
promise tunnelrpc.ServerInfo_Promise,
connectionID uint8,
metrics *TunnelMetrics,
) {
serverInfoMessage, err := promise.Struct()
if err != nil {
logger.WithError(err).Warn("Failed to retrieve server information")
@ -238,56 +305,7 @@ func LogServerInfo(logger *log.Entry, promise tunnelrpc.ServerInfo_Promise) {
return
}
log.Infof("Connected to %s", serverInfo.LocationName)
}
type TunnelHandler struct {
originUrl string
muxer *h2mux.Muxer
httpClient *http.Client
tags []tunnelpogs.Tag
}
var dialer = net.Dialer{DualStack: true}
func NewTunnelHandler(ctx context.Context, config *TunnelConfig) (*TunnelHandler, error) {
url, err := validation.ValidateUrl(config.OriginUrl)
if err != nil {
return nil, fmt.Errorf("Unable to parse origin url %#v", url)
}
h := &TunnelHandler{
originUrl: url,
httpClient: &http.Client{Timeout: time.Minute},
tags: config.Tags,
}
// Inherit from parent context so we can cancel (Ctrl-C) while dialing
dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout)
// TUN-92: enforce a timeout on dial and handshake (as tls.Dial does not support one)
plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", config.EdgeAddr)
dialCancel()
if err != nil {
return nil, dialError{cause: err}
}
edgeConn := tls.Client(plaintextEdgeConn, config.TlsConfig)
edgeConn.SetDeadline(time.Now().Add(dialTimeout))
err = edgeConn.Handshake()
if err != nil {
return nil, dialError{cause: err}
}
// clear the deadline on the conn; h2mux has its own timeouts
edgeConn.SetDeadline(time.Time{})
// Establish a muxed connection with the edge
// Client mux handshake with agent server
h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
Timeout: 5 * time.Second,
Handler: h,
IsClient: true,
HeartbeatInterval: config.HeartbeatInterval,
MaxHeartbeats: config.MaxHeartbeats,
})
if err != nil {
return h, errors.New("TLS handshake error")
}
return h, err
metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName)
}
func H2RequestHeadersToH1Request(h2 []h2mux.Header, h1 *http.Request) error {
@ -327,6 +345,63 @@ func H1ResponseToH2Response(h1 *http.Response) (h2 []h2mux.Header) {
return
}
type TunnelHandler struct {
originUrl string
muxer *h2mux.Muxer
httpClient *http.Client
tags []tunnelpogs.Tag
metrics *TunnelMetrics
// connectionID is only used by metrics, and prometheus requires labels to be strings
connectionID string
}
var dialer = net.Dialer{DualStack: true}
// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error
func NewTunnelHandler(ctx context.Context, config *TunnelConfig, addr string, connectionID uint8) (*TunnelHandler, string, error) {
url, err := validation.ValidateUrl(config.OriginUrl)
if err != nil {
return nil, "", fmt.Errorf("Unable to parse origin url %#v", url)
}
h := &TunnelHandler{
originUrl: url,
httpClient: &http.Client{Timeout: time.Minute},
tags: config.Tags,
metrics: config.Metrics,
connectionID: uint8ToString(connectionID),
}
// Inherit from parent context so we can cancel (Ctrl-C) while dialing
dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout)
// TUN-92: enforce a timeout on dial and handshake (as tls.Dial does not support one)
plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", addr)
dialCancel()
if err != nil {
return nil, "", dialError{cause: err}
}
edgeConn := tls.Client(plaintextEdgeConn, config.TlsConfig)
edgeConn.SetDeadline(time.Now().Add(dialTimeout))
err = edgeConn.Handshake()
if err != nil {
return nil, "", dialError{cause: err}
}
// clear the deadline on the conn; h2mux has its own timeouts
edgeConn.SetDeadline(time.Time{})
// Establish a muxed connection with the edge
// Client mux handshake with agent server
h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{
Timeout: 5 * time.Second,
Handler: h,
IsClient: true,
HeartbeatInterval: config.HeartbeatInterval,
MaxHeartbeats: config.MaxHeartbeats,
Logger: config.ProtocolLogger,
})
if err != nil {
return h, "", errors.New("TLS handshake error")
}
return h, edgeConn.LocalAddr().String(), err
}
func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
for _, tag := range h.tags {
r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
@ -334,6 +409,7 @@ func (h *TunnelHandler) AppendTagHeaders(r *http.Request) {
}
func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
h.metrics.incrementRequests(h.connectionID)
req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream})
if err != nil {
log.WithError(err).Panic("Unexpected error from http.NewRequest")
@ -348,10 +424,22 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error {
log.WithError(err).Error("HTTP request error")
stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "502"}})
stream.Write([]byte("502 Bad Gateway"))
h.metrics.incrementResponses(h.connectionID, "502")
} else {
defer response.Body.Close()
stream.WriteHeaders(H1ResponseToH2Response(response))
io.Copy(stream, response.Body)
h.metrics.incrementResponses(h.connectionID, "200")
}
h.metrics.decrementConcurrentRequests(h.connectionID)
return nil
}
func (h *TunnelHandler) UpdateMetrics() {
flowCtlMetrics := h.muxer.FlowControlMetrics()
h.metrics.updateTunnelFlowControlMetrics(flowCtlMetrics)
}
func uint8ToString(input uint8) string {
return strconv.FormatUint(uint64(input), 10)
}
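The ha-connections flag routes through NewSupervisor, whose implementation is not part of this diff. Purely as an illustration of the shape of the fan-out, here is a sketch that runs one ServeTunnelLoop per connection ID across the resolved edge addresses; it is not the actual Supervisor, and it ignores connected-signal and shutdown coordination:

```go
package haexample

import (
	"context"

	"github.com/cloudflare/cloudflare-warp/origin"
)

// serveHA is an illustration only: one ServeTunnelLoop per HA connection,
// each pinned to one of the resolved edge addresses and identified to the
// edge by its connectionID in RegistrationOptions.
func serveHA(ctx context.Context, config *origin.TunnelConfig) error {
	addrs, err := origin.ResolveEdgeIPs(config.EdgeAddrs)
	if err != nil {
		return err
	}
	errC := make(chan error, config.HAConnections)
	for i := 0; i < config.HAConnections; i++ {
		connectionID := uint8(i)
		addr := addrs[i%len(addrs)]
		go func() {
			// Each loop dials, registers, and retries with backoff independently.
			errC <- origin.ServeTunnelLoop(ctx, config, addr, connectionID, make(chan struct{}))
		}()
	}
	return <-errC // in this sketch, the first loop to give up stops the daemon
}
```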

View File

@ -43,12 +43,14 @@ func UnmarshalTunnelRegistration(s tunnelrpc.TunnelRegistration) (*TunnelRegistr
}
type RegistrationOptions struct {
ClientID string `capnp:"clientId"`
Version string
OS string `capnp:"os"`
ExistingTunnelPolicy tunnelrpc.ExistingTunnelPolicy
PoolID string `capnp:"poolId"`
Tags []Tag
ClientID string `capnp:"clientId"`
Version string
OS string `capnp:"os"`
ExistingTunnelPolicy tunnelrpc.ExistingTunnelPolicy
PoolID string `capnp:"poolId"`
Tags []Tag
ConnectionID uint8 `capnp:"connectionId"`
OriginLocalIP string `capnp:"originLocalIp"`
}
func MarshalRegistrationOptions(s tunnelrpc.RegistrationOptions, p *RegistrationOptions) error {

View File

@ -31,6 +31,10 @@ struct RegistrationOptions {
poolId @4 :Text;
# Client-defined tags to associate with the tunnel
tags @5 :List(Tag);
# A unique identifier for a high-availability connection made by a single client.
connectionId @6 :UInt8;
# origin LAN IP
originLocalIp @7 :Text;
}
struct Tag {

View File

@ -240,12 +240,12 @@ type RegistrationOptions struct{ capnp.Struct }
const RegistrationOptions_TypeID = 0xc793e50592935b4a
func NewRegistrationOptions(s *capnp.Segment) (RegistrationOptions, error) {
st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 5})
st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 6})
return RegistrationOptions{st}, err
}
func NewRootRegistrationOptions(s *capnp.Segment) (RegistrationOptions, error) {
st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 5})
st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 6})
return RegistrationOptions{st}, err
}
@ -368,12 +368,39 @@ func (s RegistrationOptions) NewTags(n int32) (Tag_List, error) {
return l, err
}
func (s RegistrationOptions) ConnectionId() uint8 {
return s.Struct.Uint8(2)
}
func (s RegistrationOptions) SetConnectionId(v uint8) {
s.Struct.SetUint8(2, v)
}
func (s RegistrationOptions) OriginLocalIp() (string, error) {
p, err := s.Struct.Ptr(5)
return p.Text(), err
}
func (s RegistrationOptions) HasOriginLocalIp() bool {
p, err := s.Struct.Ptr(5)
return p.IsValid() || err != nil
}
func (s RegistrationOptions) OriginLocalIpBytes() ([]byte, error) {
p, err := s.Struct.Ptr(5)
return p.TextBytes(), err
}
func (s RegistrationOptions) SetOriginLocalIp(v string) error {
return s.Struct.SetText(5, v)
}
// RegistrationOptions_List is a list of RegistrationOptions.
type RegistrationOptions_List struct{ capnp.List }
// NewRegistrationOptions creates a new list of RegistrationOptions.
func NewRegistrationOptions_List(s *capnp.Segment, sz int32) (RegistrationOptions_List, error) {
l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 5}, sz)
l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 6}, sz)
return RegistrationOptions_List{l}, err
}
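For reference, the regenerated accessors are used like any other Cap'n Proto getters and setters; a quick sketch follows (the tunnelrpc import path is assumed from the repository layout). Note that connectionId lives in the struct's data section while originLocalIp takes the new sixth pointer slot, hence the PointerCount bump above.

```go
package main

import (
	"fmt"

	capnp "zombiezen.com/go/capnproto2"

	"github.com/cloudflare/cloudflare-warp/tunnelrpc"
)

func main() {
	_, seg, err := capnp.NewMessage(capnp.SingleSegment(nil))
	if err != nil {
		panic(err)
	}
	opts, err := tunnelrpc.NewRootRegistrationOptions(seg)
	if err != nil {
		panic(err)
	}
	opts.SetConnectionId(3) // uint8, stored in the data section
	if err := opts.SetOriginLocalIp("192.168.1.10"); err != nil { // text, pointer slot 5
		panic(err)
	}
	ip, _ := opts.OriginLocalIp()
	fmt.Println(opts.ConnectionId(), ip) // 3 192.168.1.10
}
```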
@ -1031,73 +1058,77 @@ func (p TunnelServer_getServerInfo_Results_Promise) Result() ServerInfo_Promise
return ServerInfo_Promise{Pipeline: p.Pipeline.GetPipeline(0)}
}
const schema_db8274f9144abc7e = "x\xda\x9cT_h\x1c\xd5\x1b\xfd\xce\xbd3\xbb-$" +
"\xbf\xcd0\x1bh\x17B\xa0\xe4\x87\xb6\xd0?\xb1*5" +
"\x167\x89m%1m\xf6\xc6Vk\x9b\x87N7\xb7" +
"\x9b\x89\xb33\xeb\xccl\xb4\x85\xb4Z\"R\xc1\xa2\x96" +
"\x82\x05\x11\x15\x0b*E\xfb\xa0h\xa0\x0f\xf5E\x91\"" +
"\xfa\xa0\x82\xd8\x17-E,J!\xf8\xe2\xd3\xc8\x9d\xcd" +
"\xec\x8c)\xa6\xd2\xb7;w\xbe?\xe7\x9c\xfb}g\xcb" +
"\xd3l\x90\xf5\xeb\x9bu\"\xb1]\xcfE\xdb\xeb\xdf\xbc" +
"s\xff\xd9+\xf3d\x94Xt\xfc\xd2h\xf1\xaf\xf0\xe4" +
"OD\xd8:\xc7\x8e\xc1|\x95\xe5\x89\xcc\x97\xd98!" +
"\xea\x1e\xc1\xd5\xcb\xfd\xda\xa7d\xdc\x05\"\x9d\xe7\x89\xb6" +
"\x9eg7@0\x17\xd8G\x84\xa8\xe7\x8f\xe1N\xf7\xe6" +
"\xc9\xcbd\x94\x90\x96j\x05>\xc9Ga\xd6\xd5\xd1\xb4" +
"\xb9\x0a\x1e=x\xe65\xfd\xfa\x99/I\x94\x90\x8d\xd6" +
"U\xb4\xae\xf90\xd7j\xea\xd8\xad=\x01BT\xba\xf8" +
"\xe0\x87\xc3S?^YV;\x867\xa7/\x9a\xa7T" +
"\x9e\xf9\x82\xfe\x0c!b\xd7\xad\xb5\xcf\xfd\xf0\xd0\xd5\x16" +
"\xd0\xb8\xca\xcf\xfa/ -\xda\xf3\xf8\xc1\x99\xd5s\xd7" +
"\xae-Q\x80\xfa\xf5\xbd\x1eS\xf8M/\x13\xa2\xfb\x0e" +
"\x0d\xc9\xc9m\xfbo\x90Q\xe2\xffPcun\x00\xe6" +
"\xda\x9cj\xd2\x9d{\xd1\xac\xabSt\xfa\xf8\x8e\xf1\x07" +
"\xd6}\xbe\x98-\xb7/\xb7\xa8\xca\xd99U\xee\xc8\xb6" +
"\xdf\x1f\xf9\xff\xe9/\x16\x97\xa1\x8e\x03O\xe56\xc0<" +
"\x17W<\xab\x82o\xeez\xf3\xbbR\xa1\xf4\xe72=" +
"b\xf5\x16r30\xbf\x8ec\xbf\xca\xfdJ\x1b\xa3\xb0" +
"\xe9\xba\xd2\xf1\x1bZusr\xacn\xaaZ\x0d\xb71" +
"\xb0\xf3Y;\x08m\xb7\xb67\xbe/W<\xc7\xae\x1e" +
"\xad\x00\xa2\x03\x8c\xc8\xe8\x19 \x02\x8c\xee\x03D`\x86" +
"1LT\xb6k\xae\xe7\xcbh\xca\x0e\xaa\x9e\xebJ\xe2" +
"\xd5\xf0\xc4a\xcb\xb1\xdc\xaal7\xca\xdd\xda\xa8\xd5\xe0" +
"1\xe9\xcfJ\x7f\x93/kv\x10J\xbfu\xd9W\xb1" +
"\x0a\xbeU\x0fD\x07\xd7\x884\x10\x19;\x0f\x10\x89\x1d" +
"\x1c\xa2\xc2`\x00E\xa8\xcb\xdd\xa3Db\x8cC\xecg" +
"0\x18+\xc6\x08\xf7\x0d\x13\x89\x0a\x87\x98d\x88<\xdf" +
"\xae\xd9\xee\xc3\x92\xb8\x1f\xa2\x93\x18:\x09\xd1\xb4\x17\x84" +
"\xaeU\x97D\x84\x0eb\xe8 \x9c\xf0\x1a\xa1\xed\xb9\x01" +
"\xba\xd2\xc9\"\xa0\x8b\xb0\x92VC\xcdpZ\xba\xa1]" +
"\xb5T2Q,S\x0ay\x1d\x91\x18\xe4\x10c\x19\xc8" +
"#\xf7dx$\x90w\x1fNy\xe4\x9f\x92G\x13T" +
"\xbd\xb2n\xd9N\xf2\x95\x90\x19\xa2\xfc\xa3i\xccJ\xf8" +
"&bU\xfd\x18\xddx\xa37f\xa80\xaeic<" +
"\xa7\x14|\x9dC\xbc\x9b\xc1\xf8\xb6R\xf0\x0d\x0e\xf1^" +
"\x06\xe3\xf9\x12\x91x\x8bC\\`\x00/\x82\x13\x19\xef" +
"\x7f@$.p\x88\xcf\x18\x0c\x8d\x17\xa1\x11\x19\x9f\x0c" +
"\x10\x89\x8b\x1c\xe2\x12\x83\xa1kE\xe8D\xc6\xc2\x06\"" +
"\xf11\x87\xf8\x96!\xaa:\xb6t\xc3\x91\xa9\xac\xfe\xb3" +
"\xd2\x0fl\xcfM\xbe\xb9\x17\xb4\x09\xca\xa5\x89Dk8" +
"*^A\x8d$\x0a\xa9\xf7\x10P \x94\x1b\x9e\xe7\x8c" +
"L%y\x85\xd0\xaa\x05\xf8\x1f\xa1\xc2\x81\xae\xd4\x01\x08" +
"\xea\xb2-\x1b[.[oc`\xafUS2\xadj" +
"\xcb\xb4^\xc1\xef\xe3\x10[22mTOy7\x87" +
"\xb8\x97\xa1\xa0\xe6\xa9\xfdl\xb3\x96\xd3\x94\xb7<\xd0\xed" +
"v\xa0&\xc3\xd6i\xc4=\xe2\xf5U,?o\xd5\x83" +
";\xcc\x9e\x90A\xa1\xe9\x84\x81\xd0\xda\x1c:\xd5\xbb\xac" +
"\xe2\x10E\x86\xb2/\x83\xa6\x13\xa2+\xb5\x98e\xd3\xce" +
"\xff\xad]\xb9\xd5\xa5\xa5\x8fN\xd4\xf6u$vf\xf4" +
"\x1f#f\xac\xcf#\xb5R$\xcei\xf4\xf8\xc4\x8c\xee" +
"|\x94,<\x95[e\x07\x11%\x0c\xa87\xe60\x88" +
"\x0ap\xa7\x062!{\x83\xff\xc2?q\xcd\xdb\xb3o" +
"\xf5)(d\x8a{\xa6\xee\x0c\x91\xe8\xe0\x10k\x18\"" +
"\xc7[\xf2\x82\xc2\x9e\xcc@\xac\xb4\xa3-\xc0\xc9\xa6\x16" +
"T\xb2\xaa\xdf\xd5\xaeo)\x1b\x99\xe4\x10\xd3\x99\xd9\x93" +
"\xea\xf2\x10\x87p2+j\xabe\x9e\xe6\x10\xf3\xe9\x8a" +
">\xff\x12\x91\x98\xe7\x10\xaf0\xe4\xa5\xef'\x90\xf2M" +
"?5\x16\xc7\xab\x8d\xd9\xae\x0c\xd4B.-\x8c\xfa\xa5" +
"\xd6\xa4!\xfd\xba\xe5J\x17\xe1.\xcbv\x9a\xbeT\x83" +
"B\x0c \xfc\x1d\x00\x00\xff\xff\xfa.\x1fg"
const schema_db8274f9144abc7e = "x\xda\x9cUM\x8c\x14E\x14~_U\xff@\xb2k" +
"o\xa7g\x13\x98\x84\x8c!\x18\x85\x84?Q\x03+q" +
"\x96\x150\xb3.0\xc5\x82A\xe0@3[\xcc\xf6\xda" +
"\xd3=\xe9\xeeA!\x01\x94\x90\x18M$\"\xe1\xe0\xc5" +
"\x83\x89\x17\x0f\xea\xcd\x98x\xc0\x8b\x1c8\xa8\x09\x1a\x89" +
"\x98\xa8@T\x02A7\\<\x986\xd5\xb3=\xd3." +
"\x11\x0c\xb7\x9a\xd7\xaf\xde\xfb\xbe\xf7\xbe\xfaf\xcd\x196" +
"\xca\xd6\xea\xabu\"\xb1Q7\xd2\x8d\xad\xaf\xde\x7f\xea" +
"\xdc\xc5Sd\x97Yz\xfc\xf3\xf1\xd2_\xc9\xc9\x1f\x88" +
"\xb0\xee\x18;\x0a\xe7\x0c3\x89\x9c\xb7\xd8\x0eB:\\" +
"\xc3\x95\xf3k\xb5O\xc9~\x14D:7\x89\xd6}\xc0" +
"n\x80\xe0|\xc6>&\xa4Kn\x8d\x0d\x06\xb7O\x9e" +
"'\xbb\x8c~\xa9n\xe2\x8b|\x1cNK\x1d\x1d\x8f\xab" +
"\xe4\xf1}g\xdf\xd1\xaf\x9f\xbd@\xa2\x8cb\xb6\xa1\xb2" +
"u-\x82\xb3XS\xc7a\xed\x02\x08i\xf9\x93\xa7?" +
"\x1a\x9b\xba|q^\xed\x0c\xde\xdf\xfa\xac\xb3P\xdds" +
"t\xe3eB\xca\xae\xbb\x8b_\xfd\xee\x99+]\xa0Y" +
"\x15\xd7\xf8\x05\xa4\xa5\xdb_\xd87\xb3\xf0\xd8\xd5\xabs" +
"\x14\xa0>\xed62\x0a\x9eQ%\xa4O\x1e\xd8$\xf7" +
"\xaf\xdfs\x83\xec2\xff\xd74\xde0F\xe0\xbc\x9b5" +
"9g\xbc\xee\xdcR\xa7\xf4\xf4\xf1\xcd;6,\xfdb" +
"\xb6X\xee[cV\x95\xfb=+wh\xfd\xcd\xe7\x1e" +
"9\xfd\xe5\xec<\xd4Y\xe2Bs\x05\x9c\xc5\xa6\xaa8" +
"lV\x09\xb7\xb7\xbew\xa9l\x95\xef\xcc\x9bG6\xbd" +
"\x0d\xe6\x0c\x9cmYn\xcd\xfc\x95V\xa6I'\x08\xa4" +
"\x1f\xb5\xb5\xc6\xea\xfc\xd8X\xd5p\xdbA{d\xcb+" +
"^\x9cxAsW\x16\xaf\xd6C\xdfk\x1c\xa9\x03b" +
"\x00\x8c\xc8^2B\x04\xd8\xc3{\x89\xc0l{\x8c\xa8" +
"\xea5\x830\x92\xe9\x94\x177\xc2 \x90\xc4\x1b\xc9\x89" +
"\x83\xae\xef\x06\x0d\xd9kd\xdc\xdd\xa8\xdb`RF\x87" +
"e\xb4*\x92M/Nd\xd4\x0d.\xab\xbbV\xe4\xb6" +
"b1\xc05\"\x0dD\xf6\x96\xbdDb3\x87\xa83" +
"\xd8@\x09*\xb8m\x9cHLp\x88=\x0c6c\xa5" +
"\x0c\xe1\xee1\"Q\xe7\x10\xfb\x19\xd20\xf2\x9a^\xf0" +
"\xac$\x1e%\x18$\x86AB:\x1d\xc6I\xe0\xb6$" +
"\x11a\x80\x18\x06\x08'\xc2v\xe2\x85A\x8c\xa1\xbe\xb2" +
"\x08\x18\"\xdckV\x9b:\xc9\xb4\x0c\x12\xaf\xe1\xaa\xcb" +
"D\xd9\x98\xfa\x90\x97\x12\x89Q\x0e1Q\x80\\{\xbc" +
"\xc0#\x87\xbc\xed`\x9f\x87\xf9\x92<\x92\xa3\xaa\xc8\x96" +
"\xeb\xf9\xf9\xaf\x9c\xcc&2\x9f\xef\xe7\xdc\x0b\xdf\xcel" +
"\xaaQ\x86nG\xbb\x921T\x18\x1f\xeea\xfcFM" +
"\xf0k\x0eq\xa5\x80\xf1\xb2\x9a\xe0%\x0e\xf1S\x01\xe3" +
"\x8fe\"\xf1=\x87\xb8\xc6\x00^\x02'\xb2\x7f\xfe\x90" +
"H\\\xe3\x10\x7f0\xd8\x1a/A#\xb2o\x8d\x10\x89" +
"\xdf8\xc4\x1d\x06[\xd7J\xd0\x89\xec?W\x10\x89\x9b" +
"\x1c\x93\x0b\xc0`\x1b\xac\x04C\xbd7\xcc\x10Mj\xe0" +
"\x98\x1cRqS/)m;\x83\x88\x88&\x07T|" +
"\x11\x18\xd2\x86\xef\xc9 \xa9M\x15\x17vXF\xb1\x17" +
"\x06\xf9o\x1e\xc6\xbd\x89\xc89\x09\xa3\xab\xa6zh)" +
"\x0d\xc3\xea\x9b\x15\x01\x16\xa1\xda\x0eC\xbf6\x95\xdf\xb3" +
"\x12\xb7\x19\xe3!B\x9d\x03C}\xcb \xa8`\x9a\x89" +
"\xbb\x91xd\x85Am\x0a\x061\x18\xbd\xa5L\x84T" +
"i\xb8~\xad}\xd7Z\xd8\xfc\xb5T\xda#\xbb\xdc\xa6" +
"Z\xc3\x82\xde\x1a\x96\xab\xf1,\xe3\x10k\x0akX\xa9" +
"\xa4\xf2\x18\x87x\x82\xc1Rz\xed\xc9\xe2\xb0\xebw\xe4" +
"]\x9d\xee\xf7\xc6\x9a2\xe9\x9ej\xc1\xa1pY\xdd\x8d" +
"L\xb7\x15?\xe0\xed\x9d2\xb6:~\x12\x0b\xad\xc7a" +
"P\xed}\x01\x87(1T#\x19w\xfc\x04C}\x0b" +
"\x9b\xf7\x9a\xf8\x7f\xb5\xabv\xbbt\xe7\xa3\x13\xf5\xfe7" +
"\x90\xdb\xa5\xbd\xf6(1{\xb9\x89\xbeU#wf{" +
"ID\xcc\x1e6\xd3\xdcP\xa8\xda-;\x8a4g@" +
"\x95\x8c\xc3(\xea\xc0\x83\x1a\xd4NY\x89\xff\x0f\xff\xdc" +
"\x95\xef\xcf\xbe\xdb\xc7R\xc8\x14\xf7B\xdd\x19\"1\xc0" +
"!\x161\xa4~8\xe75\xd6\xf6\x82 \xee\xe5\x01]" +
"\xc0\xb9\x13X\xea\xb2\xaa?\xd4\xab\xef*\x9b\xda\xcf!" +
"\xa6\x0b\xda\x93*x\x80C\xf8\x05\x0b\xf0\x94YLs" +
"\x88S}\x0bx\xedM\"q\x8aC\xbc\xcd`\xca(" +
"\xca!\x99\x9d\xa8o\\~\xd8\x9c\xf0\x02\x19\xab\xf7;" +
"\xf7\xbe\xd4'\xf5\xaa\xda2j\xb9\x81\x0c\x90lu=" +
"\xbf\x13I%\x14b\x00\xe1\x9f\x00\x00\x00\xff\xff^." +
"0z"
func init() {
schemas.Register(schema_db8274f9144abc7e,