From ee499dfa88106c369186167c00be42ac76d902ba Mon Sep 17 00:00:00 2001 From: Chris Branch Date: Tue, 28 Nov 2017 13:41:29 +0000 Subject: [PATCH] Release 2017.11.3 --- cmd/cloudflare-warp/linux_service.go | 6 +- cmd/cloudflare-warp/main.go | 52 ++++-- h2mux/booleanfuse.go | 31 +++- h2mux/h2mux.go | 22 ++- h2mux/muxreader.go | 1 - origin/backoffhandler.go | 41 ++++- origin/backoffhandler_test.go | 34 ++++ origin/tunnel.go | 226 +++++++++++++++++++-------- tunnelrpc/pogs/tunnelrpc.go | 14 +- tunnelrpc/tunnelrpc.capnp | 4 + tunnelrpc/tunnelrpc.capnp.go | 171 +++++++++++--------- 11 files changed, 422 insertions(+), 180 deletions(-) diff --git a/cmd/cloudflare-warp/linux_service.go b/cmd/cloudflare-warp/linux_service.go index 35c7347e..7d9816b9 100644 --- a/cmd/cloudflare-warp/linux_service.go +++ b/cmd/cloudflare-warp/linux_service.go @@ -41,7 +41,7 @@ After=network.target [Service] TimeoutStartSec=0 Type=notify -ExecStart={{ .Path }} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --autoupdate 0s +ExecStart={{ .Path }} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --no-autoupdate [Install] WantedBy=multi-user.target @@ -63,7 +63,7 @@ ExecStart=/bin/bash -c '{{ .Path }} update; code=$?; if [ $code -eq 64 ]; then s Description=Update Cloudflare Warp [Timer] -OnUnitActiveSec=1d +OnUnitActiveSec=1d [Install] WantedBy=timers.target @@ -87,7 +87,7 @@ var sysvTemplate = ServiceTemplate{ # Short-Description: Cloudflare Warp # Description: Cloudflare Warp agent ### END INIT INFO -cmd="{{.Path}} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --pidfile /var/run/$name.pid" +cmd="{{.Path}} --config /etc/cloudflare-warp/config.yml --origincert /etc/cloudflare-warp/cert.pem --pidfile /var/run/$name.pid --autoupdate-freq 24h0m0s" name=$(basename $(readlink -f $0)) pid_file="/var/run/$name.pid" stdout_log="/var/log/$name.log" diff --git a/cmd/cloudflare-warp/main.go b/cmd/cloudflare-warp/main.go index 3ab553c0..3c40bb6e 100644 --- a/cmd/cloudflare-warp/main.go +++ b/cmd/cloudflare-warp/main.go @@ -14,7 +14,6 @@ import ( "syscall" "time" - "github.com/cloudflare/cloudflare-warp/h2mux" "github.com/cloudflare/cloudflare-warp/metrics" "github.com/cloudflare/cloudflare-warp/origin" "github.com/cloudflare/cloudflare-warp/tlsconfig" @@ -84,9 +83,8 @@ WARNING: Usage: "Disable periodic check for updates, restarting the server with the new version.", Value: false, }), - altsrc.NewStringFlag(&cli.StringFlag{ + altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ Name: "edge", - Value: "cftunnel.com:7844", Usage: "Address of the Cloudflare tunnel server.", EnvVars: []string{"TUNNEL_EDGE"}, Hidden: true, @@ -149,6 +147,12 @@ WARNING: Usage: "Listen address for metrics reporting.", EnvVars: []string{"TUNNEL_METRICS"}, }), + altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: "metrics-update-freq", + Usage: "Frequency to update tunnel metrics", + Value: time.Second * 5, + EnvVars: []string{"TUNNEL_METRICS_UPDATE_FREQ"}, + }), altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ Name: "tag", Usage: "Custom tags used to identify this tunnel, in format `KEY=VALUE`. Multiple tags may be specified", @@ -169,9 +173,15 @@ WARNING: altsrc.NewStringFlag(&cli.StringFlag{ Name: "loglevel", Value: "info", - Usage: "Logging level {panic, fatal, error, warn, info, debug}", + Usage: "Application logging level {panic, fatal, error, warn, info, debug}", EnvVars: []string{"TUNNEL_LOGLEVEL"}, }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "proto-loglevel", + Value: "warn", + Usage: "Protocol logging level {panic, fatal, error, warn, info, debug}", + EnvVars: []string{"TUNNEL_PROTO_LOGLEVEL"}, + }), altsrc.NewUintFlag(&cli.UintFlag{ Name: "retries", Value: 5, @@ -188,6 +198,11 @@ WARNING: Usage: "Write the application's PID to this file after first successful connection.", EnvVars: []string{"TUNNEL_PIDFILE"}, }), + altsrc.NewIntFlag(&cli.IntFlag{ + Name: "ha-connections", + Value: 4, + Hidden: true, + }), } app.Action = func(c *cli.Context) error { raven.CapturePanic(func() { startServer(c) }, nil) @@ -259,6 +274,13 @@ func startServer(c *cli.Context) { } log.SetLevel(logLevel) + protoLogLevel, err := log.ParseLevel(c.String("proto-loglevel")) + if err != nil { + log.WithError(err).Fatal("Unknown protocol logging level specified") + } + protoLogger := log.New() + protoLogger.Level = protoLogLevel + hostname, err := validation.ValidateHostname(c.String("hostname")) if err != nil { log.WithError(err).Fatal("Invalid hostname") @@ -325,8 +347,9 @@ If you don't have a certificate signed by Cloudflare, run the command: if err != nil { log.WithError(err).Fatalf("Cannot read %s to load origin certificate", originCertPath) } + tunnelMetrics := origin.NewTunnelMetrics() tunnelConfig := &origin.TunnelConfig{ - EdgeAddr: c.String("edge"), + EdgeAddrs: c.StringSlice("edge"), OriginUrl: url, Hostname: hostname, OriginCert: originCert, @@ -338,20 +361,25 @@ If you don't have a certificate signed by Cloudflare, run the command: ReportedVersion: Version, LBPool: c.String("lb-pool"), Tags: tags, - ConnectedSignal: h2mux.NewSignal(), + HAConnections: c.Int("ha-connections"), + Metrics: tunnelMetrics, + MetricsUpdateFreq: c.Duration("metrics-update-freq"), + ProtocolLogger: protoLogger, } + connectedSignal := make(chan struct{}) tunnelConfig.TlsConfig = tlsconfig.CLIFlags{RootCA: "cacert"}.GetConfig(c) if tunnelConfig.TlsConfig.RootCAs == nil { tunnelConfig.TlsConfig.RootCAs = GetCloudflareRootCA() tunnelConfig.TlsConfig.ServerName = "cftunnel.com" - } else { - tunnelConfig.TlsConfig.ServerName, _, _ = net.SplitHostPort(tunnelConfig.EdgeAddr) + } else if len(tunnelConfig.EdgeAddrs) > 0 { + // Set for development environments and for testing specific origintunneld instances + tunnelConfig.TlsConfig.ServerName, _, _ = net.SplitHostPort(tunnelConfig.EdgeAddrs[0]) } - go writePidFile(tunnelConfig.ConnectedSignal, c.String("pidfile")) + go writePidFile(connectedSignal, c.String("pidfile")) go func() { - errC <- origin.StartTunnelDaemon(tunnelConfig, shutdownC) + errC <- origin.StartTunnelDaemon(tunnelConfig, shutdownC, connectedSignal) wg.Done() }() @@ -476,8 +504,8 @@ func generateRandomClientID() string { return hex.EncodeToString(id) } -func writePidFile(waitForSignal h2mux.Signal, pidFile string) { - waitForSignal.Wait() +func writePidFile(waitForSignal chan struct{}, pidFile string) { + <-waitForSignal daemon.SdNotify(false, "READY=1") if pidFile == "" { return diff --git a/h2mux/booleanfuse.go b/h2mux/booleanfuse.go index b4cbe8d7..8407ecc7 100644 --- a/h2mux/booleanfuse.go +++ b/h2mux/booleanfuse.go @@ -1,11 +1,19 @@ package h2mux -import "sync/atomic" +import "sync" // BooleanFuse is a data structure that can be set once to a particular value using Fuse(value). // Subsequent calls to Fuse() will have no effect. type BooleanFuse struct { value int32 + mu sync.Mutex + cond *sync.Cond +} + +func NewBooleanFuse() *BooleanFuse { + f := &BooleanFuse{} + f.cond = sync.NewCond(&f.mu) + return f } // Value gets the value @@ -13,13 +21,30 @@ func (f *BooleanFuse) Value() bool { // 0: unset // 1: set true // 2: set false - return atomic.LoadInt32(&f.value) == 1 + f.mu.Lock() + defer f.mu.Unlock() + return f.value == 1 } func (f *BooleanFuse) Fuse(result bool) { + f.mu.Lock() + defer f.mu.Unlock() newValue := int32(2) if result { newValue = 1 } - atomic.CompareAndSwapInt32(&f.value, 0, newValue) + if f.value == 0 { + f.value = newValue + f.cond.Broadcast() + } +} + +// Await blocks until Fuse has been called at least once. +func (f *BooleanFuse) Await() bool { + f.mu.Lock() + defer f.mu.Unlock() + for f.value == 0 { + f.cond.Wait() + } + return f.value == 1 } diff --git a/h2mux/h2mux.go b/h2mux/h2mux.go index d47b4059..624123b5 100644 --- a/h2mux/h2mux.go +++ b/h2mux/h2mux.go @@ -43,6 +43,8 @@ type MuxerConfig struct { HeartbeatInterval time.Duration // The minimum number of heartbeats to send before terminating the connection. MaxHeartbeats uint64 + // Logger to use + Logger *log.Logger } type Muxer struct { @@ -70,7 +72,7 @@ type Muxer struct { streams *activeStreamMap // explicitShutdown records whether the Muxer is closing because Shutdown was called, or due to another // error. - explicitShutdown BooleanFuse + explicitShutdown *BooleanFuse } type Header struct { @@ -84,6 +86,13 @@ func Handshake( r io.ReadCloser, config MuxerConfig, ) (*Muxer, error) { + // Set default config values + if config.Timeout == 0 { + config.Timeout = defaultTimeout + } + if config.Logger == nil { + config.Logger = log.New() + } // Initialise connection state fields m := &Muxer{ f: http2.NewFramer(w, r), // A framer that writes to w and reads from r @@ -97,10 +106,6 @@ func Handshake( } m.f.ReadMetaHeaders = hpack.NewDecoder(4096, func(hpack.HeaderField) {}) - if config.Timeout == 0 { - config.Timeout = defaultTimeout - } - // Initialise the settings to identify this connection and confirm the other end is sane. handshakeSetting := http2.Setting{ID: SettingMuxerMagic, Val: MuxerMagicEdge} expectedMagic := MuxerMagicOrigin @@ -134,14 +139,15 @@ func Handshake( // Sanity check to enusre idelDuration is sane if idleDuration == 0 || idleDuration < defaultTimeout { idleDuration = defaultTimeout - log.Warn("Minimum idle time has been adjusted to ", defaultTimeout) + config.Logger.Warn("Minimum idle time has been adjusted to ", defaultTimeout) } maxRetries := config.MaxHeartbeats if maxRetries == 0 { maxRetries = defaultRetries - log.Warn("Minimum number of unacked heartbeats to send before closing the connection has been adjusted to ", maxRetries) + config.Logger.Warn("Minimum number of unacked heartbeats to send before closing the connection has been adjusted to ", maxRetries) } + m.explicitShutdown = NewBooleanFuse() m.muxReader = &MuxReader{ f: m.f, handler: m.config.Handler, @@ -226,7 +232,7 @@ func joinErrorsWithTimeout(errChan <-chan error, receiveCount int, timeout time. } func (m *Muxer) Serve() error { - logger := log.WithField("name", m.config.Name) + logger := m.config.Logger.WithField("name", m.config.Name) errChan := make(chan error) go func() { errChan <- m.muxReader.run(logger) diff --git a/h2mux/muxreader.go b/h2mux/muxreader.go index b916f21a..1e49b8b0 100644 --- a/h2mux/muxreader.go +++ b/h2mux/muxreader.go @@ -310,7 +310,6 @@ func (r *MuxReader) connectionError(err error) error { case MuxerProtocolError: http2Code = e.h2code } - log.Warnf("Connection error %v", http2Code) r.sendGoAway(http2Code) return err } diff --git a/origin/backoffhandler.go b/origin/backoffhandler.go index 67c7235e..c4b3d80e 100644 --- a/origin/backoffhandler.go +++ b/origin/backoffhandler.go @@ -21,6 +21,11 @@ type BackoffHandler struct { // MaxRetries sets the maximum number of retries to perform. The default value // of 0 disables retry completely. MaxRetries uint + // RetryForever caps the exponential backoff period according to MaxRetries + // but allows you to retry indefinitely. + RetryForever bool + // BaseTime sets the initial backoff period. + BaseTime time.Duration retries uint resetDeadline time.Time @@ -38,25 +43,38 @@ func (b BackoffHandler) GetBackoffDuration(ctx context.Context) (time.Duration, // b.retries would be set to 0 at this point return time.Second, true } - if b.retries >= b.MaxRetries { + if b.retries >= b.MaxRetries && !b.RetryForever { return time.Duration(0), false } - return time.Duration(time.Second * 1 << b.retries), true + return time.Duration(b.GetBaseTime() * 1 << b.retries), true } -// Backoff is used to wait according to exponential backoff. Returns false if the -// maximum number of retries have been used or if the underlying context has been cancelled. -func (b *BackoffHandler) Backoff(ctx context.Context) bool { +// BackoffTimer returns a channel that sends the current time when the exponential backoff timeout expires. +// Returns nil if the maximum number of retries have been used. +func (b *BackoffHandler) BackoffTimer() <-chan time.Time { if !b.resetDeadline.IsZero() && timeNow().After(b.resetDeadline) { b.retries = 0 b.resetDeadline = time.Time{} } if b.retries >= b.MaxRetries { + if !b.RetryForever { + return nil + } + } else { + b.retries++ + } + return timeAfter(time.Duration(b.GetBaseTime() * 1 << (b.retries - 1))) +} + +// Backoff is used to wait according to exponential backoff. Returns false if the +// maximum number of retries have been used or if the underlying context has been cancelled. +func (b *BackoffHandler) Backoff(ctx context.Context) bool { + c := b.BackoffTimer() + if c == nil { return false } select { - case <-timeAfter(time.Duration(time.Second * 1 << b.retries)): - b.retries++ + case <-c: return true case <-ctx.Done(): return false @@ -66,5 +84,12 @@ func (b *BackoffHandler) Backoff(ctx context.Context) bool { // Sets a grace period within which the the backoff timer is maintained. After the grace // period expires, the number of retries & backoff duration is reset. func (b *BackoffHandler) SetGracePeriod() { - b.resetDeadline = timeNow().Add(time.Duration(time.Second * 2 << b.retries)) + b.resetDeadline = timeNow().Add(time.Duration(b.GetBaseTime() * 2 << b.retries)) +} + +func (b BackoffHandler) GetBaseTime() time.Duration { + if b.BaseTime == 0 { + return time.Second + } + return b.BaseTime } diff --git a/origin/backoffhandler_test.go b/origin/backoffhandler_test.go index 4e598561..c9968044 100644 --- a/origin/backoffhandler_test.go +++ b/origin/backoffhandler_test.go @@ -111,4 +111,38 @@ func TestGetBackoffDuration(t *testing.T) { if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*4 { t.Fatalf("backoff didn't return 4 seconds on third retry") } + backoff.Backoff(ctx) // noop + if duration, ok := backoff.GetBackoffDuration(ctx); ok || duration != 0 { + t.Fatalf("backoff didn't return 0 seconds on fourth retry (exceeding limit)") + } +} + +func TestBackoffRetryForever(t *testing.T) { + // make backoff return immediately + timeAfter = immediateTimeAfter + ctx := context.Background() + backoff := BackoffHandler{MaxRetries: 3, RetryForever: true} + if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second { + t.Fatalf("backoff didn't return 1 second on first retry") + } + backoff.Backoff(ctx) // noop + if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*2 { + t.Fatalf("backoff didn't return 2 seconds on second retry") + } + backoff.Backoff(ctx) // noop + if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*4 { + t.Fatalf("backoff didn't return 4 seconds on third retry") + } + if !backoff.Backoff(ctx) { + t.Fatalf("backoff refused on fourth retry despire RetryForever") + } + if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*8 { + t.Fatalf("backoff returned %v instead of 8 seconds on fourth retry", duration) + } + if !backoff.Backoff(ctx) { + t.Fatalf("backoff refused on fifth retry despire RetryForever") + } + if duration, ok := backoff.GetBackoffDuration(ctx); !ok || duration != time.Second*8 { + t.Fatalf("backoff returned %v instead of 8 seconds on fifth retry", duration) + } } diff --git a/origin/tunnel.go b/origin/tunnel.go index 358019ca..be286705 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "runtime" + "strconv" "strings" "time" @@ -21,17 +22,19 @@ import ( log "github.com/Sirupsen/logrus" raven "github.com/getsentry/raven-go" "github.com/pkg/errors" + _ "github.com/prometheus/client_golang/prometheus" rpc "zombiezen.com/go/capnproto2/rpc" ) const ( dialTimeout = 15 * time.Second - TagHeaderNamePrefix = "Cf-Warp-Tag-" + TagHeaderNamePrefix = "Cf-Warp-Tag-" + DuplicateConnectionError = "EDUPCONN" ) type TunnelConfig struct { - EdgeAddr string + EdgeAddrs []string OriginUrl string Hostname string OriginCert []byte @@ -43,7 +46,10 @@ type TunnelConfig struct { ReportedVersion string LBPool string Tags []tunnelpogs.Tag - ConnectedSignal h2mux.Signal + HAConnections int + Metrics *TunnelMetrics + MetricsUpdateFreq time.Duration + ProtocolLogger *log.Logger } type dialError struct { @@ -54,6 +60,12 @@ func (e dialError) Error() string { return e.cause.Error() } +type dupConnRegisterTunnelError struct{} + +func (e dupConnRegisterTunnelError) Error() string { + return "already connected to this server" +} + type printableRegisterTunnelError struct { cause error permanent bool @@ -63,10 +75,10 @@ func (e printableRegisterTunnelError) Error() string { return e.cause.Error() } -func (c *TunnelConfig) RegistrationOptions() *tunnelpogs.RegistrationOptions { - policy := tunnelrpc.ExistingTunnelPolicy_disconnect - if c.LBPool != "" { - policy = tunnelrpc.ExistingTunnelPolicy_balance +func (c *TunnelConfig) RegistrationOptions(connectionID uint8, OriginLocalIP string) *tunnelpogs.RegistrationOptions { + policy := tunnelrpc.ExistingTunnelPolicy_balance + if c.HAConnections <= 1 && c.LBPool == "" { + policy = tunnelrpc.ExistingTunnelPolicy_disconnect } return &tunnelpogs.RegistrationOptions{ ClientID: c.ClientID, @@ -75,18 +87,41 @@ func (c *TunnelConfig) RegistrationOptions() *tunnelpogs.RegistrationOptions { ExistingTunnelPolicy: policy, PoolID: c.LBPool, Tags: c.Tags, + ConnectionID: connectionID, + OriginLocalIP: OriginLocalIP, } } -func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}) error { +func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}, connectedSignal chan struct{}) error { ctx, cancel := context.WithCancel(context.Background()) go func() { <-shutdownC cancel() }() + if config.HAConnections > 1 { + return NewSupervisor(config).Run(ctx, connectedSignal) + } else { + addrs, err := ResolveEdgeIPs(config.EdgeAddrs) + if err != nil { + return err + } + return ServeTunnelLoop(ctx, config, addrs[0], 0, connectedSignal) + } +} + +func ServeTunnelLoop(ctx context.Context, config *TunnelConfig, addr *net.TCPAddr, connectionID uint8, connectedSignal chan struct{}) error { backoff := BackoffHandler{MaxRetries: config.Retries} + // Used to close connectedSignal no more than once + connectedFuse := h2mux.NewBooleanFuse() + go func() { + if connectedFuse.Await() { + close(connectedSignal) + } + }() + // Ensure the above goroutine will terminate if we return without connecting + defer connectedFuse.Fuse(false) for { - err, recoverable := ServeTunnel(ctx, config, &backoff) + err, recoverable := ServeTunnel(ctx, config, addr, connectionID, connectedFuse, &backoff) if recoverable { if duration, ok := backoff.GetBackoffDuration(ctx); ok { log.Infof("Retrying in %s seconds", duration) @@ -101,10 +136,24 @@ func StartTunnelDaemon(config *TunnelConfig, shutdownC <-chan struct{}) error { func ServeTunnel( ctx context.Context, config *TunnelConfig, + addr *net.TCPAddr, + connectionID uint8, + connectedFuse *h2mux.BooleanFuse, backoff *BackoffHandler, ) (err error, recoverable bool) { + // Treat panics as recoverable errors + defer func() { + if r := recover(); r != nil { + var ok bool + err, ok = r.(error) + if !ok { + err = fmt.Errorf("ServeTunnel: %v", r) + } + recoverable = true + } + }() // Returns error from parsing the origin URL or handshake errors - handler, err := NewTunnelHandler(ctx, config) + handler, originLocalIP, err := NewTunnelHandler(ctx, config, addr.String(), connectionID) if err != nil { errLog := log.WithError(err) switch err.(type) { @@ -121,19 +170,28 @@ func ServeTunnel( serveCtx, serveCancel := context.WithCancel(ctx) registerErrC := make(chan error, 1) go func() { - err := RegisterTunnel(serveCtx, handler.muxer, config) + err := RegisterTunnel(serveCtx, handler.muxer, config, connectionID, originLocalIP) if err == nil { - config.ConnectedSignal.Signal() + connectedFuse.Fuse(true) backoff.SetGracePeriod() } else { serveCancel() } registerErrC <- err }() + updateMetricsTickC := time.Tick(config.MetricsUpdateFreq) go func() { - <-serveCtx.Done() - handler.muxer.Shutdown() + for { + select { + case <-serveCtx.Done(): + handler.muxer.Shutdown() + break + case <-updateMetricsTickC: + handler.UpdateMetrics() + } + } }() + err = handler.muxer.Serve() serveCancel() registerErr := <-registerErrC @@ -149,6 +207,9 @@ func ServeTunnel( return nil, false } return e.cause, true + } else if e, ok := registerErr.(dupConnRegisterTunnelError); ok { + log.Info("Already connected to this server, selecting a different one") + return e, true } // Only log errors to Sentry that may have been caused by the client side, to reduce dupes raven.CaptureError(registerErr, nil) @@ -168,7 +229,7 @@ func IsRPCStreamResponse(headers []h2mux.Header) bool { return true } -func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig) error { +func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfig, connectionID uint8, originLocalIP string) error { logger := log.WithField("subsystem", "rpc") logger.Debug("initiating RPC stream") stream, err := muxer.OpenStream([]h2mux.Header{ @@ -201,9 +262,9 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi ctx, config.OriginCert, config.Hostname, - config.RegistrationOptions(), + config.RegistrationOptions(connectionID, originLocalIP), ) - LogServerInfo(logger, serverInfoPromise.Result()) + LogServerInfo(logger, serverInfoPromise.Result(), connectionID, config.Metrics) if err != nil { // RegisterTunnel RPC failure return err @@ -211,7 +272,9 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi for _, logLine := range registration.LogLines { logger.Info(logLine) } - if registration.Err != "" { + if registration.Err == DuplicateConnectionError { + return dupConnRegisterTunnelError{} + } else if registration.Err != "" { return printableRegisterTunnelError{ cause: fmt.Errorf("Server error: %s", registration.Err), permanent: registration.PermanentFailure, @@ -226,7 +289,11 @@ func RegisterTunnel(ctx context.Context, muxer *h2mux.Muxer, config *TunnelConfi return nil } -func LogServerInfo(logger *log.Entry, promise tunnelrpc.ServerInfo_Promise) { +func LogServerInfo(logger *log.Entry, + promise tunnelrpc.ServerInfo_Promise, + connectionID uint8, + metrics *TunnelMetrics, +) { serverInfoMessage, err := promise.Struct() if err != nil { logger.WithError(err).Warn("Failed to retrieve server information") @@ -238,56 +305,7 @@ func LogServerInfo(logger *log.Entry, promise tunnelrpc.ServerInfo_Promise) { return } log.Infof("Connected to %s", serverInfo.LocationName) -} - -type TunnelHandler struct { - originUrl string - muxer *h2mux.Muxer - httpClient *http.Client - tags []tunnelpogs.Tag -} - -var dialer = net.Dialer{DualStack: true} - -func NewTunnelHandler(ctx context.Context, config *TunnelConfig) (*TunnelHandler, error) { - url, err := validation.ValidateUrl(config.OriginUrl) - if err != nil { - return nil, fmt.Errorf("Unable to parse origin url %#v", url) - } - h := &TunnelHandler{ - originUrl: url, - httpClient: &http.Client{Timeout: time.Minute}, - tags: config.Tags, - } - // Inherit from parent context so we can cancel (Ctrl-C) while dialing - dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout) - // TUN-92: enforce a timeout on dial and handshake (as tls.Dial does not support one) - plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", config.EdgeAddr) - dialCancel() - if err != nil { - return nil, dialError{cause: err} - } - edgeConn := tls.Client(plaintextEdgeConn, config.TlsConfig) - edgeConn.SetDeadline(time.Now().Add(dialTimeout)) - err = edgeConn.Handshake() - if err != nil { - return nil, dialError{cause: err} - } - // clear the deadline on the conn; h2mux has its own timeouts - edgeConn.SetDeadline(time.Time{}) - // Establish a muxed connection with the edge - // Client mux handshake with agent server - h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{ - Timeout: 5 * time.Second, - Handler: h, - IsClient: true, - HeartbeatInterval: config.HeartbeatInterval, - MaxHeartbeats: config.MaxHeartbeats, - }) - if err != nil { - return h, errors.New("TLS handshake error") - } - return h, err + metrics.registerServerLocation(uint8ToString(connectionID), serverInfo.LocationName) } func H2RequestHeadersToH1Request(h2 []h2mux.Header, h1 *http.Request) error { @@ -327,6 +345,63 @@ func H1ResponseToH2Response(h1 *http.Response) (h2 []h2mux.Header) { return } +type TunnelHandler struct { + originUrl string + muxer *h2mux.Muxer + httpClient *http.Client + tags []tunnelpogs.Tag + metrics *TunnelMetrics + // connectionID is only used by metrics, and prometheus requires labels to be string + connectionID string +} + +var dialer = net.Dialer{DualStack: true} + +// NewTunnelHandler returns a TunnelHandler, origin LAN IP and error +func NewTunnelHandler(ctx context.Context, config *TunnelConfig, addr string, connectionID uint8) (*TunnelHandler, string, error) { + url, err := validation.ValidateUrl(config.OriginUrl) + if err != nil { + return nil, "", fmt.Errorf("Unable to parse origin url %#v", url) + } + h := &TunnelHandler{ + originUrl: url, + httpClient: &http.Client{Timeout: time.Minute}, + tags: config.Tags, + metrics: config.Metrics, + connectionID: uint8ToString(connectionID), + } + // Inherit from parent context so we can cancel (Ctrl-C) while dialing + dialCtx, dialCancel := context.WithTimeout(ctx, dialTimeout) + // TUN-92: enforce a timeout on dial and handshake (as tls.Dial does not support one) + plaintextEdgeConn, err := dialer.DialContext(dialCtx, "tcp", addr) + dialCancel() + if err != nil { + return nil, "", dialError{cause: err} + } + edgeConn := tls.Client(plaintextEdgeConn, config.TlsConfig) + edgeConn.SetDeadline(time.Now().Add(dialTimeout)) + err = edgeConn.Handshake() + if err != nil { + return nil, "", dialError{cause: err} + } + // clear the deadline on the conn; h2mux has its own timeouts + edgeConn.SetDeadline(time.Time{}) + // Establish a muxed connection with the edge + // Client mux handshake with agent server + h.muxer, err = h2mux.Handshake(edgeConn, edgeConn, h2mux.MuxerConfig{ + Timeout: 5 * time.Second, + Handler: h, + IsClient: true, + HeartbeatInterval: config.HeartbeatInterval, + MaxHeartbeats: config.MaxHeartbeats, + Logger: config.ProtocolLogger, + }) + if err != nil { + return h, "", errors.New("TLS handshake error") + } + return h, edgeConn.LocalAddr().String(), err +} + func (h *TunnelHandler) AppendTagHeaders(r *http.Request) { for _, tag := range h.tags { r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value) @@ -334,6 +409,7 @@ func (h *TunnelHandler) AppendTagHeaders(r *http.Request) { } func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error { + h.metrics.incrementRequests(h.connectionID) req, err := http.NewRequest("GET", h.originUrl, h2mux.MuxedStreamReader{MuxedStream: stream}) if err != nil { log.WithError(err).Panic("Unexpected error from http.NewRequest") @@ -348,10 +424,22 @@ func (h *TunnelHandler) ServeStream(stream *h2mux.MuxedStream) error { log.WithError(err).Error("HTTP request error") stream.WriteHeaders([]h2mux.Header{{Name: ":status", Value: "502"}}) stream.Write([]byte("502 Bad Gateway")) + h.metrics.incrementResponses(h.connectionID, "502") } else { defer response.Body.Close() stream.WriteHeaders(H1ResponseToH2Response(response)) io.Copy(stream, response.Body) + h.metrics.incrementResponses(h.connectionID, "200") } + h.metrics.decrementConcurrentRequests(h.connectionID) return nil } + +func (h *TunnelHandler) UpdateMetrics() { + flowCtlMetrics := h.muxer.FlowControlMetrics() + h.metrics.updateTunnelFlowControlMetrics(flowCtlMetrics) +} + +func uint8ToString(input uint8) string { + return strconv.FormatUint(uint64(input), 10) +} diff --git a/tunnelrpc/pogs/tunnelrpc.go b/tunnelrpc/pogs/tunnelrpc.go index df359685..cc2636f8 100644 --- a/tunnelrpc/pogs/tunnelrpc.go +++ b/tunnelrpc/pogs/tunnelrpc.go @@ -43,12 +43,14 @@ func UnmarshalTunnelRegistration(s tunnelrpc.TunnelRegistration) (*TunnelRegistr } type RegistrationOptions struct { - ClientID string `capnp:"clientId"` - Version string - OS string `capnp:"os"` - ExistingTunnelPolicy tunnelrpc.ExistingTunnelPolicy - PoolID string `capnp:"poolId"` - Tags []Tag + ClientID string `capnp:"clientId"` + Version string + OS string `capnp:"os"` + ExistingTunnelPolicy tunnelrpc.ExistingTunnelPolicy + PoolID string `capnp:"poolId"` + Tags []Tag + ConnectionID uint8 `capnp:"connectionId"` + OriginLocalIP string `capnp:"originLocalIp"` } func MarshalRegistrationOptions(s tunnelrpc.RegistrationOptions, p *RegistrationOptions) error { diff --git a/tunnelrpc/tunnelrpc.capnp b/tunnelrpc/tunnelrpc.capnp index ef89a837..a2924a9c 100644 --- a/tunnelrpc/tunnelrpc.capnp +++ b/tunnelrpc/tunnelrpc.capnp @@ -31,6 +31,10 @@ struct RegistrationOptions { poolId @4 :Text; # Client-defined tags to associate with the tunnel tags @5 :List(Tag); + # A unique identifier for a high-availability connection made by a single client. + connectionId @6 :UInt8; + # origin LAN IP + originLocalIp @7 :Text; } struct Tag { diff --git a/tunnelrpc/tunnelrpc.capnp.go b/tunnelrpc/tunnelrpc.capnp.go index 00bc7212..68e350bc 100644 --- a/tunnelrpc/tunnelrpc.capnp.go +++ b/tunnelrpc/tunnelrpc.capnp.go @@ -240,12 +240,12 @@ type RegistrationOptions struct{ capnp.Struct } const RegistrationOptions_TypeID = 0xc793e50592935b4a func NewRegistrationOptions(s *capnp.Segment) (RegistrationOptions, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 5}) + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 6}) return RegistrationOptions{st}, err } func NewRootRegistrationOptions(s *capnp.Segment) (RegistrationOptions, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 5}) + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 6}) return RegistrationOptions{st}, err } @@ -368,12 +368,39 @@ func (s RegistrationOptions) NewTags(n int32) (Tag_List, error) { return l, err } +func (s RegistrationOptions) ConnectionId() uint8 { + return s.Struct.Uint8(2) +} + +func (s RegistrationOptions) SetConnectionId(v uint8) { + s.Struct.SetUint8(2, v) +} + +func (s RegistrationOptions) OriginLocalIp() (string, error) { + p, err := s.Struct.Ptr(5) + return p.Text(), err +} + +func (s RegistrationOptions) HasOriginLocalIp() bool { + p, err := s.Struct.Ptr(5) + return p.IsValid() || err != nil +} + +func (s RegistrationOptions) OriginLocalIpBytes() ([]byte, error) { + p, err := s.Struct.Ptr(5) + return p.TextBytes(), err +} + +func (s RegistrationOptions) SetOriginLocalIp(v string) error { + return s.Struct.SetText(5, v) +} + // RegistrationOptions_List is a list of RegistrationOptions. type RegistrationOptions_List struct{ capnp.List } // NewRegistrationOptions creates a new list of RegistrationOptions. func NewRegistrationOptions_List(s *capnp.Segment, sz int32) (RegistrationOptions_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 5}, sz) + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 6}, sz) return RegistrationOptions_List{l}, err } @@ -1031,73 +1058,77 @@ func (p TunnelServer_getServerInfo_Results_Promise) Result() ServerInfo_Promise return ServerInfo_Promise{Pipeline: p.Pipeline.GetPipeline(0)} } -const schema_db8274f9144abc7e = "x\xda\x9cT_h\x1c\xd5\x1b\xfd\xce\xbd3\xbb-$" + - "\xbf\xcd0\x1bh\x17B\xa0\xe4\x87\xb6\xd0?\xb1*5" + - "\x167\x89m%1m\xf6\xc6Vk\x9b\x87N7\xb7" + - "\x9b\x89\xb33\xeb\xccl\xb4\x85\xb4Z\"R\xc1\xa2\x96" + - "\x82\x05\x11\x15\x0b*E\xfb\xa0h\xa0\x0f\xf5E\x91\"" + - "\xfa\xa0\x82\xd8\x17-E,J!\xf8\xe2\xd3\xc8\x9d\xcd" + - "\xec\x8c)\xa6\xd2\xb7;w\xbe?\xe7\x9c\xfb}g\xcb" + - "\xd3l\x90\xf5\xeb\x9bu\"\xb1]\xcfE\xdb\xeb\xdf\xbc" + - "s\xff\xd9+\xf3d\x94Xt\xfc\xd2h\xf1\xaf\xf0\xe4" + - "OD\xd8:\xc7\x8e\xc1|\x95\xe5\x89\xcc\x97\xd98!" + - "\xea\x1e\xc1\xd5\xcb\xfd\xda\xa7d\xdc\x05\"\x9d\xe7\x89\xb6" + - "\x9eg7@0\x17\xd8G\x84\xa8\xe7\x8f\xe1N\xf7\xe6" + - "\xc9\xcbd\x94\x90\x96j\x05>\xc9Ga\xd6\xd5\xd1\xb4" + - "\xb9\x0a\x1e=x\xe65\xfd\xfa\x99/I\x94\x90\x8d\xd6" + - "U\xb4\xae\xf90\xd7j\xea\xd8\xad=\x01BT\xba\xf8" + - "\xe0\x87\xc3S?^YV;\x867\xa7/\x9a\xa7T" + - "\x9e\xf9\x82\xfe\x0c!b\xd7\xad\xb5\xcf\xfd\xf0\xd0\xd5\x16" + - "\xd0\xb8\xca\xcf\xfa/ -\xda\xf3\xf8\xc1\x99\xd5s\xd7" + - "\xae-Q\x80\xfa\xf5\xbd\x1eS\xf8M/\x13\xa2\xfb\x0e" + - "\x0d\xc9\xc9m\xfbo\x90Q\xe2\xffPcun\x00\xe6" + - "\xda\x9cj\xd2\x9d{\xd1\xac\xabSt\xfa\xf8\x8e\xf1\x07" + - "\xd6}\xbe\x98-\xb7/\xb7\xa8\xca\xd99U\xee\xc8\xb6" + - "\xdf\x1f\xf9\xff\xe9/\x16\x97\xa1\x8e\x03O\xe56\xc0<" + - "\x17W<\xab\x82o\xeez\xf3\xbbR\xa1\xf4\xe72=" + - "b\xf5\x16r30\xbf\x8ec\xbf\xca\xfdJ\x1b\xa3\xb0" + - "\xe9\xba\xd2\xf1\x1bZusr\xacn\xaaZ\x0d\xb71" + - "\xb0\xf3Y;\x08m\xb7\xb67\xbe/W<\xc7\xae\x1e" + - "\xad\x00\xa2\x03\x8c\xc8\xe8\x19 \x02\x8c\xee\x03D`\x86" + - "1LT\xb6k\xae\xe7\xcbh\xca\x0e\xaa\x9e\xebJ\xe2" + - "\xd5\xf0\xc4a\xcb\xb1\xdc\xaal7\xca\xdd\xda\xa8\xd5\xe0" + - "1\xe9\xcfJ\x7f\x93/kv\x10J\xbfu\xd9W\xb1" + - "\x0a\xbeU\x0fD\x07\xd7\x884\x10\x19;\x0f\x10\x89\x1d" + - "\x1c\xa2\xc2`\x00E\xa8\xcb\xdd\xa3Db\x8cC\xecg" + - "0\x18+\xc6\x08\xf7\x0d\x13\x89\x0a\x87\x98d\x88<\xdf" + - "\xae\xd9\xee\xc3\x92\xb8\x1f\xa2\x93\x18:\x09\xd1\xb4\x17\x84" + - "\xaeU\x97D\x84\x0eb\xe8 \x9c\xf0\x1a\xa1\xed\xb9\x01" + - "\xba\xd2\xc9\"\xa0\x8b\xb0\x92VC\xcdpZ\xba\xa1]" + - "\xb5T2Q,S\x0ay\x1d\x91\x18\xe4\x10c\x19\xc8" + - "#\xf7dx$\x90w\x1fNy\xe4\x9f\x92G\x13T" + - "\xbd\xb2n\xd9N\xf2\x95\x90\x19\xa2\xfc\xa3i\xccJ\xf8" + - "&bU\xfd\x18\xddx\xa37f\xa80\xaeic<" + - "\xa7\x14|\x9dC\xbc\x9b\xc1\xf8\xb6R\xf0\x0d\x0e\xf1^" + - "\x06\xe3\xf9\x12\x91x\x8bC\\`\x00/\x82\x13\x19\xef" + - "\x7f@$.p\x88\xcf\x18\x0c\x8d\x17\xa1\x11\x19\x9f\x0c" + - "\x10\x89\x8b\x1c\xe2\x12\x83\xa1kE\xe8D\xc6\xc2\x06\"" + - "\xf11\x87\xf8\x96!\xaa:\xb6t\xc3\x91\xa9\xac\xfe\xb3" + - "\xd2\x0fl\xcfM\xbe\xb9\x17\xb4\x09\xca\xa5\x89Dk8" + - "*^A\x8d$\x0a\xa9\xf7\x10P \x94\x1b\x9e\xe7\x8c" + - "L%y\x85\xd0\xaa\x05\xf8\x1f\xa1\xc2\x81\xae\xd4\x01\x08" + - "\xea\xb2-\x1b[.[oc`\xafUS2\xadj" + - "\xcb\xb4^\xc1\xef\xe3\x10[22mTOy7\x87" + - "\xb8\x97\xa1\xa0\xe6\xa9\xfdl\xb3\x96\xd3\x94\xb7<\xd0\xed" + - "v\xa0&\xc3\xd6i\xc4=\xe2\xf5U,?o\xd5\x83" + - ";\xcc\x9e\x90A\xa1\xe9\x84\x81\xd0\xda\x1c:\xd5\xbb\xac" + - "\xe2\x10E\x86\xb2/\x83\xa6\x13\xa2+\xb5\x98e\xd3\xce" + - "\xff\xad]\xb9\xd5\xa5\xa5\x8fN\xd4\xf6u$vf\xf4" + - "\x1f#f\xac\xcf#\xb5R$\xcei\xf4\xf8\xc4\x8c\xee" + - "|\x94,<\x95[e\x07\x11%\x0c\xa87\xe60\x88" + - "\x0ap\xa7\x062!{\x83\xff\xc2?q\xcd\xdb\xb3o" + - "\xf5)(d\x8a{\xa6\xee\x0c\x91\xe8\xe0\x10k\x18\"" + - "\xc7[\xf2\x82\xc2\x9e\xcc@\xac\xb4\xa3-\xc0\xc9\xa6\x16" + - "T\xb2\xaa\xdf\xd5\xaeo)\x1b\x99\xe4\x10\xd3\x99\xd9\x93" + - "\xea\xf2\x10\x87p2+j\xabe\x9e\xe6\x10\xf3\xe9\x8a" + - ">\xff\x12\x91\x98\xe7\x10\xaf0\xe4\xa5\xef'\x90\xf2M" + - "?5\x16\xc7\xab\x8d\xd9\xae\x0c\xd4B.-\x8c\xfa\xa5" + - "\xd6\xa4!\xfd\xba\xe5J\x17\xe1.\xcbv\x9a\xbeT\x83" + - "B\x0c \xfc\x1d\x00\x00\xff\xff\xfa.\x1fg" +const schema_db8274f9144abc7e = "x\xda\x9cUM\x8c\x14E\x14~_U\xff@\xb2k" + + "o\xa7g\x13\x98\x84\x8c!\x18\x85\x84?Q\x03+q" + + "\x96\x150\xb3.0\xc5\x82A\xe0@3[\xcc\xf6\xda" + + "\xd3=\xe9\xeeA!\x01\x94\x90\x18M$\"\xe1\xe0\xc5" + + "\x83\x89\x17\x0f\xea\xcd\x98x\xc0\x8b\x1c8\xa8\x09\x1a\x89" + + "\x98\xa8@T\x02A7\\<\x986\xd5\xb3=\xd3." + + "\x11\x0c\xb7\x9a\xd7\xaf\xde\xfb\xbe\xf7\xbe\xfaf\xcd\x196" + + "\xca\xd6\xea\xabu\"\xb1Q7\xd2\x8d\xad\xaf\xde\x7f\xea" + + "\xdc\xc5Sd\x97Yz\xfc\xf3\xf1\xd2_\xc9\xc9\x1f\x88" + + "\xb0\xee\x18;\x0a\xe7\x0c3\x89\x9c\xb7\xd8\x0eB:\\" + + "\xc3\x95\xf3k\xb5O\xc9~\x14D:7\x89\xd6}\xc0" + + "n\x80\xe0|\xc6>&\xa4Kn\x8d\x0d\x06\xb7O\x9e" + + "'\xbb\x8c~\xa9n\xe2\x8b|\x1cNK\x1d\x1d\x8f\xab" + + "\xe4\xf1}g\xdf\xd1\xaf\x9f\xbd@\xa2\x8cb\xb6\xa1\xb2" + + "u-\x82\xb3XS\xc7a\xed\x02\x08i\xf9\x93\xa7?" + + "\x1a\x9b\xba|q^\xed\x0c\xde\xdf\xfa\xac\xb3P\xdds" + + "t\xe3eB\xca\xae\xbb\x8b_\xfd\xee\x99+]\xa0Y" + + "\x15\xd7\xf8\x05\xa4\xa5\xdb_\xd87\xb3\xf0\xd8\xd5\xabs" + + "\x14\xa0>\xed62\x0a\x9eQ%\xa4O\x1e\xd8$\xf7" + + "\xaf\xdfs\x83\xec2\xff\xd74\xde0F\xe0\xbc\x9b5" + + "9g\xbc\xee\xdcR\xa7\xf4\xf4\xf1\xcd;6,\xfdb" + + "\xb6X\xee[cV\x95\xfb=+wh\xfd\xcd\xe7\x1e" + + "9\xfd\xe5\xec<\xd4Y\xe2Bs\x05\x9c\xc5\xa6\xaa8" + + "lV\x09\xb7\xb7\xbew\xa9l\x95\xef\xcc\x9bG6\xbd" + + "\x0d\xe6\x0c\x9cmYn\xcd\xfc\x95V\xa6I'\x08\xa4" + + "\x1f\xb5\xb5\xc6\xea\xfc\xd8X\xd5p\xdbA{d\xcb+" + + "^\x9cxAsW\x16\xaf\xd6C\xdfk\x1c\xa9\x03b" + + "\x00\x8c\xc8^2B\x04\xd8\xc3{\x89\xc0l{\x8c\xa8" + + "\xea5\x830\x92\xe9\x94\x177\xc2 \x90\xc4\x1b\xc9\x89" + + "\x83\xae\xef\x06\x0d\xd9kd\xdc\xdd\xa8\xdb`RF\x87" + + "e\xb4*\x92M/Nd\xd4\x0d.\xab\xbbV\xe4\xb6" + + "b1\xc05\"\x0dD\xf6\x96\xbdDb3\x87\xa83" + + "\xd8@\x09*\xb8m\x9cHLp\x88=\x0c6c\xa5" + + "\x0c\xe1\xee1\"Q\xe7\x10\xfb\x19\xd20\xf2\x9a^\xf0" + + "\xac$\x1e%\x18$\x86AB:\x1d\xc6I\xe0\xb6$" + + "\x11a\x80\x18\x06\x08'\xc2v\xe2\x85A\x8c\xa1\xbe\xb2" + + "\x08\x18\"\xdckV\x9b:\xc9\xb4\x0c\x12\xaf\xe1\xaa\xcb" + + "D\xd9\x98\xfa\x90\x97\x12\x89Q\x0e1Q\x80\\{\xbc" + + "\xc0#\x87\xbc\xed`\x9f\x87\xf9\x92<\x92\xa3\xaa\xc8\x96" + + "\xeb\xf9\xf9\xaf\x9c\xcc&2\x9f\xef\xe7\xdc\x0b\xdf\xcel" + + "\xaaQ\x86nG\xbb\x921T\x18\x1f\xeea\xfcFM" + + "\xf0k\x0eq\xa5\x80\xf1\xb2\x9a\xe0%\x0e\xf1S\x01\xe3" + + "\x8fe\"\xf1=\x87\xb8\xc6\x00^\x02'\xb2\x7f\xfe\x90" + + "H\\\xe3\x10\x7f0\xd8\x1a/A#\xb2o\x8d\x10\x89" + + "\xdf8\xc4\x1d\x06[\xd7J\xd0\x89\xec?W\x10\x89\x9b" + + "\x1c\x93\x0b\xc0`\x1b\xac\x04C\xbd7\xcc\x10Mj\xe0" + + "\x98\x1cRqS/)m;\x83\x88\x88&\x07T|" + + "\x11\x18\xd2\x86\xef\xc9 \xa9M\x15\x17vXF\xb1\x17" + + "\x06\xf9o\x1e\xc6\xbd\x89\xc89\x09\xa3\xab\xa6zh)" + + "\x0d\xc3\xea\x9b\x15\x01\x16\xa1\xda\x0eC\xbf6\x95\xdf\xb3" + + "\x12\xb7\x19\xe3!B\x9d\x03C}\xcb \xa8`\x9a\x89" + + "\xbb\x91xd\x85Am\x0a\x061\x18\xbd\xa5L\x84T" + + "i\xb8~\xad}\xd7Z\xd8\xfc\xb5T\xda#\xbb\xdc\xa6" + + "Z\xc3\x82\xde\x1a\x96\xab\xf1,\xe3\x10k\x0akX\xa9" + + "\xa4\xf2\x18\x87x\x82\xc1Rz\xed\xc9\xe2\xb0\xebw\xe4" + + "]\x9d\xee\xf7\xc6\x9a2\xe9\x9ej\xc1\xa1pY\xdd\x8d" + + "L\xb7\x15?\xe0\xed\x9d2\xb6:~\x12\x0b\xad\xc7a" + + "P\xed}\x01\x87(1T#\x19w\xfc\x04C}\x0b" + + "\x9b\xf7\x9a\xf8\x7f\xb5\xabv\xbbt\xe7\xa3\x13\xf5\xfe7" + + "\x90\xdb\xa5\xbd\xf6(1{\xb9\x89\xbeU#wf{" + + "ID\xcc\x1e6\xd3\xdcP\xa8\xda-;\x8a4g@" + + "\x95\x8c\xc3(\xea\xc0\x83\x1a\xd4NY\x89\xff\x0f\xff\xdc" + + "\x95\xef\xcf\xbe\xdb\xc7R\xc8\x14\xf7B\xdd\x19\"1\xc0" + + "!\x161\xa4~8\xe75\xd6\xf6\x82 \xee\xe5\x01]" + + "\xc0\xb9\x13X\xea\xb2\xaa?\xd4\xab\xef*\x9b\xda\xcf!" + + "\xa6\x0b\xda\x93*x\x80C\xf8\x05\x0b\xf0\x94YLs" + + "\x88S}\x0bx\xedM\"q\x8aC\xbc\xcd`\xca(" + + "\xca!\x99\x9d\xa8o\\~\xd8\x9c\xf0\x02\x19\xab\xf7;" + + "\xf7\xbe\xd4'\xf5\xaa\xda2j\xb9\x81\x0c\x90lu=" + + "\xbf\x13I%\x14b\x00\xe1\x9f\x00\x00\x00\xff\xff^." + + "0z" func init() { schemas.Register(schema_db8274f9144abc7e,