From 8b43454024d6048787ba875490f7565c21a001f7 Mon Sep 17 00:00:00 2001 From: Nick Vollmar Date: Wed, 11 Dec 2019 17:04:36 -0600 Subject: [PATCH 1/4] TUN-2631: only notify that activeStreamMap is closed if ignoreNewStreams=true --- h2mux/activestreammap.go | 7 ++-- h2mux/activestreammap_test.go | 61 +++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/h2mux/activestreammap.go b/h2mux/activestreammap.go index 1138bea4..15203423 100644 --- a/h2mux/activestreammap.go +++ b/h2mux/activestreammap.go @@ -43,6 +43,7 @@ func newActiveStreamMap(useClientStreamNumbers bool, activeStreams prometheus.Ga return m } +// This function should be called while `m` is locked. func (m *activeStreamMap) notifyStreamsEmpty() { m.closeOnce.Do(func() { close(m.streamsEmptyChan) @@ -87,7 +88,9 @@ func (m *activeStreamMap) Delete(streamID uint32) { delete(m.streams, streamID) m.activeStreams.Dec() } - if len(m.streams) == 0 { + + // shutting down, and now the map is empty + if m.ignoreNewStreams && len(m.streams) == 0 { m.notifyStreamsEmpty() } } @@ -104,7 +107,7 @@ func (m *activeStreamMap) Shutdown() (done <-chan struct{}, alreadyInProgress bo } m.ignoreNewStreams = true if len(m.streams) == 0 { - // nothing to shut down + // there are no streams to wait for m.notifyStreamsEmpty() } return m.streamsEmptyChan, false diff --git a/h2mux/activestreammap_test.go b/h2mux/activestreammap_test.go index 5f7cd2cc..f961bcaf 100644 --- a/h2mux/activestreammap_test.go +++ b/h2mux/activestreammap_test.go @@ -60,6 +60,67 @@ func TestShutdown(t *testing.T) { } } +func TestEmptyBeforeShutdown(t *testing.T) { + const numStreams = 1000 + m := newActiveStreamMap(true, NewActiveStreamsMetrics("test", t.Name())) + + // Add all the streams + { + var wg sync.WaitGroup + wg.Add(numStreams) + for i := 0; i < numStreams; i++ { + go func(streamID int) { + defer wg.Done() + stream := &MuxedStream{streamID: uint32(streamID)} + ok := m.Set(stream) + assert.True(t, ok) + }(i) + } + wg.Wait() + } + assert.Equal(t, numStreams, m.Len(), "All the streams should have been added") + + // Delete all the streams, bringing m to size 0 + { + var wg sync.WaitGroup + wg.Add(numStreams) + for i := 0; i < numStreams; i++ { + go func(streamID int) { + defer wg.Done() + m.Delete(uint32(streamID)) + }(i) + } + wg.Wait() + } + assert.Equal(t, 0, m.Len(), "All the streams should have been deleted") + + // Add one stream back + const soloStreamID = uint32(0) + ok := m.Set(&MuxedStream{streamID: soloStreamID}) + assert.True(t, ok) + + shutdownChan, alreadyInProgress := m.Shutdown() + select { + case <-shutdownChan: + assert.Fail(t, "before Shutdown(), shutdownChan shouldn't be closed") + default: + } + assert.False(t, alreadyInProgress) + + shutdownChan2, alreadyInProgress2 := m.Shutdown() + assert.Equal(t, shutdownChan, shutdownChan2, "repeated calls to Shutdown() should return the same channel") + assert.True(t, alreadyInProgress2, "repeated calls to Shutdown() should return true for 'in progress'") + + // Remove the remaining stream + m.Delete(soloStreamID) + + select { + case <-shutdownChan: + default: + assert.Fail(t, "After all the streams are deleted, shutdownChan should have been closed") + } +} + type noopBuffer struct { isClosed bool } From 6aa48d2eb2ced44016db0c63a880e20d5a1b1e5e Mon Sep 17 00:00:00 2001 From: Nick Vollmar Date: Fri, 6 Dec 2019 15:32:15 -0600 Subject: [PATCH 2/4] TUN-2554: cloudflared calls ReconnectTunnel --- origin/supervisor.go | 26 ++++++--- origin/tunnel.go | 87 
++++++++++++++++++++++++++---- tunnelrpc/pogs/reconnect_tunnel.go | 10 ++-- 3 files changed, 103 insertions(+), 20 deletions(-) diff --git a/origin/supervisor.go b/origin/supervisor.go index 8bb8d046..f5d08ed4 100644 --- a/origin/supervisor.go +++ b/origin/supervisor.go @@ -31,6 +31,8 @@ const ( refreshAuthMaxBackoff = 10 // Waiting time before retrying a failed 'Authenticate' connection refreshAuthRetryDuration = time.Second * 10 + // Maximum time to make an Authenticate RPC + authTokenTimeout = time.Second * 30 ) var ( @@ -90,14 +92,21 @@ func (s *Supervisor) Run(ctx context.Context, connectedSignal *signal.Signal) er return err } var tunnelsWaiting []int + tunnelsActive := s.config.HAConnections + backoff := BackoffHandler{MaxRetries: s.config.Retries, BaseTime: tunnelRetryDuration, RetryForever: true} var backoffTimer <-chan time.Time - tunnelsActive := s.config.HAConnections refreshAuthBackoff := &BackoffHandler{MaxRetries: refreshAuthMaxBackoff, BaseTime: refreshAuthRetryDuration, RetryForever: true} var refreshAuthBackoffTimer <-chan time.Time + if s.config.UseReconnectToken { - refreshAuthBackoffTimer = time.After(refreshAuthRetryDuration) + if timer, err := s.refreshAuth(ctx, refreshAuthBackoff, s.authenticate); err == nil { + refreshAuthBackoffTimer = timer + } else { + logger.WithError(err).Errorf("initial refreshAuth failed, retrying in %v", refreshAuthRetryDuration) + refreshAuthBackoffTimer = time.After(refreshAuthRetryDuration) + } } for { @@ -169,11 +178,11 @@ func (s *Supervisor) Run(ctx context.Context, connectedSignal *signal.Signal) er } } +// Returns nil if initialization succeeded, else the initialization error. func (s *Supervisor) initialize(ctx context.Context, connectedSignal *signal.Signal) error { logger := s.logger edgeIPs, err := s.resolveEdgeIPs() - if err != nil { logger.Infof("ResolveEdgeIPs err") return err @@ -190,7 +199,6 @@ func (s *Supervisor) initialize(ctx context.Context, connectedSignal *signal.Sig select { case <-ctx.Done(): <-s.tunnelErrors - // Error can't be nil. A nil error signals that initialization succeed return ctx.Err() case tunnelError := <-s.tunnelErrors: return tunnelError.err @@ -208,7 +216,7 @@ func (s *Supervisor) initialize(ctx context.Context, connectedSignal *signal.Sig // startTunnel starts the first tunnel connection. The resulting error will be sent on // s.tunnelErrors. It will send a signal via connectedSignal if registration succeed func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal *signal.Signal) { - err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(0), 0, connectedSignal, s.cloudflaredUUID) + err := ServeTunnelLoop(ctx, s, s.config, s.getEdgeIP(0), 0, connectedSignal, s.cloudflaredUUID) defer func() { s.tunnelErrors <- tunnelError{index: 0, err: err} }() @@ -229,14 +237,14 @@ func (s *Supervisor) startFirstTunnel(ctx context.Context, connectedSignal *sign default: return } - err = ServeTunnelLoop(ctx, s.config, s.getEdgeIP(0), 0, connectedSignal, s.cloudflaredUUID) + err = ServeTunnelLoop(ctx, s, s.config, s.getEdgeIP(0), 0, connectedSignal, s.cloudflaredUUID) } } // startTunnel starts a new tunnel connection. The resulting error will be sent on // s.tunnelErrors. 
func (s *Supervisor) startTunnel(ctx context.Context, index int, connectedSignal *signal.Signal) { - err := ServeTunnelLoop(ctx, s.config, s.getEdgeIP(index), uint8(index), connectedSignal, s.cloudflaredUUID) + err := ServeTunnelLoop(ctx, s, s.config, s.getEdgeIP(index), uint8(index), connectedSignal, s.cloudflaredUUID) s.tunnelErrors <- tunnelError{index: index, err: err} } @@ -347,7 +355,9 @@ func (s *Supervisor) refreshAuth( s.SetReconnectToken(outcome.JWT()) return timeAfter(outcome.RefreshAfter()), nil case tunnelpogs.AuthUnknown: - return timeAfter(outcome.RefreshAfter()), nil + duration := outcome.RefreshAfter() + logger.WithError(outcome).Warnf("Retrying in %v", duration) + return timeAfter(duration), nil case tunnelpogs.AuthFail: return nil, outcome default: diff --git a/origin/tunnel.go b/origin/tunnel.go index 04b77792..3a05e347 100644 --- a/origin/tunnel.go +++ b/origin/tunnel.go @@ -78,6 +78,14 @@ type TunnelConfig struct { UseReconnectToken bool } +// ReconnectTunnelCredentialManager is invoked by functions in this file to +// get/set parameters for ReconnectTunnel RPC calls. +type ReconnectTunnelCredentialManager interface { + ReconnectToken() ([]byte, error) + EventDigest() ([]byte, error) + SetEventDigest(eventDigest []byte) +} + type dupConnRegisterTunnelError struct{} func (e dupConnRegisterTunnelError) Error() string { @@ -152,6 +160,7 @@ func StartTunnelDaemon(ctx context.Context, config *TunnelConfig, connectedSigna } func ServeTunnelLoop(ctx context.Context, + credentialManager ReconnectTunnelCredentialManager, config *TunnelConfig, addr *net.TCPAddr, connectionID uint8, @@ -173,6 +182,7 @@ func ServeTunnelLoop(ctx context.Context, for { err, recoverable := ServeTunnel( ctx, + credentialManager, config, connectionLogger, addr, connectionID, @@ -193,6 +203,7 @@ func ServeTunnelLoop(ctx context.Context, func ServeTunnel( ctx context.Context, + credentialManager ReconnectTunnelCredentialManager, config *TunnelConfig, logger *log.Entry, addr *net.TCPAddr, @@ -237,13 +248,30 @@ func ServeTunnel( errGroup, serveCtx := errgroup.WithContext(ctx) - errGroup.Go(func() error { - err := RegisterTunnel(serveCtx, handler.muxer, config, logger, connectionID, originLocalIP, u) - if err == nil { - connectedFuse.Fuse(true) - backoff.SetGracePeriod() + errGroup.Go(func() (err error) { + defer func() { + if err == nil { + connectedFuse.Fuse(true) + backoff.SetGracePeriod() + } + }() + + if config.UseReconnectToken && connectedFuse.Value() { + token, tokenErr := credentialManager.ReconnectToken() + eventDigest, eventDigestErr := credentialManager.EventDigest() + // if we have both credentials, we can reconnect + if tokenErr == nil && eventDigestErr == nil { + return ReconnectTunnel(ctx, token, eventDigest, handler.muxer, config, logger, connectionID, originLocalIP, u) + } + // log errors and proceed to RegisterTunnel + if tokenErr != nil { + logger.WithError(tokenErr).Error("Couldn't get reconnect token") + } + if eventDigestErr != nil { + logger.WithError(eventDigestErr).Error("Couldn't get event digest") + } } - return err + return RegisterTunnel(serveCtx, credentialManager, handler.muxer, config, logger, connectionID, originLocalIP, u) }) errGroup.Go(func() error { @@ -304,6 +332,7 @@ func ServeTunnel( func RegisterTunnel( ctx context.Context, + credentialManager ReconnectTunnelCredentialManager, muxer *h2mux.Muxer, config *TunnelConfig, logger *log.Entry, @@ -329,12 +358,52 @@ func RegisterTunnel( config.Hostname, config.RegistrationOptions(connectionID, originLocalIP, uuid), ) - if 
registrationErr := registration.DeserializeError(); registrationErr != nil { // RegisterTunnel RPC failure return processRegisterTunnelError(registrationErr, config.Metrics) } + credentialManager.SetEventDigest(registration.EventDigest) + return processRegistrationSuccess(config, logger, connectionID, registration) +} +func ReconnectTunnel( + ctx context.Context, + token []byte, + eventDigest []byte, + muxer *h2mux.Muxer, + config *TunnelConfig, + logger *log.Entry, + connectionID uint8, + originLocalIP string, + uuid uuid.UUID, +) error { + config.TransportLogger.Debug("initiating RPC stream to reconnect") + tunnelServer, err := connection.NewRPCClient(ctx, muxer, config.TransportLogger.WithField("subsystem", "rpc-reconnect"), openStreamTimeout) + if err != nil { + // RPC stream open error + return newClientRegisterTunnelError(err, config.Metrics.rpcFail) + } + defer tunnelServer.Close() + // Request server info without blocking tunnel registration; must use capnp library directly. + serverInfoPromise := tunnelrpc.TunnelServer{Client: tunnelServer.Client}.GetServerInfo(ctx, func(tunnelrpc.TunnelServer_getServerInfo_Params) error { + return nil + }) + LogServerInfo(serverInfoPromise.Result(), connectionID, config.Metrics, logger) + registration := tunnelServer.ReconnectTunnel( + ctx, + token, + eventDigest, + config.Hostname, + config.RegistrationOptions(connectionID, originLocalIP, uuid), + ) + if registrationErr := registration.DeserializeError(); registrationErr != nil { + // ReconnectTunnel RPC failure + return processRegisterTunnelError(registrationErr, config.Metrics) + } + return processRegistrationSuccess(config, logger, connectionID, registration) +} + +func processRegistrationSuccess(config *TunnelConfig, logger *log.Entry, connectionID uint8, registration *tunnelpogs.TunnelRegistration) error { for _, logLine := range registration.LogLines { logger.Info(logLine) } @@ -378,13 +447,13 @@ func processRegisterTunnelError(err tunnelpogs.TunnelRegistrationError, metrics func UnregisterTunnel(muxer *h2mux.Muxer, gracePeriod time.Duration, logger *log.Logger) error { logger.Debug("initiating RPC stream to unregister") ctx := context.Background() - ts, err := connection.NewRPCClient(ctx, muxer, logger.WithField("subsystem", "rpc-unregister"), openStreamTimeout) + tunnelServer, err := connection.NewRPCClient(ctx, muxer, logger.WithField("subsystem", "rpc-unregister"), openStreamTimeout) if err != nil { // RPC stream open error return err } // gracePeriod is encoded in int64 using capnproto - return ts.UnregisterTunnel(ctx, gracePeriod.Nanoseconds()) + return tunnelServer.UnregisterTunnel(ctx, gracePeriod.Nanoseconds()) } func LogServerInfo( diff --git a/tunnelrpc/pogs/reconnect_tunnel.go b/tunnelrpc/pogs/reconnect_tunnel.go index d3f73528..5a4c4159 100644 --- a/tunnelrpc/pogs/reconnect_tunnel.go +++ b/tunnelrpc/pogs/reconnect_tunnel.go @@ -46,7 +46,7 @@ func (c TunnelServer_PogsClient) ReconnectTunnel( eventDigest []byte, hostname string, options *RegistrationOptions, -) (*TunnelRegistration, error) { +) *TunnelRegistration { client := tunnelrpc.TunnelServer{Client: c.Client} promise := client.ReconnectTunnel(ctx, func(p tunnelrpc.TunnelServer_reconnectTunnel_Params) error { err := p.SetJwt(jwt) @@ -73,7 +73,11 @@ func (c TunnelServer_PogsClient) ReconnectTunnel( }) retval, err := promise.Result().Struct() if err != nil { - return nil, err + return NewRetryableRegistrationError(err, defaultRetryAfterSeconds).Serialize() } - return UnmarshalTunnelRegistration(retval) + registration, err := 
UnmarshalTunnelRegistration(retval) + if err != nil { + return NewRetryableRegistrationError(err, defaultRetryAfterSeconds).Serialize() + } + return registration } From cc2a1d12043c5c09f3ecdfeedae33564d317b3b1 Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 17 Dec 2019 09:02:28 +0800 Subject: [PATCH 3/4] bug(cloudflared): Set the MaxIdleConnsPerHost of http.Transport to proxy-keepalive-connections (#155) Setting the MaxIdleConns is not enough, the MaxIdleConnsPerHost must be set as well. Otherwise, http.Transport will use the DefaultMaxIdleConnsPerHost, which is 2, and then the connection pool will have only 2 connection hold. --- cmd/cloudflared/tunnel/configuration.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index ef24751b..c746a585 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -197,6 +197,7 @@ func prepareTunnelConfig( httpTransport := &http.Transport{ Proxy: http.ProxyFromEnvironment, MaxIdleConns: c.Int("proxy-keepalive-connections"), + MaxIdleConnsPerHost: c.Int("proxy-keepalive-connections"), IdleConnTimeout: c.Duration("proxy-keepalive-timeout"), TLSHandshakeTimeout: c.Duration("proxy-tls-timeout"), ExpectContinueTimeout: 1 * time.Second, From 789ca6f6f46a72b5138a1700e476e8c1d04528bf Mon Sep 17 00:00:00 2001 From: Niels Hofmans Date: Tue, 17 Dec 2019 02:25:17 +0100 Subject: [PATCH 4/4] refactor(docker): optimize Dockerfile (#126) * refactor(docker): optimize Dockerfile Remove obsolete upx binary compression Run as unprivileged user * Use go 1.13.3 * Use debian buster distroless --- Dockerfile | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/Dockerfile b/Dockerfile index 89e04a66..97ef11e3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,28 @@ -FROM golang:1.12 as builder +# use a builder image for building cloudflare +FROM golang:1.13.3 as builder ENV GO111MODULE=on ENV CGO_ENABLED=0 ENV GOOS=linux -WORKDIR /go/src/github.com/cloudflare/cloudflared/ -RUN apt-get update && apt-get install -y --no-install-recommends upx -# Run after `apt-get update` to improve rebuild scenarios -COPY . . -RUN make cloudflared -RUN upx --no-progress cloudflared -FROM gcr.io/distroless/base -COPY --from=builder /go/src/github.com/cloudflare/cloudflared/cloudflared /usr/local/bin/ +WORKDIR /go/src/github.com/cloudflare/cloudflared/ + +# copy our sources into the builder image +COPY . . + +# compile cloudflared +RUN make cloudflared + +# --- + +# use a distroless base image with glibc +FROM gcr.io/distroless/base-debian10:nonroot + +# copy our compiled binary +COPY --from=builder --chown=nonroot /go/src/github.com/cloudflare/cloudflared/cloudflared /usr/local/bin/ + +# run as non-privileged user +USER nonroot + +# command / entrypoint of container ENTRYPOINT ["cloudflared", "--no-autoupdate"] CMD ["version"]
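
Note on the proxy-keepalive change in patch 3 above: Go's net/http caps idle connections per host at http.DefaultMaxIdleConnsPerHost (2) unless MaxIdleConnsPerHost is set explicitly, so raising MaxIdleConns alone does not grow the per-origin pool. Below is a minimal standalone sketch (not part of the patches) of the resulting transport configuration; the keepalive values are hard-coded for illustration, whereas cloudflared reads them from its --proxy-keepalive-connections and --proxy-keepalive-timeout flags.

package main

import (
	"net/http"
	"time"
)

func main() {
	// Illustrative values standing in for the CLI flags.
	keepaliveConnections := 100
	keepaliveTimeout := 90 * time.Second

	transport := &http.Transport{
		Proxy:        http.ProxyFromEnvironment,
		MaxIdleConns: keepaliveConnections,
		// Without this, net/http falls back to DefaultMaxIdleConnsPerHost (2),
		// so only two idle connections to the origin would ever be reused.
		MaxIdleConnsPerHost:   keepaliveConnections,
		IdleConnTimeout:       keepaliveTimeout,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}

	client := &http.Client{Transport: transport}
	_ = client // reuse this client for all proxied requests to the origin
}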